You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2017/09/07 00:06:10 UTC

[geode] 01/01: GEODE-3082 Integrated GenericProtocolServerConnection with ClientHealthMonitor.

This is an automated email from the ASF dual-hosted git repository.

hiteshkhamesra pushed a commit to branch feature/GEODE-3082
in repository https://gitbox.apache.org/repos/asf/geode.git

commit e7ca79659a80134a780b426aedc843551b276a84
Author: Alexander Murmann <am...@pivotal.io>
AuthorDate: Wed Sep 6 17:03:40 2017 -0700

    GEODE-3082 Integrated GenericProtocolServerConnection with ClientHealthMonitor.
    
    1. GenericProtocolServerConnection now creates a ClientProxyMembershipID.
    2. Added a test where the ClientHealthMonitor (CHM) closes the connection.
    3. Added a test where the CHM doesn't close the connection.
    
    Signed-off-by: Hitesh Khamesra <hk...@pivotal.io>
---
 .../membership/InternalDistributedMember.java      |  2 +-
 .../cache/tier/sockets/ClientHealthMonitor.java    |  2 +-
 .../sockets/GenericProtocolServerConnection.java   | 31 ++++++++++++-
 .../cache/tier/sockets/ServerConnection.java       |  2 +-
 .../RoundTripCacheConnectionJUnitTest.java         | 52 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
index e152756..3deb13c 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
@@ -117,7 +117,7 @@ public class InternalDistributedMember implements DistributedMember, Externaliza
    * member for use in the P2P cache. Use of other constructors can break
    * network-partition-detection.
    *
-   * @param i
+   * @param i the inet address
    * @param p the membership port
    * @param splitBrainEnabled whether this feature is enabled for the member
    * @param canBeCoordinator whether the member is eligible to be the membership coordinator
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index 226da8a..c3f9529 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -100,7 +100,7 @@ public class ClientHealthMonitor {
   /**
    * The interval between client monitor iterations
    */
-  final protected static long CLIENT_MONITOR_INTERVAL = 1000;
+  final public static long CLIENT_MONITOR_INTERVAL = 1000;
 
   final private CacheClientNotifierStats stats;
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index 6c81028..f4c4c3e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -15,9 +15,13 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.security.server.Authenticator;
@@ -26,6 +30,8 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 
 /**
@@ -36,6 +42,7 @@ public class GenericProtocolServerConnection extends ServerConnection {
   private final ClientProtocolMessageHandler messageHandler;
   private final SecurityManager securityManager;
   private final Authenticator authenticator;
+  private ClientProxyMembershipID clientProxyMembershipID;
 
   /**
    * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received
@@ -50,6 +57,10 @@ public class GenericProtocolServerConnection extends ServerConnection {
     securityManager = securityService.getSecurityManager();
     this.messageHandler = newClientProtocol;
     this.authenticator = authenticator;
+
+    setClientProxyMembershipId();
+
+    doHandShake(CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 0);
   }
 
   @Override
@@ -72,16 +83,34 @@ public class GenericProtocolServerConnection extends ServerConnection {
       logger.warn(e);
       this.setFlagProcessMessagesAsFalse();
       setClientDisconnectedException(e);
+    } finally {
+      acceptor.getClientHealthMonitor().receivedPing(this.clientProxyMembershipID);
     }
   }
 
+  private void setClientProxyMembershipId() {
+    ServerLocation serverLocation = new ServerLocation(
+        ((InetSocketAddress) this.getSocket().getRemoteSocketAddress()).getHostName(),
+        this.getSocketPort());
+    DistributedMember distributedMember = new InternalDistributedMember(serverLocation);
+    // no handshake for new client protocol.
+    clientProxyMembershipID = new ClientProxyMembershipID(distributedMember);
+  }
+
   @Override
   protected boolean doHandShake(byte epType, int qSize) {
-    // no handshake for new client protocol.
+    getAcceptor().getClientHealthMonitor().registerClient(clientProxyMembershipID);
+    getAcceptor().getClientHealthMonitor().addConnection(clientProxyMembershipID, this);
+
     return true;
   }
 
   @Override
+  protected int getClientReadTimeout() {
+    return 10000;
+  }
+
+  @Override
   public boolean isClientServerConnection() {
     return true;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index b243d8e..3aa6cd6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1265,7 +1265,7 @@ public abstract class ServerConnection implements Runnable {
     this.requestSpecificTimeout = -1;
   }
 
-  int getClientReadTimeout() {
+  protected int getClientReadTimeout() {
     if (this.requestSpecificTimeout == -1)
       return this.handshake.getClientReadTimeout();
     else
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index 9819a4d..21a7db7 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -24,6 +24,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTOR
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -57,6 +58,7 @@ import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.SSLConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientHealthMonitor;
 import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -117,6 +119,7 @@ public class RoundTripCacheConnectionJUnitTest {
     boolean useSSL = testName.getMethodName().startsWith("useSSL_");
 
     Properties properties = new Properties();
+    properties.put("log-level", "debug");
     if (useSSL) {
       updatePropertiesForSSLCache(properties);
     }
@@ -125,13 +128,17 @@ public class RoundTripCacheConnectionJUnitTest {
     cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
     cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
     cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+
+
     cache = cacheFactory.create();
 
     CacheServer cacheServer = cache.addCacheServer();
     cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
     cacheServer.setPort(cacheServerPort);
+    cacheServer.setMaximumTimeBetweenPings(200);
     cacheServer.start();
 
+
     RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
     regionFactory.create(TEST_REGION);
 
@@ -151,9 +158,54 @@ public class RoundTripCacheConnectionJUnitTest {
 
   @After
   public void cleanup() throws IOException {
+    System.out.println("beginning of after");
     cache.close();
     socket.close();
     SocketCreatorFactory.close();
+    System.out.println("done with after");
+  }
+
+  @Test
+  public void testUnresponsiveClientsGetDisconnected() throws Exception {
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+    ClientProtocol.Message putMessage =
+        MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION,
+            ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID));
+
+
+    Runnable runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // send a PUT message
+          protobufProtocolSerializer.serialize(putMessage, outputStream);
+          assertEquals(-1, socket.getInputStream().read());
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+
+    Awaitility.await().atMost(1500, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
+        .pollDelay(ClientHealthMonitor.CLIENT_MONITOR_INTERVAL + 1, TimeUnit.MILLISECONDS)
+        .until(runnable);
+  }
+
+  @Test
+  public void testResponsiveClientsStaysConnected() throws Exception {
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+    ClientProtocol.Message putMessage =
+        MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE, TEST_REGION,
+            ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID));
+
+    int timeout = 1500;
+    int interval = 100;
+    for (int i = 0; i < timeout; i += interval) {
+      // send a PUT message
+      protobufProtocolSerializer.serialize(putMessage, outputStream);
+      assertNotEquals(-1, socket.getInputStream().read());
+      Thread.sleep(interval);
+    }
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.