You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/07 17:28:21 UTC
[geode] branch develop updated: GEODE-3555: proper new client
protocol closure with more than max conns.
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new b92b9a9 GEODE-3555: proper new client protocol closure with more than max conns.
b92b9a9 is described below
commit b92b9a9b0dcaa7f2095d43c02cd9e8141fe45fc8
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Sep 1 14:37:36 2017 -0700
GEODE-3555: proper new client protocol closure with more than max conns.
Don't send an old client message when closing new client connections.
This test also tests GEODE-3077.
This closes #756
Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
---
.../internal/cache/tier/CommunicationMode.java | 8 +++
.../internal/cache/tier/sockets/AcceptorImpl.java | 14 +++--
.../RoundTripCacheConnectionJUnitTest.java | 69 ++++++++++++++++++++--
3 files changed, 79 insertions(+), 12 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
index 9abf2f3..053a556 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/CommunicationMode.java
@@ -101,6 +101,14 @@ public enum CommunicationMode {
}
/**
+ * non-protobuf (legacy) protocols expect a refusal message before a connection is closed. This is
+ * unintelligible to protobuf protocol clients.
+ */
+ public boolean expectsConnectionRefusalMessage() {
+ return this != ProtobufClientServerProtocol;
+ }
+
+ /**
* The modeNumber is the byte written on-wire that indicates this connection mode
*/
private byte modeNumber;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index fa36fab..f9bc596 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1471,12 +1471,14 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
Integer.valueOf(this.maxConnections)}));
- try {
- ServerHandShakeProcessor.refuse(socket.getOutputStream(),
- LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
- .toLocalizedString(Integer.valueOf(this.maxConnections)));
- } catch (Exception ex) {
- logger.debug("rejection message failed", ex);
+ if (mode.expectsConnectionRefusalMessage()) {
+ try {
+ ServerHandShakeProcessor.refuse(socket.getOutputStream(),
+ LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
+ .toLocalizedString(Integer.valueOf(this.maxConnections)));
+ } catch (Exception ex) {
+ logger.debug("rejection message failed", ex);
+ }
}
closeSocket(socket);
return;
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index 9819a4d..5a46d4d 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -56,15 +56,16 @@ import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
@@ -144,7 +145,7 @@ public class RoundTripCacheConnectionJUnitTest {
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
outputStream = socket.getOutputStream();
- outputStream.write(110);
+ outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
serializationService = new ProtobufSerializationService();
}
@@ -178,7 +179,7 @@ public class RoundTripCacheConnectionJUnitTest {
Socket socket = new Socket("localhost", cacheServerPort);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
- outputStream.write(110);
+ outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
Set<BasicTypes.Entry> putEntries = new HashSet<>();
@@ -220,7 +221,7 @@ public class RoundTripCacheConnectionJUnitTest {
Socket socket = new Socket("localhost", cacheServerPort);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
- outputStream.write(110);
+ outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
Set<BasicTypes.Entry> putEntries = new HashSet<>();
@@ -290,6 +291,62 @@ public class RoundTripCacheConnectionJUnitTest {
}
@Test
+ public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException {
+ cache.close();
+
+ CacheFactory cacheFactory = new CacheFactory();
+ Cache cache = cacheFactory.create();
+
+ CacheServer cacheServer = cache.addCacheServer();
+ final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+ cacheServer.setPort(cacheServerPort);
+ cacheServer.setMaxConnections(16);
+ cacheServer.setMaxThreads(16);
+ cacheServer.start();
+
+ AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+
+ // Start 16 sockets, which is exactly the maximum that the server will support.
+ Socket[] sockets = new Socket[16];
+ for (int i = 0; i < 16; i++) {
+ Socket socket = new Socket("localhost", cacheServerPort);
+ sockets[i] = socket;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+ socket.getOutputStream()
+ .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ }
+
+ // try to start a new socket, expecting it to be disconnected.
+ try (Socket socket = new Socket("localhost", cacheServerPort)) {
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+ socket.getOutputStream()
+ .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
+ }
+
+ for (Socket currentSocket : sockets) {
+ currentSocket.close();
+ }
+
+ // Once all connections are closed, the acceptor should have a connection count of 0.
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> acceptor.getClientServerCnxCount() == 0);
+
+ // Try to start 16 new connections, again at the limit.
+ for (int i = 0; i < 16; i++) {
+ Socket socket = new Socket("localhost", cacheServerPort);
+ sockets[i] = socket;
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+ socket.getOutputStream()
+ .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ }
+
+ for (Socket currentSocket : sockets) {
+ currentSocket.close();
+ }
+ }
+
+ @Test
public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception {
int correlationId = TEST_GET_CORRELATION_ID; // reuse this value for this test
@@ -316,7 +373,7 @@ public class RoundTripCacheConnectionJUnitTest {
Socket socket = new Socket("localhost", cacheServerPort);
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
OutputStream outputStream = socket.getOutputStream();
- outputStream.write(110);
+ outputStream.write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
ClientProtocol.Message getRegionMessage = MessageUtil.makeGetRegionRequestMessage(TEST_REGION,
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].