You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/22 20:39:32 UTC

[geode] branch develop updated: GEODE-3080 Add a multiple-connections-per-thread test for new protocol.

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f8740c8  GEODE-3080 Add a multiple-connections-per-thread test for new protocol.
f8740c8 is described below

commit f8740c88b5b04231417c63f3fd9ab9c9594c4538
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Fri Sep 8 09:57:27 2017 -0700

    GEODE-3080 Add a multiple-connections-per-thread test for new protocol.
    
    Signed-off-by: Hitesh Khamesra <hk...@pivotal.io>
---
 .../acceptance/CacheMaxConnectionJUnitTest.java    | 146 ++++++++++++++++-----
 1 file changed, 113 insertions(+), 33 deletions(-)

diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
index 3c81608..8f95573 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
@@ -16,12 +16,20 @@
 package org.apache.geode.protocol.acceptance;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.awaitility.Awaitility;
@@ -31,7 +39,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import org.apache.geode.cache.Cache;
@@ -44,8 +51,12 @@ import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.MessageUtil;
+import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
-import org.apache.geode.serialization.SerializationService;
+import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 /**
@@ -53,6 +64,9 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
  */
 @Category(IntegrationTest.class)
 public class CacheMaxConnectionJUnitTest {
+  private static final String TEST_KEY = "testKey";
+  private static final String TEST_VALUE = "testValue";
+  private static final int TEST_PUT_CORRELATION_ID = 12355;
   private final String TEST_REGION = "testRegion";
 
 
@@ -66,6 +80,8 @@ public class CacheMaxConnectionJUnitTest {
 
   @Rule
   public TestName testName = new TestName();
+  private ProtobufSerializationService serializationService;
+  private ProtobufProtocolSerializer protobufProtocolSerializer;
 
   @Before
   public void setup() throws Exception {
@@ -90,6 +106,9 @@ public class CacheMaxConnectionJUnitTest {
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
     outputStream = socket.getOutputStream();
     outputStream.write(110);
+
+    serializationService = new ProtobufSerializationService();
+    protobufProtocolSerializer = new ProtobufProtocolSerializer();
   }
 
   @After
@@ -100,60 +119,121 @@ public class CacheMaxConnectionJUnitTest {
   }
 
   @Test
-  public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException {
-    cache.getDistributedSystem().disconnect();
+  public void testNewProtocolRespectsMaxConnectionLimit_notSelector() throws Exception {
+    testNewProtocolRespectsMaxConnectionLimit(0, false);
+  }
 
-    CacheFactory cacheFactory = new CacheFactory();
-    cacheFactory.set(ConfigurationProperties.LOCATORS, "");
-    cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
-    cache = cacheFactory.create();
+  @Test
+  public void testNewProtocolRespectsMaxConnectionLimit_isSelector() throws Exception {
+    testNewProtocolRespectsMaxConnectionLimit(4, true);
+  }
+
+  private void testNewProtocolRespectsMaxConnectionLimit(int threads, boolean isSelector)
+      throws Exception {
+    final int connections = 16;
+
+    List<CacheServer> cacheServers = cache.getCacheServers();
+    assertEquals(1, cacheServers.size());
+    cacheServers.get(0).stop();
 
     CacheServer cacheServer = cache.addCacheServer();
     final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
     cacheServer.setPort(cacheServerPort);
-    cacheServer.setMaxConnections(16);
-    cacheServer.setMaxThreads(16);
+    cacheServer.setMaxConnections(connections);
+    cacheServer.setMaxThreads(threads);
     cacheServer.start();
 
     AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
 
-    // Start 16 sockets, which is exactly the maximum that the server will support.
-    Socket[] sockets = new Socket[16];
-    for (int i = 0; i < 16; i++) {
-      Socket socket = new Socket("localhost", cacheServerPort);
-      sockets[i] = socket;
-      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-      socket.getOutputStream()
-          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    if (isSelector) {
+      assertTrue(acceptor.isSelector());
+    } else {
+      assertFalse(acceptor.isSelector());
     }
 
-    // try to start a new socket, expecting it to be disconnected.
-    try (Socket socket = new Socket("localhost", cacheServerPort)) {
-      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
-      socket.getOutputStream()
-          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-      assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
-    }
+    validateSocketCreationAndDestruction(cacheServerPort, connections);
 
-    for (Socket currentSocket : sockets) {
-      currentSocket.close();
-    }
+    // Once all connections are closed, the acceptor should have a connection count of 0.
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> acceptor.getClientServerCnxCount() == 0);
+
+    // do it again, just to be sure there's no leak somewhere else.
+    validateSocketCreationAndDestruction(cacheServerPort, connections);
 
     // Once all connections are closed, the acceptor should have a connection count of 0.
     Awaitility.await().atMost(5, TimeUnit.SECONDS)
         .until(() -> acceptor.getClientServerCnxCount() == 0);
 
-    // Try to start 16 new connections, again at the limit.
-    for (int i = 0; i < 16; i++) {
-      Socket socket = new Socket("localhost", cacheServerPort);
-      sockets[i] = socket;
+  }
+
+  // Start exactly as many that the server will support, check that they work.
+  // test that creating one more causes it to be disconnected.
+  // Close all the sockets when we're done.
+  private void validateSocketCreationAndDestruction(int cacheServerPort, int connections)
+      throws Exception {
+    final Socket[] sockets = new Socket[connections];
+
+    ExecutorService executor = Executors.newFixedThreadPool(20);
+
+    // Used to assert the exception is non-null.
+    ArrayList<Callable<Exception>> callables = new ArrayList<>();
+
+    for (int i = 0; i < connections; i++) {
+      final int j = i;
+      callables.add(() -> {
+        try {
+          Socket socket = new Socket("localhost", cacheServerPort);
+          sockets[j] = socket;
+
+          Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+          OutputStream outputStream = socket.getOutputStream();
+          outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+          ClientProtocol.Message putMessage =
+              MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE,
+                  TEST_REGION, ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID));
+          protobufProtocolSerializer.serialize(putMessage, outputStream);
+          validatePutResponse(socket, protobufProtocolSerializer);
+        } catch (Exception e) {
+          return e;
+        }
+        return null;
+      });
+    }
+    List<Future<Exception>> futures = executor.invokeAll(callables);
+
+    for (Future<Exception> f : futures) {
+      assertNull(f.get());
+    }
+
+    // try to start a new socket, expecting it to be disconnected.
+    try (Socket socket = new Socket("localhost", cacheServerPort)) {
       Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
       socket.getOutputStream()
           .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+      assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
     }
 
     for (Socket currentSocket : sockets) {
       currentSocket.close();
     }
   }
+
+  private void validatePutResponse(Socket socket,
+      ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception {
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_PUT_CORRELATION_ID);
+    assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE,
+        response.getResponseAPICase());
+  }
+
+  private ClientProtocol.Response deserializeResponse(Socket socket,
+      ProtobufProtocolSerializer protobufProtocolSerializer, int expectedCorrelationId)
+      throws InvalidProtocolMessageException, IOException {
+    ClientProtocol.Message message =
+        protobufProtocolSerializer.deserialize(socket.getInputStream());
+    assertEquals(expectedCorrelationId, message.getMessageHeader().getCorrelationId());
+    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
+    return message.getResponse();
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].