You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/10/13 17:16:06 UTC

[geode] 01/02: GEODE-3829: add tests for clients with register interest

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9c050addb8059d2b0d5b4de4643579fa2ebb4d06
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 12 14:42:01 2017 -0700

    GEODE-3829: add tests for clients with register interest
    
    * ClientWithInterestFailoverTest -- verifies that registered
    interest carries over when the client fails over to the other server
    
    * RegisterInterestServerMetaDataTest -- verifies that server
    metadata is correct for clients that register interest
---
 .../internal/admin/remote/RemoteBridgeServer.java  |   6 +
 .../geode/internal/cache/AbstractCacheServer.java  |   2 +-
 .../geode/internal/cache/CacheServerImpl.java      |  20 ++
 .../geode/internal/cache/InternalCacheServer.java  |  23 ++
 .../internal/cache/tier/sockets/AcceptorImpl.java  |   3 +-
 .../cache/tier/sockets/CacheClientProxy.java       |   2 +-
 .../cache/tier/sockets/InternalAcceptor.java       |  20 ++
 .../cache/xmlcache/CacheServerCreation.java        |   5 +
 .../sockets/ClientWithInterestFailoverTest.java    | 280 +++++++++++++++++++++
 .../RegisterInterestServerMetaDataTest.java        | 272 ++++++++++++++++++++
 10 files changed, 630 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
index 832feea..dfae1fe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/remote/RemoteBridgeServer.java
@@ -36,6 +36,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.admin.AdminBridgeServer;
 import org.apache.geode.internal.cache.AbstractCacheServer;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 /**
@@ -224,6 +225,11 @@ public class RemoteBridgeServer extends AbstractCacheServer
     }
   }
 
+  @Override
+  public InternalAcceptor getAcceptor() {
+    throw new UnsupportedOperationException("not implemented on " + getClass().getSimpleName());
+  }
+
   private static class RemoteLoadProbe extends ServerLoadProbeAdapter {
     /** The description of this callback */
     private final String desc;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
index 0a6c24b..93e1194 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractCacheServer.java
@@ -38,7 +38,7 @@ import org.apache.geode.management.membership.ClientMembershipListener;
  *
  * @since GemFire 5.7
  */
-public abstract class AbstractCacheServer implements CacheServer {
+public abstract class AbstractCacheServer implements InternalCacheServer {
 
   public static final String TEST_OVERRIDE_DEFAULT_PORT_PROPERTY =
       DistributionConfig.GEMFIRE_PREFIX + "test.CacheServer.OVERRIDE_DEFAULT_PORT";
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index bcd8b32..0cf7081 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -151,6 +151,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
 
   // //////////////////// Instance Methods ///////////////////
 
+  @Override
   public CancelCriterion getCancelCriterion() {
     return cache.getCancelCriterion();
   }
@@ -272,6 +273,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
   }
 
 
+  @Override
   public ClientSubscriptionConfig getClientSubscriptionConfig() {
     return this.clientSubscriptionConfig;
   }
@@ -422,10 +424,12 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
     }
   }
 
+  @Override
   public boolean isRunning() {
     return this.acceptor != null && this.acceptor.isRunning();
   }
 
