You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2016/05/03 07:31:06 UTC

incubator-geode git commit: GEODE-1183: keep only one proxy if there're 3 cache servers on one JVM

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 1aa08cd54 -> 51e4e71ef


GEODE-1183: keep only one proxy if there're 3 cache servers on one JVM

Current API allows us to create 2 cache servers on the same JVM, then the client
will try to create 2 queues to that JVM, one secondary and one primary.
But the proxy is actually the same (since there's only one client), so the
CCN keeps destroying and recreating the proxy.

To fix, we will keep the first proxy and reject the duplicate creating.
Then the secondary proxy will automatically become primary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/51e4e71e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/51e4e71e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/51e4e71e

Branch: refs/heads/develop
Commit: 51e4e71ef1ffb2fddb3ade42e0ad46fe40886239
Parents: 1aa08cd
Author: zhouxh <gz...@pivotal.io>
Authored: Sun Apr 24 22:51:07 2016 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Mon May 2 22:08:58 2016 -0700

----------------------------------------------------------------------
 .../cache/tier/sockets/CacheClientNotifier.java |  36 +++--
 .../cache/wan/CacheClientNotifierDUnitTest.java | 106 ++++++++++---
 .../cache/wan/Simple2CacheServerDUnitTest.java  | 157 +++++++++++++++++++
 3 files changed, 260 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51e4e71e/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
index 1ba2294..80d05ba 100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -505,24 +505,32 @@ public class CacheClientNotifier {
       }
     } else {
       CacheClientProxy staleClientProxy = this.getClientProxy(proxyId);
+      boolean toCreateNewProxy = true;
       if (staleClientProxy != null) {
-        // A proxy exists for this non-durable client. It must be closed.
-        if (logger.isDebugEnabled()) {
-          logger.debug("CacheClientNotifier: A proxy exists for this non-durable client. It must be closed.");
-        }
-        if (staleClientProxy.startRemoval()) {
-          staleClientProxy.waitRemoval();
-        }
-        else {
-          staleClientProxy.close(false, false); // do not check for queue, just close it
-          removeClientProxy(staleClientProxy); // remove old proxy from proxy set
+        if (staleClientProxy.isConnected() && staleClientProxy.getSocket().isConnected()) {
+          successful = false;
+          toCreateNewProxy = false;
+        } else {
+          // A proxy exists for this non-durable client. It must be closed.
+          if (logger.isDebugEnabled()) {
+            logger.debug("CacheClientNotifier: A proxy exists for this non-durable client. It must be closed.");
+          }
+          if (staleClientProxy.startRemoval()) {
+            staleClientProxy.waitRemoval();
+          }
+          else {
+            staleClientProxy.close(false, false); // do not check for queue, just close it
+            removeClientProxy(staleClientProxy); // remove old proxy from proxy set
+          }
         }
       } // non-null stale proxy
 
-      // Create the new proxy for this non-durable client
-      l_proxy = new CacheClientProxy(this, socket, proxyId,
-          isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
-      successful = this.initializeProxy(l_proxy);
+      if (toCreateNewProxy) {
+        // Create the new proxy for this non-durable client
+        l_proxy = new CacheClientProxy(this, socket, proxyId,
+            isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription);
+        successful = this.initializeProxy(l_proxy);
+      }
     }
 
     if (!successful){

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51e4e71e/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
index 8bf819c..0b1cd11 100755
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -18,25 +18,40 @@ package com.gemstone.gemfire.internal.cache.wan;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 
 import org.junit.experimental.categories.Category;
 
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.DiskStore;
 import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAttributes;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
 import com.gemstone.gemfire.internal.AvailablePort;
 import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
 import com.gemstone.gemfire.internal.cache.UserSpecifiedRegionAttributes;
 import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
 import com.gemstone.gemfire.internal.cache.ha.HAContainerWrapper;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
 import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
 import com.gemstone.gemfire.test.dunit.SerializableRunnable;
 import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.dunit.Wait;
@@ -121,24 +136,17 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
     vm.invoke(checkCacheServer);
   }
 
-  private void closeCacheServer(VM vm, final int serverPort) {
-    SerializableRunnable stopCacheServer = new SerializableRunnable() {
-
-      @Override
-      public void run() throws Exception {
-        List<CacheServer> cacheServers = cache.getCacheServers();
-        CacheServerImpl server = null;
-        for (CacheServer cs:cacheServers) {
-          if (cs.getPort() == serverPort) {
-            server = (CacheServerImpl)cs;
-            break;
-          }
-        }
-        assertNotNull(server);
-        server.stop();
+  public static void closeACacheServer(final int serverPort) {
+    List<CacheServer> cacheServers = cache.getCacheServers();
+    CacheServerImpl server = null;
+    for (CacheServer cs:cacheServers) {
+      if (cs.getPort() == serverPort) {
+        server = (CacheServerImpl)cs;
+        break;
       }
-    };
-    vm.invoke(stopCacheServer);
+    }
+    assertNotNull(server);
+    server.stop();
   }
 
   private void verifyRegionSize(VM vm, final int expect) {
@@ -165,8 +173,12 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
    * The test will start several cache servers, including gateway receivers.
    * Shutdown them and verify the CacheClientNofifier for each server is correct
    */
-  @Category(FlakyTest.class) // GEODE-1183: random ports, failure to start threads, eats exceptions, time sensitive
-  public void testMultipleCacheServer() throws Exception {
+  // GEODE-1183: random ports, failure to start threads, eats exceptions, time sensitive
+  public void testNormalClient2MultipleCacheServer() throws Exception {
+    doMultipleCacheServer(false);
+  }
+
+  public void doMultipleCacheServer(boolean durable) throws Exception {
     /* test senario: */
     /* create 1 GatewaySender on vm0 */
     /* create 1 GatewayReceiver on vm1 */
@@ -199,8 +211,8 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
     checkCacheServer(vm1, serverPort2, true, 3);
     LogService.getLogger().info("receiverPort="+receiverPort+",serverPort="+serverPort+",serverPort2="+serverPort2);
     
-    vm2.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" ));
-    vm3.invoke(() -> WANTestBase.createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR" ));
+    vm2.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "123", durable));
+    vm3.invoke(() -> createClientWithLocator(nyPort, "localhost", getTestMethodName() + "_PR", "124", durable));
 
     vm0.invoke(() -> WANTestBase.createCache( lnPort ));
     vm0.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 400, false, false, null, true ));
@@ -211,19 +223,63 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
     /* verify */
     verifyRegionSize(vm0, NUM_KEYS);
     verifyRegionSize(vm1, NUM_KEYS);
-    verifyRegionSize(vm2, NUM_KEYS);
     verifyRegionSize(vm3, NUM_KEYS);
+    verifyRegionSize(vm2, NUM_KEYS);
 
     // close a cache server, then re-test
-    closeCacheServer(vm1, serverPort2);
+    vm1.invoke(() -> closeACacheServer(serverPort2));
 
     vm0.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS*2 ));
 
     /* verify */
     verifyRegionSize(vm0, NUM_KEYS*2);
     verifyRegionSize(vm1, NUM_KEYS*2);
-    verifyRegionSize(vm2, NUM_KEYS*2);
     verifyRegionSize(vm3, NUM_KEYS*2);
+    verifyRegionSize(vm2, NUM_KEYS*2);
+    
+    disconnectAllFromDS();
+  }
+
+  public static void createClientWithLocator(int port0,String host,
+      String regionName, String clientId, boolean isDurable) {
+    WANTestBase test = new WANTestBase(getTestMethodName());
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty("mcast-port", "0");
+    props.setProperty("locators", "");
+    if (isDurable) {
+      props.setProperty("durable-client-id", clientId);
+      props.setProperty("durable-client-timeout", "" + 200);
+    }
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+
+    assertNotNull(cache);
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    Pool p;
+    try {
+      p = PoolManager.createFactory().addLocator(host, port0)
+          .setPingInterval(250).setSubscriptionEnabled(true)
+          .setSubscriptionRedundancy(-1).setReadTimeout(2000)
+          .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
+          .setRetryAttempts(3).create(regionName);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.setPoolName(p.getName());
+    factory.setDataPolicy(DataPolicy.NORMAL);
+    RegionAttributes attrs = factory.create();
+    region = cache.createRegion(regionName, attrs);
+    region.registerInterest("ALL_KEYS");
+    assertNotNull(region);
+    if (isDurable) {
+      cache.readyForEvents();
+    }
+    LogWriterUtils.getLogWriter().info(
+        "Distributed Region " + regionName + " created Successfully :"
+            + region.toString() + " in a "+(isDurable?"durable":"")+" client");
   }
 
-}
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51e4e71e/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/Simple2CacheServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/Simple2CacheServerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/Simple2CacheServerDUnitTest.java
new file mode 100755
index 0000000..684660b
--- /dev/null
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/Simple2CacheServerDUnitTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.util.Iterator;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+public class Simple2CacheServerDUnitTest extends WANTestBase {
+  private static final int NUM_KEYS = 10;
+  static int afterPrimaryCount = 0;
+  static int afterProxyReinitialized = 0;
+  
+  public Simple2CacheServerDUnitTest(String name) {
+    super(name);
+  }
+  
+  // GEODE-1183: random ports, failure to start threads, eats exceptions, time sensitive
+  public void testDurableClient2MultipleCacheServer() throws Exception {
+    doMultipleCacheServer(true);
+  }
+
+  public void testNormalClient2MultipleCacheServer() throws Exception {
+    doMultipleCacheServer(false);
+  }
+  
+  public void doMultipleCacheServer(boolean durable) throws Exception {
+    Integer lnPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    vm1.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm1.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    int serverPort = vm1.invoke(() -> WANTestBase.createCacheServer());
+    int serverPort2 = vm1.invoke(() -> WANTestBase.createCacheServer());
+
+    if (durable) {
+      vm1.invoke(() -> setCacheClientProxyTestHook());
+    } else {
+      vm2.invoke(() -> setClientServerObserver());
+    }
+    vm2.invoke(() -> CacheClientNotifierDUnitTest.createClientWithLocator(lnPort, "localhost", getTestMethodName() + "_PR" , "123", durable));
+
+    vm0.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm0.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    int serverPort3 = vm0.invoke(() -> WANTestBase.createCacheServer());
+    
+    if (durable) {
+      vm1.invoke(() -> checkResultAndUnsetCacheClientProxyTestHook());
+    } else {
+      vm2.invoke(() -> checkResultAndUnsetClientServerObserver());
+    }
+    
+    boolean vm0_proxy = checkProxyIsPrimary(vm0);
+    boolean vm1_proxy = checkProxyIsPrimary(vm1);
+    assertTrue(vm1_proxy || vm0_proxy);
+    
+    // close the current primary cache server, then re-test
+    vm1.invoke(()-> CacheClientNotifierDUnitTest.closeACacheServer(serverPort2));
+    vm0_proxy = checkProxyIsPrimary(vm0);
+    vm1_proxy = checkProxyIsPrimary(vm1);
+    assertTrue(vm1_proxy || vm0_proxy);
+    
+    disconnectAllFromDS();
+  }
+
+  public static void setClientServerObserver()
+  {
+    PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = true;
+    ClientServerObserverHolder
+    .setInstance(new ClientServerObserverAdapter() {
+      public void afterPrimaryIdentificationFromBackup(ServerLocation primaryEndpoint)
+      {
+        LogService.getLogger().info("After primary is set");
+        afterPrimaryCount++;
+      }
+    });
+  }
+
+  public static void checkResultAndUnsetClientServerObserver()
+  {
+    PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+    // setPrimary only happened once
+    assertEquals(1, afterPrimaryCount);
+    afterPrimaryCount = 0;
+  }
+
+  public static void setCacheClientProxyTestHook()
+  {
+    CacheClientProxy.testHook = new CacheClientProxy.TestHook() {
+      @Override
+      public void doTestHook(String spot) {
+        if (spot.equals("CLIENT_RECONNECTED")) {
+          afterProxyReinitialized++;
+        }
+      }
+    };
+  }
+
+  public static void checkResultAndUnsetCacheClientProxyTestHook()
+  {
+    // Reinitialize only happened once
+    CacheClientProxy.testHook = null;
+    assertEquals(1, afterProxyReinitialized);
+    afterProxyReinitialized = 0;
+  }
+  
+  private boolean checkProxyIsPrimary(VM vm) {
+    SerializableCallable checkProxyIsPrimary = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        final CacheClientNotifier ccn = CacheClientNotifier.getInstance();
+        
+        Wait.waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            return ccn.getClientProxies().size() == 1; 
+          }
+          public String description() {
+            return null;
+          }
+        }, 20000, 100, false);
+        assertEquals(1, ccn.getClientProxies().size());
+
+        Iterator iter_prox = ccn.getClientProxies().iterator();
+        assertEquals(1, ccn.getClientProxies().size());
+        CacheClientProxy proxy = (CacheClientProxy)iter_prox.next();
+        return proxy.isPrimary();
+      }
+    };
+    return (Boolean)vm.invoke(checkProxyIsPrimary);
+  }
+}