You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/10/13 17:16:06 UTC
[geode] 01/02: GEODE-3829: add tests for clients with register
interest
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9c050addb8059d2b0d5b4de4643579fa2ebb4d06
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 12 14:42:01 2017 -0700
GEODE-3829: add tests for clients with register interest
* ClientWithInterestFailoverTest -- verifies that registered
interest carries over when client fails over to other server
* RegisterInterestServerMetaDataTest -- verifies that server
metadata is correct for clients that register interest
---
.../internal/admin/remote/RemoteBridgeServer.java | 6 +
.../geode/internal/cache/AbstractCacheServer.java | 2 +-
.../geode/internal/cache/CacheServerImpl.java | 20 ++
.../geode/internal/cache/InternalCacheServer.java | 23 ++
.../internal/cache/tier/sockets/AcceptorImpl.java | 3 +-
.../cache/tier/sockets/CacheClientProxy.java | 2 +-
.../cache/tier/sockets/InternalAcceptor.java | 20 ++
.../cache/xmlcache/CacheServerCreation.java | 5 +
.../sockets/ClientWithInterestFailoverTest.java | 280 +++++++++++++++++++++
.../RegisterInterestServerMetaDataTest.java | 272 ++++++++++++++++++++
10 files changed, 630 insertions(+), 3 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
index 832feea..dfae1fe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
@@ -36,6 +36,7 @@ import org.apache.geode.internal.Version;
import org.apache.geode.internal.admin.AdminBridgeServer;
import org.apache.geode.internal.cache.AbstractCacheServer;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
import org.apache.geode.internal.i18n.LocalizedStrings;
/**
@@ -224,6 +225,11 @@ public class RemoteBridgeServer extends AbstractCacheServer
}
}
+ @Override
+ public InternalAcceptor getAcceptor() {
+ throw new UnsupportedOperationException("not implemented on " + getClass().getSimpleName());
+ }
+
private static class RemoteLoadProbe extends ServerLoadProbeAdapter {
/** The description of this callback */
private final String desc;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
index 0a6c24b..93e1194 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
@@ -38,7 +38,7 @@ import org.apache.geode.management.membership.ClientMembershipListener;
*
* @since GemFire 5.7
*/
-public abstract class AbstractCacheServer implements CacheServer {
+public abstract class AbstractCacheServer implements InternalCacheServer {
public static final String TEST_OVERRIDE_DEFAULT_PORT_PROPERTY =
DistributionConfig.GEMFIRE_PREFIX + "test.CacheServer.OVERRIDE_DEFAULT_PORT";
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index bcd8b32..0cf7081 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -151,6 +151,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
// //////////////////// Instance Methods ///////////////////
+ @Override
public CancelCriterion getCancelCriterion() {
return cache.getCancelCriterion();
}
@@ -272,6 +273,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
}
+ @Override
public ClientSubscriptionConfig getClientSubscriptionConfig() {
return this.clientSubscriptionConfig;
}
@@ -422,10 +424,12 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
}
}
+ @Override
public boolean isRunning() {
return this.acceptor != null && this.acceptor.isRunning();
}
+ @Override
public synchronized void stop() {
if (!isRunning()) {
return;
@@ -525,24 +529,29 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
*
* @return the internal acceptor
*/
+ @Override
public AcceptorImpl getAcceptor() {
return this.acceptor;
}
// DistributionAdvisee methods
+ @Override
public DM getDistributionManager() {
return getSystem().getDistributionManager();
}
+ @Override
public ClientSession getClientSession(String durableClientId) {
return getCacheClientNotifier().getClientProxy(durableClientId);
}
+ @Override
public ClientSession getClientSession(DistributedMember member) {
return getCacheClientNotifier().getClientProxy(ClientProxyMembershipID.getClientId(member));
}
+ @Override
public Set getAllClientSessions() {
return new HashSet(getCacheClientNotifier().getClientProxies());
}
@@ -657,6 +666,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
* Marker class name to identify the lock more easily in thread dumps private static class
* ClientMessagesRegionLock extends Object { }
*/
+ @Override
public DistributionAdvisor getDistributionAdvisor() {
return this.advisor;
}
@@ -668,10 +678,12 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
return this.advisor;
}
+ @Override
public Profile getProfile() {
return getDistributionAdvisor().createProfile();
}
+ @Override
public DistributionAdvisee getParentAdvisee() {
return null;
}
@@ -681,14 +693,17 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
*
* @return the underlying{@code InternalDistributedSystem}
*/
+ @Override
public InternalDistributedSystem getSystem() {
return (InternalDistributedSystem) this.cache.getDistributedSystem();
}
+ @Override
public String getName() {
return "CacheServer";
}
+ @Override
public String getFullPath() {
return getName();
}
@@ -721,6 +736,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
return groupList.toArray(groups);
}
+ @Override
public /* synchronized causes deadlock */ void fillInProfile(Profile profile) {
assert profile instanceof CacheServerProfile;
CacheServerProfile bp = (CacheServerProfile) profile;
@@ -734,6 +750,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
bp.finishInit();
}
+ @Override
public int getSerialNumber() {
return this.serialNumber;
}
@@ -751,6 +768,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
* @throws IllegalStateException if the BridgeServer has not been started
* @since GemFire 5.8Beta
*/
+ @Override
public void registerInterestRegistrationListener(InterestRegistrationListener listener) {
if (!this.isRunning()) {
throw new IllegalStateException(
@@ -767,6 +785,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
*
* @since GemFire 5.8Beta
*/
+ @Override
public void unregisterInterestRegistrationListener(InterestRegistrationListener listener) {
getCacheClientNotifier().unregisterInterestRegistrationListener(listener);
}
@@ -778,6 +797,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
*
* @since GemFire 5.8Beta
*/
+ @Override
public Set getInterestRegistrationListeners() {
return getCacheClientNotifier().getInterestRegistrationListeners();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java
new file mode 100644
index 0000000..e4687c9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
+
+public interface InternalCacheServer extends CacheServer {
+
+ InternalAcceptor getAcceptor();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 704369e..1b5dfa9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -99,7 +99,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* @since GemFire 2.0.2
*/
@SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
+public class AcceptorImpl extends Acceptor implements InternalAcceptor, Runnable, CommBufferPool {
private static final Logger logger = LogService.getLogger();
private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
@@ -1736,6 +1736,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
*
* @return the instance that provides client notification
*/
+ @Override
public CacheClientNotifier getCacheClientNotifier() {
return this.clientNotifier;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 34f232d..a9c1064 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2994,7 +2994,7 @@ public class CacheClientProxy implements ClientSession {
*
* @since GemFire 6.1
*/
- public Map getRegionsWithEmptyDataPolicy() {
+ public Map<String, Integer> getRegionsWithEmptyDataPolicy() {
return regionsWithEmptyDataPolicy;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java
new file mode 100644
index 0000000..8b3860f
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+public interface InternalAcceptor {
+
+ CacheClientNotifier getCacheClientNotifier();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
index 30d6849..0a9c4af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
@@ -21,6 +21,7 @@ import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.AbstractCacheServer;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import java.io.IOException;
@@ -241,4 +242,8 @@ public class CacheServerCreation extends AbstractCacheServer {
throw new UnsupportedOperationException("Shouldn't be invoked");
}
+ @Override
+ public InternalAcceptor getAcceptor() {
+ throw new UnsupportedOperationException("Shouldn't be invoked");
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java
new file mode 100644
index 0000000..a655ad5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class ClientWithInterestFailoverTest implements Serializable {
+
+ private static final String PROXY_REGION_NAME = "PROXY_REGION_NAME";
+ private static final String CACHING_PROXY_REGION_NAME = "CACHING_PROXY_REGION_NAME";
+ private static final String REGEX = ".*";
+
+ private static InternalCache cache;
+ private static InternalClientCache clientCache;
+
+ private VM client;
+ private VM server;
+ private VM server2;
+ private int serverPort1;
+ private int serverPort2;
+ private int primaryServerPort;
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Before
+ public void setUp() throws Exception {
+ client = getHost(0).getVM(0);
+
+ server = getHost(0).getVM(1);
+ server2 = getHost(0).getVM(2);
+
+ primaryServerPort = givenTwoCacheServers();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ disconnectAllFromDS();
+
+ cache = null;
+ invokeInEveryVM(() -> cache = null);
+
+ clientCache = null;
+ invokeInEveryVM(() -> clientCache = null);
+ }
+
+ @Test
+ public void clientWithSingleKeyInterestFailsOver() throws Exception {
+ client.invoke(() -> registerKey(PROXY_REGION_NAME, 1));
+ client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 1));
+
+ performFailoverTesting();
+ }
+
+ @Test
+ public void clientWithKeyListInterestFailsOver() throws Exception {
+ client.invoke(() -> registerKeys(PROXY_REGION_NAME, 1, 2));
+ client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 1, 2));
+
+ performFailoverTesting();
+ }
+
+ @Test
+ public void clientWithRegexInterestFailsOver() throws Exception {
+ client.invoke(() -> registerRegex(PROXY_REGION_NAME));
+ client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME));
+
+ performFailoverTesting();
+ }
+
+ private void performFailoverTesting() {
+ // arrange
+ VM primaryServerVM = getPrimaryServerVM();
+ VM secondaryServerVM = getSecondaryServerVM();
+ primaryServerVM.invoke(() -> awaitServerMetaDataToContainClient());
+ primaryServerVM.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ primaryServerVM.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+ // act
+ primaryServerVM.invoke(() -> stopCacheServer());
+
+ // assert
+ secondaryServerVM.invoke(() -> awaitServerMetaDataToContainClient());
+ secondaryServerVM.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ secondaryServerVM.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+ }
+
+ private int createServerCache() throws IOException {
+ cache = (InternalCache) new CacheFactory().create();
+
+ RegionFactory<Integer, Object> regionFactory = cache.createRegionFactory();
+ regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+ regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+ regionFactory.<Integer, Object>create(PROXY_REGION_NAME);
+ regionFactory.<Integer, Object>create(CACHING_PROXY_REGION_NAME);
+
+ CacheServer server = cache.addCacheServer();
+ server.setPort(getRandomAvailablePort(SOCKET));
+ server.start();
+ return server.getPort();
+ }
+
+ /**
+ * Create client cache and return the client connection pool's primary server port
+ */
+ private int createClientCacheWithTwoRegions(final String host1, final int port1,
+ final String host2, final int port2) {
+ clientCache = (InternalClientCache) new ClientCacheFactory().create();
+ assertThat(clientCache.isClient()).isTrue();
+
+ PoolFactory poolFactory = createPoolFactory();
+ poolFactory.addServer(host1, port1);
+ poolFactory.addServer(host2, port2);
+
+ Pool pool = poolFactory.create(getClass().getSimpleName() + "-Pool");
+
+ createRegionOnClient(PROXY_REGION_NAME, ClientRegionShortcut.PROXY, pool);
+ createRegionOnClient(CACHING_PROXY_REGION_NAME, ClientRegionShortcut.CACHING_PROXY, pool);
+
+ return ((PoolImpl) pool).getPrimaryPort();
+ }
+
+ private PoolFactory createPoolFactory() {
+ return PoolManager.createFactory().setThreadLocalConnections(true).setMinConnections(3)
+ .setSubscriptionEnabled(true).setSubscriptionRedundancy(0).setReadTimeout(10000)
+ .setSocketBufferSize(32768);
+ }
+
+ private void createRegionOnClient(final String regionName, final ClientRegionShortcut shortcut,
+ final Pool pool) {
+ ClientRegionFactory<Integer, Object> regionFactory =
+ clientCache.createClientRegionFactory(shortcut);
+ regionFactory.setPoolName(pool.getName());
+ Region<Integer, Object> region = regionFactory.create(regionName);
+ assertThat(region.getAttributes().getCloningEnabled()).isFalse();
+ }
+
+ private int givenTwoCacheServers() {
+ serverPort1 = server.invoke(() -> createServerCache());
+ serverPort2 = server2.invoke(() -> createServerCache());
+
+ return client.invoke(() -> createClientCacheWithTwoRegions(getServerHostName(server.getHost()),
+ serverPort1, getServerHostName(server2.getHost()), serverPort2));
+ }
+
+ private VM getPrimaryServerVM() {
+ assertThat(primaryServerPort).isGreaterThan(-1);
+ if (primaryServerPort == serverPort1) {
+ return server;
+ } else {
+ return server2;
+ }
+ }
+
+ private VM getSecondaryServerVM() {
+ assertThat(primaryServerPort).isGreaterThan(-1);
+ if (primaryServerPort == serverPort1) {
+ return server2;
+ } else {
+ return server;
+ }
+ }
+
+ private void registerKey(final String regionName, final int key) {
+ Region<Integer, Object> region = clientCache.getRegion(regionName);
+ region.registerInterest(key);
+ }
+
+ private void registerKeys(final String regionName, final int... keys) {
+ Region region = clientCache.getRegion(regionName);
+
+ List<Integer> list = new ArrayList<>();
+ for (int key : keys) {
+ list.add(key);
+ }
+
+ region.registerInterest(list);
+ }
+
+ private void registerRegex(final String regionName) {
+ Region<Integer, Object> region = clientCache.getRegion(regionName);
+ region.registerInterestRegex(REGEX);
+ }
+
+ private void stopCacheServer() {
+ getCacheServer().stop();
+ }
+
+ private void awaitServerMetaDataToContainClient() {
+ await().atMost(30, SECONDS)
+ .until(() -> assertThat(
+ getCacheServer().getAcceptor().getCacheClientNotifier().getClientProxies().size())
+ .isEqualTo(1));
+
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy).isNotNull();
+
+ await().atMost(30, SECONDS).until(() -> getClientProxy().isAlive() && getClientProxy()
+ .getRegionsWithEmptyDataPolicy().containsKey(Region.SEPARATOR + PROXY_REGION_NAME));
+ }
+
+ private void validateServerMetaDataKnowsThatClientRegisteredInterest() {
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy.hasRegisteredInterested()).isTrue();
+ }
+
+ private void validateServerMetaDataKnowsWhichClientRegionIsEmpty() {
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .containsKey(Region.SEPARATOR + PROXY_REGION_NAME);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .doesNotContainKey(Region.SEPARATOR + CACHING_PROXY_REGION_NAME);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy()).hasSize(1);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .containsEntry(Region.SEPARATOR + PROXY_REGION_NAME, 0);
+ }
+
+ private InternalCacheServer getCacheServer() {
+ return (InternalCacheServer) cache.getCacheServers().iterator().next();
+ }
+
+ private CacheClientProxy getClientProxy() {
+ CacheClientNotifier notifier = getCacheServer().getAcceptor().getCacheClientNotifier();
+ return notifier.getClientProxies().stream().findFirst().orElse(null);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java
new file mode 100644
index 0000000..0511d4b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class RegisterInterestServerMetaDataTest implements Serializable {
+
+ private static final String PROXY_REGION_NAME = "PROXY_REGION_NAME";
+ private static final String CACHING_PROXY_REGION_NAME = "CACHING_PROXY_REGION_NAME";
+
+ private static InternalCache cache;
+ private static InternalClientCache clientCache;
+
+ private String hostName;
+
+ private VM server;
+ private VM client;
+ private int serverPort1;
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Before
+ public void setUp() throws Exception {
+ server = getHost(0).getVM(0);
+ client = getHost(0).getVM(1);
+
+ hostName = getServerHostName(server.getHost());
+
+ serverPort1 = server.invoke(() -> createServerCache());
+ client.invoke(() -> createClientCacheWithTwoRegions(hostName, serverPort1));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ disconnectAllFromDS();
+
+ cache = null;
+ invokeInEveryVM(() -> cache = null);
+
+ clientCache = null;
+ invokeInEveryVM(() -> clientCache = null);
+ }
+
+ @Test
+ public void registerInterestSingleKeyCanBeInvokedMultipleTimes() throws Exception {
+ client.invoke(() -> registerKey(PROXY_REGION_NAME, 1));
+ client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 1));
+
+ server.invoke(() -> awaitServerMetaDataToContainClient());
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+ // invoke registerKey multiple times
+
+ client.invoke(() -> registerKey(PROXY_REGION_NAME, 2));
+ client.invoke(() -> registerKey(PROXY_REGION_NAME, 3));
+ client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 4));
+
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+ }
+
+ @Test
+ public void registerInterestRegexCanBeInvokedMultipleTimes() throws Exception {
+ client.invoke(() -> registerRegex(PROXY_REGION_NAME, ".*"));
+ client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME, ".*"));
+
+ server.invoke(() -> awaitServerMetaDataToContainClient());
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+ // invoke registerRegex multiple times
+
+ client.invoke(() -> registerRegex(PROXY_REGION_NAME, "8"));
+ client.invoke(() -> registerRegex(PROXY_REGION_NAME, "7"));
+ client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME, "9"));
+
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+ }
+
+ @Test
+ public void registerInterestKeyListCanBeInvokedMultipleTimes() throws Exception {
+ client.invoke(() -> registerKeys(PROXY_REGION_NAME, 1, 2));
+ client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 1, 2));
+
+ server.invoke(() -> awaitServerMetaDataToContainClient());
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+ // invoke registerKeys multiple times
+
+ client.invoke(() -> registerKeys(PROXY_REGION_NAME, 3, 4));
+ client.invoke(() -> registerKeys(PROXY_REGION_NAME, 5));
+ client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 3));
+
+ server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+ server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+ }
+
+ private int createServerCache() throws IOException {
+ cache = (InternalCache) new CacheFactory().create();
+
+ RegionFactory<Integer, Object> regionFactory = cache.createRegionFactory();
+ regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+ regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+ regionFactory.<Integer, Object>create(PROXY_REGION_NAME);
+ regionFactory.<Integer, Object>create(CACHING_PROXY_REGION_NAME);
+
+ CacheServer server = cache.addCacheServer();
+ server.setPort(getRandomAvailablePort(SOCKET));
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientCacheWithTwoRegions(final String host, final int port) {
+ createClientCache();
+
+ PoolFactory poolFactory = createPoolFactory();
+ poolFactory.addServer(host, port);
+
+ Pool pool = poolFactory.create(getClass().getSimpleName() + "-Pool");
+
+ createRegionOnClient(PROXY_REGION_NAME, ClientRegionShortcut.PROXY, pool);
+ Region<Integer, Object> region2 =
+ createRegionOnClient(CACHING_PROXY_REGION_NAME, ClientRegionShortcut.CACHING_PROXY, pool);
+
+ region2.getAttributesMutator().setCloningEnabled(true);
+ assertThat(region2.getAttributes().getCloningEnabled()).isTrue();
+ }
+
+ private void createClientCache() {
+ clientCache = (InternalClientCache) new ClientCacheFactory().create();
+ assertThat(clientCache.isClient()).isTrue();
+ }
+
+ private PoolFactory createPoolFactory() {
+ return PoolManager.createFactory().setThreadLocalConnections(true).setMinConnections(3)
+ .setSubscriptionEnabled(true).setSubscriptionRedundancy(0).setReadTimeout(10000)
+ .setSocketBufferSize(32768);
+ }
+
+ private Region<Integer, Object> createRegionOnClient(final String regionName,
+ final ClientRegionShortcut shortcut, final Pool pool) {
+ ClientRegionFactory<Integer, Object> regionFactory =
+ clientCache.createClientRegionFactory(shortcut);
+ regionFactory.setPoolName(pool.getName());
+ Region<Integer, Object> region = regionFactory.create(regionName);
+ assertThat(region.getAttributes().getCloningEnabled()).isFalse();
+ return region;
+ }
+
+ private CacheClientProxy getClientProxy() {
+ CacheClientNotifier notifier = getCacheServer().getAcceptor().getCacheClientNotifier();
+ return notifier.getClientProxies().stream().findFirst().orElse(null);
+ }
+
+ private InternalCacheServer getCacheServer() {
+ return (InternalCacheServer) cache.getCacheServers().iterator().next();
+ }
+
+ /**
+ * Register single key on given region
+ */
+ private void registerKey(final String regionName, final int key) {
+ Region<Integer, Object> region = clientCache.getRegion(regionName);
+ region.registerInterest(key);
+ }
+
+ /**
+ * Register list of keys on given region
+ */
+ private void registerKeys(final String regionName, final int... keys) {
+ Region region = clientCache.getRegion(regionName);
+
+ List<Integer> list = new ArrayList<>();
+ for (int key : keys) {
+ list.add(key);
+ }
+
+ region.registerInterest(list);
+ }
+
+ private void registerRegex(final String regionName, final String regex) {
+ Region<Integer, Object> region = clientCache.getRegion(regionName);
+ region.registerInterestRegex(regex);
+ }
+
+ private void awaitServerMetaDataToContainClient() {
+ await().atMost(30, SECONDS)
+ .until(() -> assertThat(
+ getCacheServer().getAcceptor().getCacheClientNotifier().getClientProxies().size())
+ .isEqualTo(1));
+
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy).isNotNull();
+
+ await().atMost(30, SECONDS).until(() -> getClientProxy().isAlive() && getClientProxy()
+ .getRegionsWithEmptyDataPolicy().containsKey(Region.SEPARATOR + PROXY_REGION_NAME));
+ }
+
+ private void validateServerMetaDataKnowsThatClientRegisteredInterest() {
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy.hasRegisteredInterested()).isTrue();
+ }
+
+ private void validateServerMetaDataKnowsWhichClientRegionIsEmpty() {
+ CacheClientProxy proxy = getClientProxy();
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .containsKey(Region.SEPARATOR + PROXY_REGION_NAME);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .doesNotContainKey(Region.SEPARATOR + CACHING_PROXY_REGION_NAME);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy()).hasSize(1);
+ assertThat(proxy.getRegionsWithEmptyDataPolicy())
+ .containsEntry(Region.SEPARATOR + PROXY_REGION_NAME, 0);
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.