+  @Override
   public synchronized void stop() {
     if (!isRunning()) {
       return;
@@ -525,24 +529,29 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * 
    * @return the internal acceptor
    */
+  @Override
   public AcceptorImpl getAcceptor() {
     return this.acceptor;
   }
 
   // DistributionAdvisee methods
 
+  @Override
   public DM getDistributionManager() {
     return getSystem().getDistributionManager();
   }
 
+  @Override
   public ClientSession getClientSession(String durableClientId) {
     return getCacheClientNotifier().getClientProxy(durableClientId);
   }
 
+  @Override
   public ClientSession getClientSession(DistributedMember member) {
     return getCacheClientNotifier().getClientProxy(ClientProxyMembershipID.getClientId(member));
   }
 
+  @Override
   public Set getAllClientSessions() {
     return new HashSet(getCacheClientNotifier().getClientProxies());
   }
@@ -657,6 +666,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * Marker class name to identify the lock more easily in thread dumps private static class
    * ClientMessagesRegionLock extends Object { }
    */
+  @Override
   public DistributionAdvisor getDistributionAdvisor() {
     return this.advisor;
   }
@@ -668,10 +678,12 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
     return this.advisor;
   }
 
+  @Override
   public Profile getProfile() {
     return getDistributionAdvisor().createProfile();
   }
 
+  @Override
   public DistributionAdvisee getParentAdvisee() {
     return null;
   }
@@ -681,14 +693,17 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * 
    * @return the underlying{@code InternalDistributedSystem}
    */
+  @Override
   public InternalDistributedSystem getSystem() {
     return (InternalDistributedSystem) this.cache.getDistributedSystem();
   }
 
+  @Override
   public String getName() {
     return "CacheServer";
   }
 
+  @Override
   public String getFullPath() {
     return getName();
   }
@@ -721,6 +736,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
     return groupList.toArray(groups);
   }
 
+  @Override
   public /* synchronized causes deadlock */ void fillInProfile(Profile profile) {
     assert profile instanceof CacheServerProfile;
     CacheServerProfile bp = (CacheServerProfile) profile;
@@ -734,6 +750,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
     bp.finishInit();
   }
 
+  @Override
   public int getSerialNumber() {
     return this.serialNumber;
   }
@@ -751,6 +768,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * @throws IllegalStateException if the BridgeServer has not been started
    * @since GemFire 5.8Beta
    */
