You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/22 20:39:32 UTC
[geode] branch develop updated: GEODE-3080 Add a
multiple-connections-per-thread test for new protocol.
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f8740c8 GEODE-3080 Add a multiple-connections-per-thread test for new protocol.
f8740c8 is described below
commit f8740c88b5b04231417c63f3fd9ab9c9594c4538
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Fri Sep 8 09:57:27 2017 -0700
GEODE-3080 Add a multiple-connections-per-thread test for new protocol.
Signed-off-by: Hitesh Khamesra <hk...@pivotal.io>
---
.../acceptance/CacheMaxConnectionJUnitTest.java | 146 ++++++++++++++++-----
1 file changed, 113 insertions(+), 33 deletions(-)
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
index 3c81608..8f95573 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/CacheMaxConnectionJUnitTest.java
@@ -16,12 +16,20 @@
package org.apache.geode.protocol.acceptance;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
-import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
@@ -31,7 +39,6 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.apache.geode.cache.Cache;
@@ -44,8 +51,12 @@ import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.MessageUtil;
+import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
-import org.apache.geode.serialization.SerializationService;
+import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.test.junit.categories.IntegrationTest;
/**
@@ -53,6 +64,9 @@ import org.apache.geode.test.junit.categories.IntegrationTest;
*/
@Category(IntegrationTest.class)
public class CacheMaxConnectionJUnitTest {
+ private static final String TEST_KEY = "testKey";
+ private static final String TEST_VALUE = "testValue";
+ private static final int TEST_PUT_CORRELATION_ID = 12355;
private final String TEST_REGION = "testRegion";
@@ -66,6 +80,8 @@ public class CacheMaxConnectionJUnitTest {
@Rule
public TestName testName = new TestName();
+ private ProtobufSerializationService serializationService;
+ private ProtobufProtocolSerializer protobufProtocolSerializer;
@Before
public void setup() throws Exception {
@@ -90,6 +106,9 @@ public class CacheMaxConnectionJUnitTest {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
outputStream.write(110);
+
+ serializationService = new ProtobufSerializationService();
+ protobufProtocolSerializer = new ProtobufProtocolSerializer();
}
@After
@@ -100,60 +119,121 @@ public class CacheMaxConnectionJUnitTest {
}
@Test
- public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException {
- cache.getDistributedSystem().disconnect();
+ public void testNewProtocolRespectsMaxConnectionLimit_notSelector() throws Exception {
+ testNewProtocolRespectsMaxConnectionLimit(0, false);
+ }
- CacheFactory cacheFactory = new CacheFactory();
- cacheFactory.set(ConfigurationProperties.LOCATORS, "");
- cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
- cache = cacheFactory.create();
+ @Test
+ public void testNewProtocolRespectsMaxConnectionLimit_isSelector() throws Exception {
+ testNewProtocolRespectsMaxConnectionLimit(4, true);
+ }
+
+ private void testNewProtocolRespectsMaxConnectionLimit(int threads, boolean isSelector)
+ throws Exception {
+ final int connections = 16;
+
+ List<CacheServer> cacheServers = cache.getCacheServers();
+ assertEquals(1, cacheServers.size());
+ cacheServers.get(0).stop();
CacheServer cacheServer = cache.addCacheServer();
final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
cacheServer.setPort(cacheServerPort);
- cacheServer.setMaxConnections(16);
- cacheServer.setMaxThreads(16);
+ cacheServer.setMaxConnections(connections);
+ cacheServer.setMaxThreads(threads);
cacheServer.start();
AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
- // Start 16 sockets, which is exactly the maximum that the server will support.
- Socket[] sockets = new Socket[16];
- for (int i = 0; i < 16; i++) {
- Socket socket = new Socket("localhost", cacheServerPort);
- sockets[i] = socket;
- Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
- socket.getOutputStream()
- .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ if (isSelector) {
+ assertTrue(acceptor.isSelector());
+ } else {
+ assertFalse(acceptor.isSelector());
}
- // try to start a new socket, expecting it to be disconnected.
- try (Socket socket = new Socket("localhost", cacheServerPort)) {
- Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
- socket.getOutputStream()
- .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
- assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
- }
+ validateSocketCreationAndDestruction(cacheServerPort, connections);
- for (Socket currentSocket : sockets) {
- currentSocket.close();
- }
+ // Once all connections are closed, the acceptor should have a connection count of 0.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> acceptor.getClientServerCnxCount() == 0);
+
+ // do it again, just to be sure there's no leak somewhere else.
+ validateSocketCreationAndDestruction(cacheServerPort, connections);
// Once all connections are closed, the acceptor should have a connection count of 0.
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> acceptor.getClientServerCnxCount() == 0);
- // Try to start 16 new connections, again at the limit.
- for (int i = 0; i < 16; i++) {
- Socket socket = new Socket("localhost", cacheServerPort);
- sockets[i] = socket;
+ }
+
+ // Start exactly as many that the server will support, check that they work.
+ // test that creating one more causes it to be disconnected.
+ // Close all the sockets when we're done.
+ private void validateSocketCreationAndDestruction(int cacheServerPort, int connections)
+ throws Exception {
+ final Socket[] sockets = new Socket[connections];
+
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+
+ // Used to assert the exception is non-null.
+ ArrayList<Callable<Exception>> callables = new ArrayList<>();
+
+ for (int i = 0; i < connections; i++) {
+ final int j = i;
+ callables.add(() -> {
+ try {
+ Socket socket = new Socket("localhost", cacheServerPort);
+ sockets[j] = socket;
+
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+ OutputStream outputStream = socket.getOutputStream();
+ outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+
+ ClientProtocol.Message putMessage =
+ MessageUtil.makePutRequestMessage(serializationService, TEST_KEY, TEST_VALUE,
+ TEST_REGION, ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID));
+ protobufProtocolSerializer.serialize(putMessage, outputStream);
+ validatePutResponse(socket, protobufProtocolSerializer);
+ } catch (Exception e) {
+ return e;
+ }
+ return null;
+ });
+ }
+ List<Future<Exception>> futures = executor.invokeAll(callables);
+
+ for (Future<Exception> f : futures) {
+ assertNull(f.get());
+ }
+
+ // try to start a new socket, expecting it to be disconnected.
+ try (Socket socket = new Socket("localhost", cacheServerPort)) {
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
socket.getOutputStream()
.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
}
for (Socket currentSocket : sockets) {
currentSocket.close();
}
}
+
+ private void validatePutResponse(Socket socket,
+ ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception {
+ ClientProtocol.Response response =
+ deserializeResponse(socket, protobufProtocolSerializer, TEST_PUT_CORRELATION_ID);
+ assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE,
+ response.getResponseAPICase());
+ }
+
+ private ClientProtocol.Response deserializeResponse(Socket socket,
+ ProtobufProtocolSerializer protobufProtocolSerializer, int expectedCorrelationId)
+ throws InvalidProtocolMessageException, IOException {
+ ClientProtocol.Message message =
+ protobufProtocolSerializer.deserialize(socket.getInputStream());
+ assertEquals(expectedCorrelationId, message.getMessageHeader().getCorrelationId());
+ assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
+ return message.getResponse();
+ }
}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].