You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/07 17:28:21 UTC

[geode] branch develop updated: GEODE-3555: proper new client protocol closure with more than max conns.

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b92b9a9  GEODE-3555: proper new client protocol closure with more than max conns.
b92b9a9 is described below

commit b92b9a9b0dcaa7f2095d43c02cd9e8141fe45fc8
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Sep 1 14:37:36 2017 -0700

    GEODE-3555: proper new client protocol closure with more than max conns.
    
    Don't send an old client message when closing new client connections.
    
    This test also tests GEODE-3077.
    
    This closes #756
    
    Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
---
 .../internal/cache/tier/CommunicationMode.java     |  8 +++
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 14 +++--
 .../RoundTripCacheConnectionJUnitTest.java         | 69 ++++++++++++++++++++--
 3 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
index 9abf2f3..053a556 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
@@ -101,6 +101,14 @@ public enum CommunicationMode {
   }
 
   /**
+   * Non-protobuf (legacy) protocols expect a refusal message before a connection is closed. That
+   * message is unintelligible to protobuf protocol clients, so it must not be sent to them.
+   */
+  public boolean expectsConnectionRefusalMessage() {
+    return this != ProtobufClientServerProtocol;
+  }
+
+  /**
    * The modeNumber is the byte written on-wire that indicates this connection mode
    */
   private byte modeNumber;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index fa36fab..f9bc596 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1471,12 +1471,14 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
             new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
-        try {
-          ServerHandShakeProcessor.refuse(socket.getOutputStream(),
-              LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
-                  .toLocalizedString(Integer.valueOf(this.maxConnections)));
-        } catch (Exception ex) {
-          logger.debug("rejection message failed", ex);
+        if (mode.expectsConnectionRefusalMessage()) {
+          try {
+            ServerHandShakeProcessor.refuse(socket.getOutputStream(),
+                LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
+                    .toLocalizedString(Integer.valueOf(this.maxConnections)));
+          } catch (Exception ex) {
+            logger.debug("rejection message failed", ex);
+          }
         }
         closeSocket(socket);
         return;
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index 9819a4d..5a46d4d 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -56,15 +56,16 @@ import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.SSLConfig;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
@@ -144,7 +145,7 @@ public class RoundTripCacheConnectionJUnitTest {
     }
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
     serializationService = new ProtobufSerializationService();
   }
@@ -178,7 +179,7 @@ public class RoundTripCacheConnectionJUnitTest {
     Socket socket = new Socket("localhost", cacheServerPort);
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
     Set<BasicTypes.Entry> putEntries = new HashSet<>();
@@ -220,7 +221,7 @@ public class RoundTripCacheConnectionJUnitTest {
     Socket socket = new Socket("localhost", cacheServerPort);
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
     Set<BasicTypes.Entry> putEntries = new HashSet<>();
@@ -290,6 +291,62 @@ public class RoundTripCacheConnectionJUnitTest {
   }
 
   @Test
+  public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException {
+    cache.close();
+
+    CacheFactory cacheFactory = new CacheFactory();
+    Cache cache = cacheFactory.create();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.setMaxConnections(16);
+    cacheServer.setMaxThreads(16);
+    cacheServer.start();
+
+    AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+
+    // Start 16 sockets, which is exactly the maximum that the server will support.
+    Socket[] sockets = new Socket[16];
+    for (int i = 0; i < 16; i++) {
+      Socket socket = new Socket("localhost", cacheServerPort);
+      sockets[i] = socket;
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    }
+
+    // Try to open one more socket beyond the limit, expecting the server to disconnect it.
+    try (Socket socket = new Socket("localhost", cacheServerPort)) {
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+      assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
+    }
+
+    for (Socket currentSocket : sockets) {
+      currentSocket.close();
+    }
+
+    // Once all connections are closed, the acceptor should have a connection count of 0.
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> acceptor.getClientServerCnxCount() == 0);
+
+    // Try to start 16 new connections, again at the limit.
+    for (int i = 0; i < 16; i++) {
+      Socket socket = new Socket("localhost", cacheServerPort);
+      sockets[i] = socket;
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    }
+
+    for (Socket currentSocket : sockets) {
+      currentSocket.close();
+    }
+  }
+
+  @Test
   public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception {
     int correlationId = TEST_GET_CORRELATION_ID; // reuse this value for this test
 
@@ -316,7 +373,7 @@ public class RoundTripCacheConnectionJUnitTest {
     Socket socket = new Socket("localhost", cacheServerPort);
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     OutputStream outputStream = socket.getOutputStream();
-    outputStream.write(110);
+    outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
 
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
     ClientProtocol.Message getRegionMessage = MessageUtil.makeGetRegionRequestMessage(TEST_REGION,

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.