+  @Override
   public void registerInterestRegistrationListener(InterestRegistrationListener listener) {
     if (!this.isRunning()) {
       throw new IllegalStateException(
@@ -767,6 +785,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * 
    * @since GemFire 5.8Beta
    */
+  @Override
   public void unregisterInterestRegistrationListener(InterestRegistrationListener listener) {
     getCacheClientNotifier().unregisterInterestRegistrationListener(listener);
   }
@@ -778,6 +797,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
    * 
    * @since GemFire 5.8Beta
    */
+  @Override
   public Set getInterestRegistrationListeners() {
     return getCacheClientNotifier().getInterestRegistrationListeners();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java
new file mode 100644
index 0000000..e4687c9
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCacheServer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
+
+public interface InternalCacheServer extends CacheServer {
+
+  InternalAcceptor getAcceptor();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 704369e..1b5dfa9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -99,7 +99,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
-public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
+public class AcceptorImpl extends Acceptor implements InternalAcceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
@@ -1736,6 +1736,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
    *
    * @return the instance that provides client notification
    */
+  @Override
   public CacheClientNotifier getCacheClientNotifier() {
     return this.clientNotifier;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 34f232d..a9c1064 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -2994,7 +2994,7 @@ public class CacheClientProxy implements ClientSession {
    *
    * @since GemFire 6.1
    */
-  public Map getRegionsWithEmptyDataPolicy() {
+  public Map<String, Integer> getRegionsWithEmptyDataPolicy() {
     return regionsWithEmptyDataPolicy;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java
new file mode 100644
index 0000000..8b3860f
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InternalAcceptor.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+public interface InternalAcceptor {
+
+  CacheClientNotifier getCacheClientNotifier();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
index 30d6849..0a9c4af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheServerCreation.java
@@ -21,6 +21,7 @@ import org.apache.geode.cache.server.ClientSubscriptionConfig;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.AbstractCacheServer;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.InternalAcceptor;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 import java.io.IOException;
@@ -241,4 +242,8 @@ public class CacheServerCreation extends AbstractCacheServer {
     throw new UnsupportedOperationException("Shouldn't be invoked");
   }
 
+  @Override
+  public InternalAcceptor getAcceptor() {
+    throw new UnsupportedOperationException("Shouldn't be invoked");
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java
new file mode 100644
index 0000000..a655ad5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientWithInterestFailoverTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class ClientWithInterestFailoverTest implements Serializable {
+
+  private static final String PROXY_REGION_NAME = "PROXY_REGION_NAME";
+  private static final String CACHING_PROXY_REGION_NAME = "CACHING_PROXY_REGION_NAME";
+  private static final String REGEX = ".*";
+
+  private static InternalCache cache;
+  private static InternalClientCache clientCache;
+
+  private VM client;
+  private VM server;
+  private VM server2;
+  private int serverPort1;
+  private int serverPort2;
+  private int primaryServerPort;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Before
+  public void setUp() throws Exception {
+    client = getHost(0).getVM(0);
+
+    server = getHost(0).getVM(1);
+    server2 = getHost(0).getVM(2);
+
+    primaryServerPort = givenTwoCacheServers();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    disconnectAllFromDS();
+
+    cache = null;
+    invokeInEveryVM(() -> cache = null);
+
+    clientCache = null;
+    invokeInEveryVM(() -> clientCache = null);
+  }
+
+  @Test
+  public void clientWithSingleKeyInterestFailsOver() throws Exception {
+    client.invoke(() -> registerKey(PROXY_REGION_NAME, 1));
+    client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 1));
+
+    performFailoverTesting();
+  }
+
+  @Test
+  public void clientWithKeyListInterestFailsOver() throws Exception {
+    client.invoke(() -> registerKeys(PROXY_REGION_NAME, 1, 2));
+    client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 1, 2));
+
+    performFailoverTesting();
+  }
+
+  @Test
+  public void clientWithRegexInterestFailsOver() throws Exception {
+    client.invoke(() -> registerRegex(PROXY_REGION_NAME));
+    client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME));
+
+    performFailoverTesting();
+  }
+
+  private void performFailoverTesting() {
+    // arrange
+    VM primaryServerVM = getPrimaryServerVM();
+    VM secondaryServerVM = getSecondaryServerVM();
+    primaryServerVM.invoke(() -> awaitServerMetaDataToContainClient());
+    primaryServerVM.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    primaryServerVM.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+    // act
+    primaryServerVM.invoke(() -> stopCacheServer());
+
+    // assert
+    secondaryServerVM.invoke(() -> awaitServerMetaDataToContainClient());
+    secondaryServerVM.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    secondaryServerVM.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+  }
+
+  private int createServerCache() throws IOException {
+    cache = (InternalCache) new CacheFactory().create();
+
+    RegionFactory<Integer, Object> regionFactory = cache.createRegionFactory();
+    regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+    regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+    regionFactory.<Integer, Object>create(PROXY_REGION_NAME);
+    regionFactory.<Integer, Object>create(CACHING_PROXY_REGION_NAME);
+
+    CacheServer server = cache.addCacheServer();
+    server.setPort(getRandomAvailablePort(SOCKET));
+    server.start();
+    return server.getPort();
+  }
+
+  /**
+   * Creates the client cache and its two regions; returns the pool's primary server port.
+   */
+  private int createClientCacheWithTwoRegions(final String host1, final int port1,
+      final String host2, final int port2) {
+    clientCache = (InternalClientCache) new ClientCacheFactory().create();
+    assertThat(clientCache.isClient()).isTrue();
+
+    PoolFactory poolFactory = createPoolFactory();
+    poolFactory.addServer(host1, port1);
+    poolFactory.addServer(host2, port2);
+
+    Pool pool = poolFactory.create(getClass().getSimpleName() + "-Pool");
+
+    createRegionOnClient(PROXY_REGION_NAME, ClientRegionShortcut.PROXY, pool);
+    createRegionOnClient(CACHING_PROXY_REGION_NAME, ClientRegionShortcut.CACHING_PROXY, pool);
+
+    return ((PoolImpl) pool).getPrimaryPort();
+  }
+
+  private PoolFactory createPoolFactory() {
+    return PoolManager.createFactory().setThreadLocalConnections(true).setMinConnections(3)
+        .setSubscriptionEnabled(true).setSubscriptionRedundancy(0).setReadTimeout(10000)
+        .setSocketBufferSize(32768);
+  }
+
+  private void createRegionOnClient(final String regionName, final ClientRegionShortcut shortcut,
+      final Pool pool) {
+    ClientRegionFactory<Integer, Object> regionFactory =
+        clientCache.createClientRegionFactory(shortcut);
+    regionFactory.setPoolName(pool.getName());
+    Region<Integer, Object> region = regionFactory.create(regionName);
+    assertThat(region.getAttributes().getCloningEnabled()).isFalse();
+  }
+
+  private int givenTwoCacheServers() {
+    serverPort1 = server.invoke(() -> createServerCache());
+    serverPort2 = server2.invoke(() -> createServerCache());
+
+    return client.invoke(() -> createClientCacheWithTwoRegions(getServerHostName(server.getHost()),
+        serverPort1, getServerHostName(server2.getHost()), serverPort2));
+  }
+
+  private VM getPrimaryServerVM() {
+    assertThat(primaryServerPort).isGreaterThan(-1);
+    if (primaryServerPort == serverPort1) {
+      return server;
+    } else {
+      return server2;
+    }
+  }
+
+  private VM getSecondaryServerVM() {
+    assertThat(primaryServerPort).isGreaterThan(-1);
+    if (primaryServerPort == serverPort1) {
+      return server2;
+    } else {
+      return server;
+    }
+  }
+
+  private void registerKey(final String regionName, final int key) {
+    Region<Integer, Object> region = clientCache.getRegion(regionName);
+    region.registerInterest(key);
+  }
+
+  private void registerKeys(final String regionName, final int... keys) {
+    Region region = clientCache.getRegion(regionName);
+
+    List<Integer> list = new ArrayList<>();
+    for (int key : keys) {
+      list.add(key);
+    }
+
+    region.registerInterest(list);
+  }
+
+  private void registerRegex(final String regionName) {
+    Region<Integer, Object> region = clientCache.getRegion(regionName);
+    region.registerInterestRegex(REGEX);
+  }
+
+  private void stopCacheServer() {
+    getCacheServer().stop();
+  }
+
+  private void awaitServerMetaDataToContainClient() {
+    await().atMost(30, SECONDS)
+        .until(() -> assertThat(
+            getCacheServer().getAcceptor().getCacheClientNotifier().getClientProxies().size())
+                .isEqualTo(1));
+
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy).isNotNull();
+
+    await().atMost(30, SECONDS).until(() -> getClientProxy().isAlive() && getClientProxy()
+        .getRegionsWithEmptyDataPolicy().containsKey(Region.SEPARATOR + PROXY_REGION_NAME));
+  }
+
+  private void validateServerMetaDataKnowsThatClientRegisteredInterest() {
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy.hasRegisteredInterested()).isTrue();
+  }
+
+  private void validateServerMetaDataKnowsWhichClientRegionIsEmpty() {
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .containsKey(Region.SEPARATOR + PROXY_REGION_NAME);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .doesNotContainKey(Region.SEPARATOR + CACHING_PROXY_REGION_NAME);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy()).hasSize(1);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .containsEntry(Region.SEPARATOR + PROXY_REGION_NAME, 0);
+  }
+
+  private InternalCacheServer getCacheServer() {
+    return (InternalCacheServer) cache.getCacheServers().iterator().next();
+  }
+
+  private CacheClientProxy getClientProxy() {
+    CacheClientNotifier notifier = getCacheServer().getAcceptor().getCacheClientNotifier();
+    return notifier.getClientProxies().stream().findFirst().orElse(null);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java
new file mode 100644
index 0000000..0511d4b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RegisterInterestServerMetaDataTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.internal.AvailablePort.SOCKET;
+import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
+import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM;
+import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.InternalClientCache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+@SuppressWarnings("serial")
+public class RegisterInterestServerMetaDataTest implements Serializable {
+
+  private static final String PROXY_REGION_NAME = "PROXY_REGION_NAME";
+  private static final String CACHING_PROXY_REGION_NAME = "CACHING_PROXY_REGION_NAME";
+
+  private static InternalCache cache;
+  private static InternalClientCache clientCache;
+
+  private String hostName;
+
+  private VM server;
+  private VM client;
+  private int serverPort1;
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Before
+  public void setUp() throws Exception {
+    server = getHost(0).getVM(0);
+    client = getHost(0).getVM(1);
+
+    hostName = getServerHostName(server.getHost());
+
+    serverPort1 = server.invoke(() -> createServerCache());
+    client.invoke(() -> createClientCacheWithTwoRegions(hostName, serverPort1));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    disconnectAllFromDS();
+
+    cache = null;
+    invokeInEveryVM(() -> cache = null);
+
+    clientCache = null;
+    invokeInEveryVM(() -> clientCache = null);
+  }
+
+  @Test
+  public void registerInterestSingleKeyCanBeInvokedMultipleTimes() throws Exception {
+    client.invoke(() -> registerKey(PROXY_REGION_NAME, 1));
+    client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 1));
+
+    server.invoke(() -> awaitServerMetaDataToContainClient());
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+    // invoke registerKey multiple times
+
+    client.invoke(() -> registerKey(PROXY_REGION_NAME, 2));
+    client.invoke(() -> registerKey(PROXY_REGION_NAME, 3));
+    client.invoke(() -> registerKey(CACHING_PROXY_REGION_NAME, 4));
+
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+  }
+
+  @Test
+  public void registerInterestRegexCanBeInvokedMultipleTimes() throws Exception {
+    client.invoke(() -> registerRegex(PROXY_REGION_NAME, ".*"));
+    client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME, ".*"));
+
+    server.invoke(() -> awaitServerMetaDataToContainClient());
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+    // invoke registerRegex multiple times
+
+    client.invoke(() -> registerRegex(PROXY_REGION_NAME, "8"));
+    client.invoke(() -> registerRegex(PROXY_REGION_NAME, "7"));
+    client.invoke(() -> registerRegex(CACHING_PROXY_REGION_NAME, "9"));
+
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+  }
+
+  @Test
+  public void registerInterestKeyListCanBeInvokedMultipleTimes() throws Exception {
+    client.invoke(() -> registerKeys(PROXY_REGION_NAME, 1, 2));
+    client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 1, 2));
+
+    server.invoke(() -> awaitServerMetaDataToContainClient());
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+
+    // invoke registerKeys multiple times
+
+    client.invoke(() -> registerKeys(PROXY_REGION_NAME, 3, 4));
+    client.invoke(() -> registerKeys(PROXY_REGION_NAME, 5));
+    client.invoke(() -> registerKeys(CACHING_PROXY_REGION_NAME, 3));
+
+    server.invoke(() -> validateServerMetaDataKnowsThatClientRegisteredInterest());
+    server.invoke(() -> validateServerMetaDataKnowsWhichClientRegionIsEmpty());
+  }
+
+  private int createServerCache() throws IOException {
+    cache = (InternalCache) new CacheFactory().create();
+
+    RegionFactory<Integer, Object> regionFactory = cache.createRegionFactory();
+    regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+    regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+    regionFactory.<Integer, Object>create(PROXY_REGION_NAME);
+    regionFactory.<Integer, Object>create(CACHING_PROXY_REGION_NAME);
+
+    CacheServer server = cache.addCacheServer();
+    server.setPort(getRandomAvailablePort(SOCKET));
+    server.start();
+    return server.getPort();
+  }
+
+  private void createClientCacheWithTwoRegions(final String host, final int port) {
+    createClientCache();
+
+    PoolFactory poolFactory = createPoolFactory();
+    poolFactory.addServer(host, port);
+
+    Pool pool = poolFactory.create(getClass().getSimpleName() + "-Pool");
+
+    createRegionOnClient(PROXY_REGION_NAME, ClientRegionShortcut.PROXY, pool);
+    Region<Integer, Object> region2 =
+        createRegionOnClient(CACHING_PROXY_REGION_NAME, ClientRegionShortcut.CACHING_PROXY, pool);
+
+    region2.getAttributesMutator().setCloningEnabled(true);
+    assertThat(region2.getAttributes().getCloningEnabled()).isTrue();
+  }
+
+  private void createClientCache() {
+    clientCache = (InternalClientCache) new ClientCacheFactory().create();
+    assertThat(clientCache.isClient()).isTrue();
+  }
+
+  private PoolFactory createPoolFactory() {
+    return PoolManager.createFactory().setThreadLocalConnections(true).setMinConnections(3)
+        .setSubscriptionEnabled(true).setSubscriptionRedundancy(0).setReadTimeout(10000)
+        .setSocketBufferSize(32768);
+  }
+
+  private Region<Integer, Object> createRegionOnClient(final String regionName,
+      final ClientRegionShortcut shortcut, final Pool pool) {
+    ClientRegionFactory<Integer, Object> regionFactory =
+        clientCache.createClientRegionFactory(shortcut);
+    regionFactory.setPoolName(pool.getName());
+    Region<Integer, Object> region = regionFactory.create(regionName);
+    assertThat(region.getAttributes().getCloningEnabled()).isFalse();
+    return region;
+  }
+
+  private CacheClientProxy getClientProxy() {
+    CacheClientNotifier notifier = getCacheServer().getAcceptor().getCacheClientNotifier();
+    return notifier.getClientProxies().stream().findFirst().orElse(null);
+  }
+
+  private InternalCacheServer getCacheServer() {
+    return (InternalCacheServer) cache.getCacheServers().iterator().next();
+  }
+
+  /**
+   * Register single key on given region
+   */
+  private void registerKey(final String regionName, final int key) {
+    Region<Integer, Object> region = clientCache.getRegion(regionName);
+    region.registerInterest(key);
+  }
+
+  /**
+   * Register list of keys on given region
+   */
+  private void registerKeys(final String regionName, final int... keys) {
+    Region region = clientCache.getRegion(regionName);
+
+    List<Integer> list = new ArrayList<>();
+    for (int key : keys) {
+      list.add(key);
+    }
+
+    region.registerInterest(list);
+  }
+
+  private void registerRegex(final String regionName, final String regex) {
+    Region<Integer, Object> region = clientCache.getRegion(regionName);
+    region.registerInterestRegex(regex);
+  }
+
+  private void awaitServerMetaDataToContainClient() {
+    await().atMost(30, SECONDS)
+        .until(() -> assertThat(
+            getCacheServer().getAcceptor().getCacheClientNotifier().getClientProxies().size())
+                .isEqualTo(1));
+
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy).isNotNull();
+
+    await().atMost(30, SECONDS).until(() -> getClientProxy().isAlive() && getClientProxy()
+        .getRegionsWithEmptyDataPolicy().containsKey(Region.SEPARATOR + PROXY_REGION_NAME));
+  }
+
+  private void validateServerMetaDataKnowsThatClientRegisteredInterest() {
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy.hasRegisteredInterested()).isTrue();
+  }
+
+  private void validateServerMetaDataKnowsWhichClientRegionIsEmpty() {
+    CacheClientProxy proxy = getClientProxy();
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .containsKey(Region.SEPARATOR + PROXY_REGION_NAME);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .doesNotContainKey(Region.SEPARATOR + CACHING_PROXY_REGION_NAME);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy()).hasSize(1);
+    assertThat(proxy.getRegionsWithEmptyDataPolicy())
+        .containsEntry(Region.SEPARATOR + PROXY_REGION_NAME, 0);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.