You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/22 22:52:07 UTC
[geode] 01/03: GEODE-3546: Finish new protocol Locator stats.
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit b974668bd7479c3dabea24ec3daa8c35bcadae8a
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Wed Sep 20 13:35:09 2017 -0700
GEODE-3546: Finish new protocol Locator stats.
- Increment and decrement connection count stats.
- Add stat checking to a locator test.
- Add stats to a second test method.
- Prevent locator state from one test from influencing another test.
Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
---
.../distributed/internal/InternalLocator.java | 4 +-
.../distributed/internal/tcpserver/TcpServer.java | 14 +-
.../acceptance/LocatorConnectionDUnitTest.java | 239 +++++++++++++++++----
3 files changed, 210 insertions(+), 47 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index c4541c3..489e647 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -1335,10 +1335,8 @@ public class InternalLocator extends Locator implements ConnectListener {
try {
this.stats.hookupStats(sys,
SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
- ClientProtocolMessageHandler messageHandler = this.server.getMessageHandler();
+ ClientProtocolMessageHandler messageHandler = this.server.getClientProtocolMessageHandler();
if (messageHandler != null) {
- // GEODE-3546 - this should create locator-specific stats but is creating client/server
- // stats
messageHandler.initializeStatistics("LocatorStats", sys);
}
} catch (UnknownHostException e) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index cf8e477..85b2ace 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -133,7 +133,7 @@ public class TcpServer {
private final PoolStatHelper poolHelper;
private final InternalLocator internalLocator;
private final TcpHandler handler;
- private ClientProtocolMessageHandler messageHandler;
+ private ClientProtocolMessageHandler clientProtocolMessageHandler;
private PooledExecutorWithDMStats executor;
@@ -158,20 +158,20 @@ public class TcpServer {
/**
* returns the message handler used for client/locator communications processing
*/
- public ClientProtocolMessageHandler getMessageHandler() {
- return messageHandler;
+ public ClientProtocolMessageHandler getClientProtocolMessageHandler() {
+ return clientProtocolMessageHandler;
}
public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
- ClientProtocolMessageHandler messageHandler) {
+ ClientProtocolMessageHandler clientProtocolMessageHandler) {
this.port = port;
this.bind_address = bind_address;
this.handler = handler;
this.poolHelper = poolHelper;
this.internalLocator = internalLocator;
- this.messageHandler = messageHandler;
+ this.clientProtocolMessageHandler = clientProtocolMessageHandler;
// register DSFID types first; invoked explicitly so that all message type
// initializations do not happen in first deserialization on a possibly
// "precious" thread
@@ -381,8 +381,10 @@ public class TcpServer {
if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
&& Boolean.getBoolean("geode.feature-protobuf-protocol")) {
- messageHandler.receiveMessage(input, socket.getOutputStream(),
+ clientProtocolMessageHandler.getStatistics().clientConnected();
+ clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(),
new MessageExecutionContext(internalLocator));
+ clientProtocolMessageHandler.getStatistics().clientDisconnected();
} else {
rejectUnknownProtocolConnection(socket, gossipVersion);
}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
index 4e310d2..fe03740 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -17,21 +17,25 @@ package org.apache.geode.protocol.acceptance;
import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.Socket;
import java.util.Properties;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
+import org.apache.geode.Statistics;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.ServerAPI;
@@ -42,7 +46,10 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.categories.DistributedTest;
/*
@@ -51,29 +58,31 @@ import org.apache.geode.test.junit.categories.DistributedTest;
@Category(DistributedTest.class)
public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
- private Socket socket;
-
@Rule
- public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+ public final DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
@Before
public void setup() throws IOException {
- Host host = Host.getHost(0);
- int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
startCacheWithCacheServer();
Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true"));
+ }
- socket = new Socket(host.getHostName(), locatorPort);
+ private Socket createSocket() throws IOException {
+ Host host = Host.getHost(0);
+ int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
+ Socket socket = new Socket(host.getHostName(), locatorPort);
DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
dataOutputStream.writeInt(0);
// Using the constant from AcceptorImpl to ensure that magic byte is the same
dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
+ return socket;
}
+ // Test getAvailableServers twice, validating stats before any messages, after 1, and after 2.
@Test
- public void testEchoProtobufMessageFromLocator()
- throws IOException, InvalidProtocolMessageException {
+ public void testGetAvailableServersWithStats() throws Throwable {
ClientProtocol.Request.Builder protobufRequestBuilder =
ProtobufUtilities.createProtobufRequestBuilder();
ClientProtocol.Message getAvailableServersRequestMessage =
@@ -81,48 +90,202 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
protobufRequestBuilder.setGetAvailableServersRequest(
ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
- ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
- protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
- socket.getOutputStream());
+ try {
+ ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
- ClientProtocol.Message getAvailableServersResponseMessage =
- protobufProtocolSerializer.deserialize(socket.getInputStream());
- assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
- assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
- getAvailableServersResponseMessage.getMessageTypeCase());
- ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
- assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
- messageResponse.getResponseAPICase());
- ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
- messageResponse.getGetAvailableServersResponse();
- assertEquals(1, getAvailableServersResponse.getServersCount());
+ try (Socket socket = createSocket()) {
+ long messagesReceived = getMessagesReceived();
+ long messagesSent = getMessagesSent();
+ int clientConnectionStarts = getClientConnectionStarts();
+ int clientConnectionTerminations = getClientConnectionTerminations();
+
+ protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+ socket.getOutputStream());
+
+ validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream());
+
+ Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats = distributedSystem
+ .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ assertEquals(0, statistics.get("currentClientConnections"));
+ assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+ assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+ assertTrue(statistics.get("bytesReceived").longValue() > 0);
+ assertTrue(statistics.get("bytesSent").longValue() > 0);
+ assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+ assertEquals(clientConnectionTerminations + 1,
+ statistics.get("clientConnectionTerminations"));
+ assertEquals(0L, statistics.get("authorizationViolations"));
+ assertEquals(0L, statistics.get("authenticationFailures"));
+ });
+ }
+
+ try (Socket socket = createSocket()) {
+ long messagesReceived = getMessagesReceived();
+ long messagesSent = getMessagesSent();
+ int clientConnectionStarts = getClientConnectionStarts();
+ int clientConnectionTerminations = getClientConnectionTerminations();
+
+ protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+ socket.getOutputStream());
+
+ validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream());
+
+ Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats = distributedSystem
+ .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ assertEquals(0, statistics.get("currentClientConnections"));
+ assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+ assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+ assertTrue(statistics.get("bytesReceived").longValue() > 0);
+ assertTrue(statistics.get("bytesSent").longValue() > 0);
+ assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+ assertEquals(clientConnectionTerminations + 1,
+ statistics.get("clientConnectionTerminations"));
+ assertEquals(0L, statistics.get("authorizationViolations"));
+ assertEquals(0L, statistics.get("authenticationFailures"));
+ });
+ }
+ } catch (RMIException e) {
+ throw e.getCause(); // so that assertions propagate properly.
+ }
}
@Test
public void testInvalidOperationReturnsFailure()
throws IOException, InvalidProtocolMessageException {
- ClientProtocol.Request.Builder protobufRequestBuilder =
- ProtobufUtilities.createProtobufRequestBuilder();
- ClientProtocol.Message getAvailableServersRequestMessage =
- ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445),
- protobufRequestBuilder
- .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest())
- .build());
+ IgnoredException ignoredInvalidExecutionContext =
+ IgnoredException.addIgnoredException("Invalid execution context");
+ try (Socket socket = createSocket()) {
+
+ ClientProtocol.Request.Builder protobufRequestBuilder =
+ ProtobufUtilities.createProtobufRequestBuilder();
+ ClientProtocol.Message getRegionNamesRequestMessage =
+ ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445),
+ protobufRequestBuilder
+ .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest())
+ .build());
+
+ long messagesReceived = getMessagesReceived();
+ long messagesSent = getMessagesSent();
+ int clientConnectionStarts = getClientConnectionStarts();
+ int clientConnectionTerminations = getClientConnectionTerminations();
+
+ ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+ protobufProtocolSerializer.serialize(getRegionNamesRequestMessage, socket.getOutputStream());
+
+ ClientProtocol.Message getAvailableServersResponseMessage =
+ protobufProtocolSerializer.deserialize(socket.getInputStream());
+ assertEquals(1233445,
+ getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
+ assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
+ getAvailableServersResponseMessage.getMessageTypeCase());
+ ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
+ assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE,
+ messageResponse.getResponseAPICase());
+ assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+ messageResponse.getErrorResponse().getError().getErrorCode());
+
+ Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats = distributedSystem
+ .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ assertEquals(0, statistics.get("currentClientConnections"));
+ assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+ assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+ assertTrue(statistics.get("bytesReceived").longValue() > 0);
+ assertTrue(statistics.get("bytesSent").longValue() > 0);
+ assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+ assertEquals(clientConnectionTerminations + 1,
+ statistics.get("clientConnectionTerminations"));
+ assertEquals(0L, statistics.get("authorizationViolations"));
+ assertEquals(0L, statistics.get("authenticationFailures"));
+ });
+ }
+ ignoredInvalidExecutionContext.remove();
+ }
+
+ private Long getMessagesReceived() {
+ return Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats =
+ distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ return statistics.get("messagesReceived").longValue();
+ });
+ }
+
+ private Long getMessagesSent() {
+ return Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats =
+ distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ return statistics.get("messagesSent").longValue();
+ });
+ }
+
+ private Integer getClientConnectionStarts() {
+ return Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+ Statistics[] protobufServerStats =
+ distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ return statistics.get("clientConnectionStarts").intValue();
+ });
+ }
- ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
- protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
- socket.getOutputStream());
+ private Integer getClientConnectionTerminations() {
+ return Host.getLocator().invoke(() -> {
+ InternalDistributedSystem distributedSystem =
+ (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+ Statistics[] protobufServerStats =
+ distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ assertEquals(1, protobufServerStats.length);
+ Statistics statistics = protobufServerStats[0];
+ return statistics.get("clientConnectionTerminations").intValue();
+ });
+ }
+
+ private void validateGetAvailableServersResponse(
+ ProtobufProtocolSerializer protobufProtocolSerializer, InputStream inputStream)
+ throws InvalidProtocolMessageException, IOException {
ClientProtocol.Message getAvailableServersResponseMessage =
- protobufProtocolSerializer.deserialize(socket.getInputStream());
+ protobufProtocolSerializer.deserialize(inputStream);
+ assertNotNull(getAvailableServersResponseMessage);
assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
getAvailableServersResponseMessage.getMessageTypeCase());
ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
- assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE,
+ assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
messageResponse.getResponseAPICase());
- assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
- messageResponse.getErrorResponse().getError().getErrorCode());
+ ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
+ messageResponse.getGetAvailableServersResponse();
+ assertEquals(1, getAvailableServersResponse.getServersCount());
}
@Override
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.