You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/22 22:52:07 UTC

[geode] 01/03: GEODE-3546: Finish new protocol Locator stats.

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit b974668bd7479c3dabea24ec3daa8c35bcadae8a
Author: Galen O'Sullivan <go...@pivotal.io>
AuthorDate: Wed Sep 20 13:35:09 2017 -0700

    GEODE-3546: Finish new protocol Locator stats.
    
    - Increment and decrement connection count stats.
    - Add stat checking to a locator test.
    - Add stats to a second test method.
    - Prevent locator state from one test from influencing another test.
    
    Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
---
 .../distributed/internal/InternalLocator.java      |   4 +-
 .../distributed/internal/tcpserver/TcpServer.java  |  14 +-
 .../acceptance/LocatorConnectionDUnitTest.java     | 239 +++++++++++++++++----
 3 files changed, 210 insertions(+), 47 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index c4541c3..489e647 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -1335,10 +1335,8 @@ public class InternalLocator extends Locator implements ConnectListener {
     try {
       this.stats.hookupStats(sys,
           SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
-      ClientProtocolMessageHandler messageHandler = this.server.getMessageHandler();
+      ClientProtocolMessageHandler messageHandler = this.server.getClientProtocolMessageHandler();
       if (messageHandler != null) {
-        // GEODE-3546 - this should create locator-specific stats but is creating client/server
-        // stats
         messageHandler.initializeStatistics("LocatorStats", sys);
       }
     } catch (UnknownHostException e) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index cf8e477..85b2ace 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -133,7 +133,7 @@ public class TcpServer {
   private final PoolStatHelper poolHelper;
   private final InternalLocator internalLocator;
   private final TcpHandler handler;
-  private ClientProtocolMessageHandler messageHandler;
+  private ClientProtocolMessageHandler clientProtocolMessageHandler;
 
 
   private PooledExecutorWithDMStats executor;
@@ -158,20 +158,20 @@ public class TcpServer {
   /**
    * returns the message handler used for client/locator communications processing
    */
-  public ClientProtocolMessageHandler getMessageHandler() {
-    return messageHandler;
+  public ClientProtocolMessageHandler getClientProtocolMessageHandler() {
+    return clientProtocolMessageHandler;
   }
 
   public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
       DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
       ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
-      ClientProtocolMessageHandler messageHandler) {
+      ClientProtocolMessageHandler clientProtocolMessageHandler) {
     this.port = port;
     this.bind_address = bind_address;
     this.handler = handler;
     this.poolHelper = poolHelper;
     this.internalLocator = internalLocator;
-    this.messageHandler = messageHandler;
+    this.clientProtocolMessageHandler = clientProtocolMessageHandler;
     // register DSFID types first; invoked explicitly so that all message type
     // initializations do not happen in first deserialization on a possibly
     // "precious" thread
@@ -381,8 +381,10 @@ public class TcpServer {
         if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
           if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
               && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
-            messageHandler.receiveMessage(input, socket.getOutputStream(),
+            clientProtocolMessageHandler.getStatistics().clientConnected();
+            clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(),
                 new MessageExecutionContext(internalLocator));
+            clientProtocolMessageHandler.getStatistics().clientDisconnected();
           } else {
             rejectUnknownProtocolConnection(socket, gossipVersion);
           }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
index 4e310d2..fe03740 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -17,21 +17,25 @@ package org.apache.geode.protocol.acceptance;
 
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.Socket;
 import java.util.Properties;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.Statistics;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI;
@@ -42,7 +46,10 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
 /*
@@ -51,29 +58,31 @@ import org.apache.geode.test.junit.categories.DistributedTest;
 @Category(DistributedTest.class)
 public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
 
-  private Socket socket;
-
   @Rule
-  public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+  public final DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
 
   @Before
   public void setup() throws IOException {
-    Host host = Host.getHost(0);
-    int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
     startCacheWithCacheServer();
 
     Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true"));
+  }
 
-    socket = new Socket(host.getHostName(), locatorPort);
+  private Socket createSocket() throws IOException {
+    Host host = Host.getHost(0);
+    int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
+    Socket socket = new Socket(host.getHostName(), locatorPort);
     DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
     dataOutputStream.writeInt(0);
     // Using the constant from AcceptorImpl to ensure that magic byte is the same
     dataOutputStream.writeByte(ProtobufClientServerProtocol.getModeNumber());
+    return socket;
   }
 
+  // Test getAvailableServers twice, validating stats before any messages, after 1, and after 2.
   @Test
-  public void testEchoProtobufMessageFromLocator()
-      throws IOException, InvalidProtocolMessageException {
+  public void testGetAvailableServersWithStats() throws Throwable {
     ClientProtocol.Request.Builder protobufRequestBuilder =
         ProtobufUtilities.createProtobufRequestBuilder();
     ClientProtocol.Message getAvailableServersRequestMessage =
@@ -81,48 +90,202 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
             protobufRequestBuilder.setGetAvailableServersRequest(
                 ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
 
-    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
-    protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
-        socket.getOutputStream());
+    try {
+      ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
-    ClientProtocol.Message getAvailableServersResponseMessage =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
-        getAvailableServersResponseMessage.getMessageTypeCase());
-    ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
-    assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
-        messageResponse.getResponseAPICase());
-    ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
-        messageResponse.getGetAvailableServersResponse();
-    assertEquals(1, getAvailableServersResponse.getServersCount());
+      try (Socket socket = createSocket()) {
+        long messagesReceived = getMessagesReceived();
+        long messagesSent = getMessagesSent();
+        int clientConnectionStarts = getClientConnectionStarts();
+        int clientConnectionTerminations = getClientConnectionTerminations();
+
+        protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+            socket.getOutputStream());
+
+        validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream());
+
+        Host.getLocator().invoke(() -> {
+          InternalDistributedSystem distributedSystem =
+              (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+          Statistics[] protobufServerStats = distributedSystem
+              .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+          assertEquals(1, protobufServerStats.length);
+          Statistics statistics = protobufServerStats[0];
+          assertEquals(0, statistics.get("currentClientConnections"));
+          assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+          assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+          assertTrue(statistics.get("bytesReceived").longValue() > 0);
+          assertTrue(statistics.get("bytesSent").longValue() > 0);
+          assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+          assertEquals(clientConnectionTerminations + 1,
+              statistics.get("clientConnectionTerminations"));
+          assertEquals(0L, statistics.get("authorizationViolations"));
+          assertEquals(0L, statistics.get("authenticationFailures"));
+        });
+      }
+
+      try (Socket socket = createSocket()) {
+        long messagesReceived = getMessagesReceived();
+        long messagesSent = getMessagesSent();
+        int clientConnectionStarts = getClientConnectionStarts();
+        int clientConnectionTerminations = getClientConnectionTerminations();
+
+        protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+            socket.getOutputStream());
+
+        validateGetAvailableServersResponse(protobufProtocolSerializer, socket.getInputStream());
+
+        Host.getLocator().invoke(() -> {
+          InternalDistributedSystem distributedSystem =
+              (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+          Statistics[] protobufServerStats = distributedSystem
+              .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+          assertEquals(1, protobufServerStats.length);
+          Statistics statistics = protobufServerStats[0];
+          assertEquals(0, statistics.get("currentClientConnections"));
+          assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+          assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+          assertTrue(statistics.get("bytesReceived").longValue() > 0);
+          assertTrue(statistics.get("bytesSent").longValue() > 0);
+          assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+          assertEquals(clientConnectionTerminations + 1,
+              statistics.get("clientConnectionTerminations"));
+          assertEquals(0L, statistics.get("authorizationViolations"));
+          assertEquals(0L, statistics.get("authenticationFailures"));
+        });
+      }
+    } catch (RMIException e) {
+      throw e.getCause(); // so that assertions propagate properly.
+    }
   }
 
   @Test
   public void testInvalidOperationReturnsFailure()
       throws IOException, InvalidProtocolMessageException {
-    ClientProtocol.Request.Builder protobufRequestBuilder =
-        ProtobufUtilities.createProtobufRequestBuilder();
-    ClientProtocol.Message getAvailableServersRequestMessage =
-        ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445),
-            protobufRequestBuilder
-                .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest())
-                .build());
+    IgnoredException ignoredInvalidExecutionContext =
+        IgnoredException.addIgnoredException("Invalid execution context");
+    try (Socket socket = createSocket()) {
+
+      ClientProtocol.Request.Builder protobufRequestBuilder =
+          ProtobufUtilities.createProtobufRequestBuilder();
+      ClientProtocol.Message getRegionNamesRequestMessage =
+          ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445),
+              protobufRequestBuilder
+                  .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest())
+                  .build());
+
+      long messagesReceived = getMessagesReceived();
+      long messagesSent = getMessagesSent();
+      int clientConnectionStarts = getClientConnectionStarts();
+      int clientConnectionTerminations = getClientConnectionTerminations();
+
+      ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+      protobufProtocolSerializer.serialize(getRegionNamesRequestMessage, socket.getOutputStream());
+
+      ClientProtocol.Message getAvailableServersResponseMessage =
+          protobufProtocolSerializer.deserialize(socket.getInputStream());
+      assertEquals(1233445,
+          getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
+      assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
+          getAvailableServersResponseMessage.getMessageTypeCase());
+      ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
+      assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE,
+          messageResponse.getResponseAPICase());
+      assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+          messageResponse.getErrorResponse().getError().getErrorCode());
+
+      Host.getLocator().invoke(() -> {
+        InternalDistributedSystem distributedSystem =
+            (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+        Statistics[] protobufServerStats = distributedSystem
+            .findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+        assertEquals(1, protobufServerStats.length);
+        Statistics statistics = protobufServerStats[0];
+        assertEquals(0, statistics.get("currentClientConnections"));
+        assertEquals(messagesReceived + 1, statistics.get("messagesReceived"));
+        assertEquals(messagesSent + 1, statistics.get("messagesSent"));
+        assertTrue(statistics.get("bytesReceived").longValue() > 0);
+        assertTrue(statistics.get("bytesSent").longValue() > 0);
+        assertEquals(clientConnectionStarts, statistics.get("clientConnectionStarts"));
+        assertEquals(clientConnectionTerminations + 1,
+            statistics.get("clientConnectionTerminations"));
+        assertEquals(0L, statistics.get("authorizationViolations"));
+        assertEquals(0L, statistics.get("authenticationFailures"));
+      });
+    }
+    ignoredInvalidExecutionContext.remove();
+  }
+
+  private Long getMessagesReceived() {
+    return Host.getLocator().invoke(() -> {
+      InternalDistributedSystem distributedSystem =
+          (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+      Statistics[] protobufServerStats =
+          distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+      assertEquals(1, protobufServerStats.length);
+      Statistics statistics = protobufServerStats[0];
+      return statistics.get("messagesReceived").longValue();
+    });
+  }
+
+  private Long getMessagesSent() {
+    return Host.getLocator().invoke(() -> {
+      InternalDistributedSystem distributedSystem =
+          (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+      Statistics[] protobufServerStats =
+          distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+      assertEquals(1, protobufServerStats.length);
+      Statistics statistics = protobufServerStats[0];
+      return statistics.get("messagesSent").longValue();
+    });
+  }
+
+  private Integer getClientConnectionStarts() {
+    return Host.getLocator().invoke(() -> {
+      InternalDistributedSystem distributedSystem =
+          (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
+
+      Statistics[] protobufServerStats =
+          distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+      assertEquals(1, protobufServerStats.length);
+      Statistics statistics = protobufServerStats[0];
+      return statistics.get("clientConnectionStarts").intValue();
+    });
+  }
 
-    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
-    protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
-        socket.getOutputStream());
+  private Integer getClientConnectionTerminations() {
+    return Host.getLocator().invoke(() -> {
+      InternalDistributedSystem distributedSystem =
+          (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
 
+      Statistics[] protobufServerStats =
+          distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+      assertEquals(1, protobufServerStats.length);
+      Statistics statistics = protobufServerStats[0];
+      return statistics.get("clientConnectionTerminations").intValue();
+    });
+  }
+
+  private void validateGetAvailableServersResponse(
+      ProtobufProtocolSerializer protobufProtocolSerializer, InputStream inputStream)
+      throws InvalidProtocolMessageException, IOException {
     ClientProtocol.Message getAvailableServersResponseMessage =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
+        protobufProtocolSerializer.deserialize(inputStream);
+    assertNotNull(getAvailableServersResponseMessage);
     assertEquals(1233445, getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
     assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
         getAvailableServersResponseMessage.getMessageTypeCase());
     ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
-    assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE,
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
         messageResponse.getResponseAPICase());
-    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
-        messageResponse.getErrorResponse().getError().getErrorCode());
+    ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
+        messageResponse.getGetAvailableServersResponse();
+    assertEquals(1, getAvailableServersResponse.getServersCount());
   }
 
   @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.