You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2017/09/14 21:47:34 UTC

[geode] branch develop updated: GEODE-3083: New protocol should record statistics

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3d8f016  GEODE-3083: New protocol should record statistics
3d8f016 is described below

commit 3d8f0160c3725108bc96286806c07c5dcecc43a8
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 12 10:01:56 2017 -0700

    GEODE-3083: New protocol should record statistics
    
    Statistics implemented: connections, connection starts, connection termination,
    bytes/second received, bytes/second sent, authorization violations
---
 .../distributed/internal/InternalLocator.java      |   7 +
 .../distributed/internal/tcpserver/TcpServer.java  |   8 +
 .../apache/geode/internal/cache/tier/Acceptor.java |   6 +
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  15 +-
 .../cache/tier/sockets/CacheServerStats.java       | 282 +++++++++++----------
 .../tier/sockets/ClientProtocolMessageHandler.java |   7 +
 ...eHandler.java => ClientProtocolStatistics.java} |  20 +-
 .../sockets/GenericProtocolServerConnection.java   |  30 ++-
 .../tier/sockets/MessageExecutionContext.java      |  19 +-
 ...ocolMessageHandler.java => NoOpStatistics.java} |  23 +-
 .../tier/sockets/ServerConnectionFactory.java      |  26 +-
 .../tier/sockets/ServerConnectionFactoryTest.java  |   2 +-
 .../cache/tier/sockets/ServerConnectionTest.java   |   6 +-
 .../protocol/protobuf/ProtobufOpsProcessor.java    |   7 +
 .../protocol/protobuf/ProtobufStreamProcessor.java |  17 ++
 .../statistics/ProtobufClientStatistics.java       |  27 +-
 .../statistics/ProtobufClientStatisticsImpl.java   | 104 ++++++++
 .../GenericProtocolServerConnectionTest.java       |   7 +-
 .../RoundTripCacheConnectionJUnitTest.java         |  61 +++++
 .../RoundTripLocatorConnectionDUnitTest.java       |  18 ++
 .../protobuf/ProtobufStreamProcessorTest.java      |   5 +-
 .../GetAllRequestOperationHandlerJUnitTest.java    |  22 +-
 ...egionNamesRequestOperationHandlerJUnitTest.java |  10 +-
 .../GetRegionRequestOperationHandlerJUnitTest.java |  30 ++-
 .../GetRequestOperationHandlerJUnitTest.java       |  37 +--
 .../PutAllRequestOperationHandlerJUnitTest.java    |  48 ++--
 .../PutRequestOperationHandlerJUnitTest.java       |  44 ++--
 .../RemoveRequestOperationHandlerJUnitTest.java    |  40 +--
 .../statistics/NoOpProtobufStatistics.java         |  45 ++--
 29 files changed, 643 insertions(+), 330 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 06603cc..c4541c3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -62,6 +62,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
 import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
 import org.apache.geode.internal.cache.wan.WANServiceProvider;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -1334,6 +1335,12 @@ public class InternalLocator extends Locator implements ConnectListener {
     try {
       this.stats.hookupStats(sys,
           SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
+      ClientProtocolMessageHandler messageHandler = this.server.getMessageHandler();
+      if (messageHandler != null) {
+        // GEODE-3546 - this should create locator-specific stats but is creating client/server
+        // stats
+        messageHandler.initializeStatistics("LocatorStats", sys);
+      }
     } catch (UnknownHostException e) {
       logger.warn(e);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index a721589..cf8e477 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -155,6 +155,13 @@ public class TcpServer {
     GOSSIP_TO_GEMFIRE_VERSION_MAP.put(OLDGOSSIPVERSION, Version.GFE_57.ordinal());
   }
 
+  /**
+   * returns the message handler used for client/locator communications processing
+   */
+  public ClientProtocolMessageHandler getMessageHandler() {
+    return messageHandler;
+  }
+
   public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
       DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
       ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
@@ -575,4 +582,5 @@ public class TcpServer {
   public static Map getGossipVersionMapForTestOnly() {
     return GOSSIP_TO_GEMFIRE_VERSION_MAP;
   }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index f062129..9596f74 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -50,6 +50,12 @@ public abstract class Acceptor {
   public abstract int getPort();
 
   /**
+   * returns the server's name string, including the inet address and port that the server is
+   * listening on
+   */
+  public abstract String getServerName();
+
+  /**
    * Closes this acceptor thread
    */
   public abstract void close();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index f9bc596..68377d8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -511,7 +511,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
         }
       }
       this.localPort = port;
-      String sockName = this.serverSock.getLocalSocketAddress().toString();
+      String sockName = getServerName();
       logger.info(LocalizedMessage.create(
           LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
           new Object[] {sockName, Integer.valueOf(backLog)}));
@@ -1178,6 +1178,17 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     return localPort;
   }
 
+  @Override
+  public String getServerName() {
+    String name = this.serverSock.getLocalSocketAddress().toString();
+    try {
+      name = SocketCreator.getLocalHost().getCanonicalHostName() + "-" + name;
+    } catch (Exception e) {
+    }
+    return name;
+  }
+
+
   public InetAddress getServerInetAddr() {
     return this.serverSock.getInetAddress();
   }
@@ -1487,7 +1498,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
 
     ServerConnection serverConn = serverConnectionFactory.makeServerConnection(socket, this.cache,
         this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
-        communicationModeStr, communicationMode, this, this.securityService);
+        communicationModeStr, communicationMode, this, this.securityService, this.getBindAddress());
 
     synchronized (this.allSCsLock) {
       this.allSCs.add(serverConn);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
index 4f2a0b3..1849487 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
@@ -233,261 +233,263 @@ public class CacheServerStats implements MessageStats {
    * Add a convinience method to pass in a StatisticsFactory for Statistics construction. Helpful
    * for local Statistics operations
    * 
-   * @param f
+   * @param statisticsFactory
    * @param ownerName
    */
-  public CacheServerStats(StatisticsFactory f, String ownerName, String typeName,
-      StatisticDescriptor[] descriptiors) {
-    if (f == null) {
+  public CacheServerStats(StatisticsFactory statisticsFactory, String ownerName, String typeName,
+      StatisticDescriptor[] descriptors) {
+    if (statisticsFactory == null) {
       // Create statistics later when needed
       return;
     }
     StatisticDescriptor[] serverStatDescriptors = new StatisticDescriptor[] {
-        f.createIntCounter("getRequests", "Number of cache client get requests.", "operations"),
-        f.createLongCounter("readGetRequestTime", "Total time spent in reading get requests.",
-            "nanoseconds"),
-        f.createLongCounter("processGetTime",
-            "Total time spent in processing a cache client get request, including the time to get an object from the cache.",
-            "nanoseconds"),
-        f.createIntCounter("getResponses", "Number of get responses written to the cache client.",
-            "operations"),
-        f.createLongCounter("writeGetResponseTime", "Total time spent in writing get responses.",
-            "nanoseconds"),
-
-        f.createIntCounter("putRequests", "Number of cache client put requests.", "operations"),
-        f.createLongCounter("readPutRequestTime", "Total time spent in reading put requests.",
-            "nanoseconds"),
-        f.createLongCounter("processPutTime",
-            "Total time spent in processing a cache client put request, including the time to put an object into the cache.",
-            "nanoseconds"),
-        f.createIntCounter("putResponses", "Number of put responses written to the cache client.",
+        statisticsFactory.createIntCounter("getRequests", "Number of cache client get requests.",
             "operations"),
-        f.createLongCounter("writePutResponseTime", "Total time spent in writing put responses.",
+        statisticsFactory.createLongCounter("readGetRequestTime",
+            "Total time spent in reading get requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processGetTime",
+            "Total time spent in processing a cache client get request, including the time to get an object from the cache.",
             "nanoseconds"),
+        statisticsFactory.createIntCounter("getResponses",
+            "Number of get responses written to the cache client.", "operations"),
+        statisticsFactory.createLongCounter("writeGetResponseTime",
+            "Total time spent in writing get responses.", "nanoseconds"),
 
-        f.createIntCounter("putAllRequests", "Number of cache client putAll requests.",
+        statisticsFactory.createIntCounter("putRequests", "Number of cache client put requests.",
             "operations"),
-        f.createLongCounter("readPutAllRequestTime", "Total time spent in reading putAll requests.",
+        statisticsFactory.createLongCounter("readPutRequestTime",
+            "Total time spent in reading put requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processPutTime",
+            "Total time spent in processing a cache client put request, including the time to put an object into the cache.",
             "nanoseconds"),
-        f.createLongCounter("processPutAllTime",
+        statisticsFactory.createIntCounter("putResponses",
+            "Number of put responses written to the cache client.", "operations"),
+        statisticsFactory.createLongCounter("writePutResponseTime",
+            "Total time spent in writing put responses.", "nanoseconds"),
+
+        statisticsFactory.createIntCounter("putAllRequests",
+            "Number of cache client putAll requests.", "operations"),
+        statisticsFactory.createLongCounter("readPutAllRequestTime",
+            "Total time spent in reading putAll requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processPutAllTime",
             "Total time spent in processing a cache client putAll request, including the time to put all objects into the cache.",
             "nanoseconds"),
-        f.createIntCounter("putAllResponses",
+        statisticsFactory.createIntCounter("putAllResponses",
             "Number of putAll responses written to the cache client.", "operations"),
-        f.createLongCounter("writePutAllResponseTime",
+        statisticsFactory.createLongCounter("writePutAllResponseTime",
             "Total time spent in writing putAll responses.", "nanoseconds"),
 
-        f.createIntCounter("removeAllRequests", "Number of cache client removeAll requests.",
-            "operations"),
-        f.createLongCounter("readRemoveAllRequestTime",
+        statisticsFactory.createIntCounter("removeAllRequests",
+            "Number of cache client removeAll requests.", "operations"),
+        statisticsFactory.createLongCounter("readRemoveAllRequestTime",
             "Total time spent in reading removeAll requests.", "nanoseconds"),
-        f.createLongCounter("processRemoveAllTime",
+        statisticsFactory.createLongCounter("processRemoveAllTime",
             "Total time spent in processing a cache client removeAll request, including the time to remove all objects from the cache.",
             "nanoseconds"),
-        f.createIntCounter("removeAllResponses",
+        statisticsFactory.createIntCounter("removeAllResponses",
             "Number of removeAll responses written to the cache client.", "operations"),
-        f.createLongCounter("writeRemoveAllResponseTime",
+        statisticsFactory.createLongCounter("writeRemoveAllResponseTime",
             "Total time spent in writing removeAll responses.", "nanoseconds"),
 
-        f.createIntCounter("getAllRequests", "Number of cache client getAll requests.",
-            "operations"),
-        f.createLongCounter("readGetAllRequestTime", "Total time spent in reading getAll requests.",
-            "nanoseconds"),
-        f.createLongCounter("processGetAllTime",
+        statisticsFactory.createIntCounter("getAllRequests",
+            "Number of cache client getAll requests.", "operations"),
+        statisticsFactory.createLongCounter("readGetAllRequestTime",
+            "Total time spent in reading getAll requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processGetAllTime",
             "Total time spent in processing a cache client getAll request.", "nanoseconds"),
-        f.createIntCounter("getAllResponses",
+        statisticsFactory.createIntCounter("getAllResponses",
             "Number of getAll responses written to the cache client.", "operations"),
-        f.createLongCounter("writeGetAllResponseTime",
+        statisticsFactory.createLongCounter("writeGetAllResponseTime",
             "Total time spent in writing getAll responses.", "nanoseconds"),
 
-        f.createIntCounter("destroyRequests", "Number of cache client destroy requests.",
-            "operations"),
-        f.createLongCounter("readDestroyRequestTime",
+        statisticsFactory.createIntCounter("destroyRequests",
+            "Number of cache client destroy requests.", "operations"),
+        statisticsFactory.createLongCounter("readDestroyRequestTime",
             "Total time spent in reading destroy requests.", "nanoseconds"),
-        f.createLongCounter("processDestroyTime",
+        statisticsFactory.createLongCounter("processDestroyTime",
             "Total time spent in processing a cache client destroy request, including the time to destroy an object from the cache.",
             "nanoseconds"),
-        f.createIntCounter("destroyResponses",
+        statisticsFactory.createIntCounter("destroyResponses",
             "Number of destroy responses written to the cache client.", "operations"),
-        f.createLongCounter("writeDestroyResponseTime",
+        statisticsFactory.createLongCounter("writeDestroyResponseTime",
             "Total time spent in writing destroy responses.", "nanoseconds"),
 
-        f.createIntCounter("invalidateRequests", "Number of cache client invalidate requests.",
-            "operations"),
-        f.createLongCounter("readInvalidateRequestTime",
+        statisticsFactory.createIntCounter("invalidateRequests",
+            "Number of cache client invalidate requests.", "operations"),
+        statisticsFactory.createLongCounter("readInvalidateRequestTime",
             "Total time spent in reading invalidate requests.", "nanoseconds"),
-        f.createLongCounter("processInvalidateTime",
+        statisticsFactory.createLongCounter("processInvalidateTime",
             "Total time spent in processing a cache client invalidate request, including the time to invalidate an object from the cache.",
             "nanoseconds"),
-        f.createIntCounter("invalidateResponses",
+        statisticsFactory.createIntCounter("invalidateResponses",
             "Number of invalidate responses written to the cache client.", "operations"),
-        f.createLongCounter("writeInvalidateResponseTime",
+        statisticsFactory.createLongCounter("writeInvalidateResponseTime",
             "Total time spent in writing invalidate responses.", "nanoseconds"),
 
-        f.createIntCounter("sizeRequests", "Number of cache client size requests.", "operations"),
-        f.createLongCounter("readSizeRequestTime", "Total time spent in reading size requests.",
-            "nanoseconds"),
-        f.createLongCounter("processSizeTime",
-            "Total time spent in processing a cache client size request, including the time to size an object from the cache.",
-            "nanoseconds"),
-        f.createIntCounter("sizeResponses", "Number of size responses written to the cache client.",
+        statisticsFactory.createIntCounter("sizeRequests", "Number of cache client size requests.",
             "operations"),
-        f.createLongCounter("writeSizeResponseTime", "Total time spent in writing size responses.",
+        statisticsFactory.createLongCounter("readSizeRequestTime",
+            "Total time spent in reading size requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processSizeTime",
+            "Total time spent in processing a cache client size request, including the time to size an object from the cache.",
             "nanoseconds"),
+        statisticsFactory.createIntCounter("sizeResponses",
+            "Number of size responses written to the cache client.", "operations"),
+        statisticsFactory.createLongCounter("writeSizeResponseTime",
+            "Total time spent in writing size responses.", "nanoseconds"),
 
 
-        f.createIntCounter("queryRequests", "Number of cache client query requests.", "operations"),
-        f.createLongCounter("readQueryRequestTime", "Total time spent in reading query requests.",
-            "nanoseconds"),
-        f.createLongCounter("processQueryTime",
+        statisticsFactory.createIntCounter("queryRequests",
+            "Number of cache client query requests.", "operations"),
+        statisticsFactory.createLongCounter("readQueryRequestTime",
+            "Total time spent in reading query requests.", "nanoseconds"),
+        statisticsFactory.createLongCounter("processQueryTime",
             "Total time spent in processing a cache client query request, including the time to destroy an object from the cache.",
             "nanoseconds"),
-        f.createIntCounter("queryResponses",
+        statisticsFactory.createIntCounter("queryResponses",
             "Number of query responses written to the cache client.", "operations"),
-        f.createLongCounter("writeQueryResponseTime",
+        statisticsFactory.createLongCounter("writeQueryResponseTime",
             "Total time spent in writing query responses.", "nanoseconds"),
 
-        f.createIntCounter("destroyRegionRequests",
+        statisticsFactory.createIntCounter("destroyRegionRequests",
             "Number of cache client destroyRegion requests.", "operations"),
-        f.createLongCounter("readDestroyRegionRequestTime",
+        statisticsFactory.createLongCounter("readDestroyRegionRequestTime",
             "Total time spent in reading destroyRegion requests.", "nanoseconds"),
-        f.createLongCounter("processDestroyRegionTime",
+        statisticsFactory.createLongCounter("processDestroyRegionTime",
             "Total time spent in processing a cache client destroyRegion request, including the time to destroy the region from the cache.",
             "nanoseconds"),
-        f.createIntCounter("destroyRegionResponses",
+        statisticsFactory.createIntCounter("destroyRegionResponses",
             "Number of destroyRegion responses written to the cache client.", "operations"),
-        f.createLongCounter("writeDestroyRegionResponseTime",
+        statisticsFactory.createLongCounter("writeDestroyRegionResponseTime",
             "Total time spent in writing destroyRegion responses.", "nanoseconds"),
 
-        f.createIntCounter("containsKeyRequests", "Number of cache client containsKey requests.",
-            "operations"),
-        f.createLongCounter("readContainsKeyRequestTime",
+        statisticsFactory.createIntCounter("containsKeyRequests",
+            "Number of cache client containsKey requests.", "operations"),
+        statisticsFactory.createLongCounter("readContainsKeyRequestTime",
             "Total time spent reading containsKey requests.", "nanoseconds"),
-        f.createLongCounter("processContainsKeyTime",
+        statisticsFactory.createLongCounter("processContainsKeyTime",
             "Total time spent processing a containsKey request.", "nanoseconds"),
-        f.createIntCounter("containsKeyResponses",
+        statisticsFactory.createIntCounter("containsKeyResponses",
             "Number of containsKey responses written to the cache client.", "operations"),
-        f.createLongCounter("writeContainsKeyResponseTime",
+        statisticsFactory.createLongCounter("writeContainsKeyResponseTime",
             "Total time spent writing containsKey responses.", "nanoseconds"),
 
-        f.createIntCounter("processBatchRequests", "Number of cache client processBatch requests.",
-            "operations"),
-        f.createLongCounter("readProcessBatchRequestTime",
+        statisticsFactory.createIntCounter("processBatchRequests",
+            "Number of cache client processBatch requests.", "operations"),
+        statisticsFactory.createLongCounter("readProcessBatchRequestTime",
             "Total time spent in reading processBatch requests.", "nanoseconds"),
-        f.createLongCounter("processBatchTime",
+        statisticsFactory.createLongCounter("processBatchTime",
             "Total time spent in processing a cache client processBatch request.", "nanoseconds"),
-        f.createIntCounter("processBatchResponses",
+        statisticsFactory.createIntCounter("processBatchResponses",
             "Number of processBatch responses written to the cache client.", "operations"),
-        f.createLongCounter("writeProcessBatchResponseTime",
+        statisticsFactory.createLongCounter("writeProcessBatchResponseTime",
             "Total time spent in writing processBatch responses.", "nanoseconds"),
-        f.createLongCounter("batchSize", "The size of the batches received.", "bytes"),
-        f.createIntCounter("clearRegionRequests", "Number of cache client clearRegion requests.",
-            "operations"),
-        f.createLongCounter("readClearRegionRequestTime",
+        statisticsFactory.createLongCounter("batchSize", "The size of the batches received.",
+            "bytes"),
+        statisticsFactory.createIntCounter("clearRegionRequests",
+            "Number of cache client clearRegion requests.", "operations"),
+        statisticsFactory.createLongCounter("readClearRegionRequestTime",
             "Total time spent in reading clearRegion requests.", "nanoseconds"),
-        f.createLongCounter("processClearRegionTime",
+        statisticsFactory.createLongCounter("processClearRegionTime",
             "Total time spent in processing a cache client clearRegion request, including the time to clear the region from the cache.",
             "nanoseconds"),
-        f.createIntCounter("clearRegionResponses",
+        statisticsFactory.createIntCounter("clearRegionResponses",
             "Number of clearRegion responses written to the cache client.", "operations"),
-        f.createLongCounter("writeClearRegionResponseTime",
+        statisticsFactory.createLongCounter("writeClearRegionResponseTime",
             "Total time spent in writing clearRegion responses.", "nanoseconds"),
-        f.createIntCounter("clientNotificationRequests",
+        statisticsFactory.createIntCounter("clientNotificationRequests",
             "Number of cache client notification requests.", "operations"),
-        f.createLongCounter("readClientNotificationRequestTime",
+        statisticsFactory.createLongCounter("readClientNotificationRequestTime",
             "Total time spent in reading client notification requests.", "nanoseconds"),
-        f.createLongCounter("processClientNotificationTime",
+        statisticsFactory.createLongCounter("processClientNotificationTime",
             "Total time spent in processing a cache client notification request.", "nanoseconds"),
 
-        f.createIntCounter("updateClientNotificationRequests",
+        statisticsFactory.createIntCounter("updateClientNotificationRequests",
             "Number of cache client notification update requests.", "operations"),
-        f.createLongCounter("readUpdateClientNotificationRequestTime",
+        statisticsFactory.createLongCounter("readUpdateClientNotificationRequestTime",
             "Total time spent in reading client notification update requests.", "nanoseconds"),
-        f.createLongCounter("processUpdateClientNotificationTime",
+        statisticsFactory.createLongCounter("processUpdateClientNotificationTime",
             "Total time spent in processing a client notification update request.", "nanoseconds"),
 
-        f.createIntCounter("clientReadyRequests", "Number of cache client ready requests.",
-            "operations"),
-        f.createLongCounter("readClientReadyRequestTime",
+        statisticsFactory.createIntCounter("clientReadyRequests",
+            "Number of cache client ready requests.", "operations"),
+        statisticsFactory.createLongCounter("readClientReadyRequestTime",
             "Total time spent in reading cache client ready requests.", "nanoseconds"),
-        f.createLongCounter("processClientReadyTime",
+        statisticsFactory.createLongCounter("processClientReadyTime",
             "Total time spent in processing a cache client ready request, including the time to destroy an object from the cache.",
             "nanoseconds"),
-        f.createIntCounter("clientReadyResponses",
+        statisticsFactory.createIntCounter("clientReadyResponses",
             "Number of client ready responses written to the cache client.", "operations"),
-        f.createLongCounter("writeClientReadyResponseTime",
+        statisticsFactory.createLongCounter("writeClientReadyResponseTime",
             "Total time spent in writing client ready responses.", "nanoseconds"),
 
-        f.createIntCounter("closeConnectionRequests",
+        statisticsFactory.createIntCounter("closeConnectionRequests",
             "Number of cache client close connection requests.", "operations"),
-        f.createLongCounter("readCloseConnectionRequestTime",
+        statisticsFactory.createLongCounter("readCloseConnectionRequestTime",
             "Total time spent in reading close connection requests.", "nanoseconds"),
-        f.createLongCounter("processCloseConnectionTime",
+        statisticsFactory.createLongCounter("processCloseConnectionTime",
             "Total time spent in processing a cache client close connection request.",
             "nanoseconds"),
-        f.createIntCounter("failedConnectionAttempts", "Number of failed connection attempts.",
-            "attempts"),
-        f.createIntGauge("currentClientConnections",
+        statisticsFactory.createIntCounter("failedConnectionAttempts",
+            "Number of failed connection attempts.", "attempts"),
+        statisticsFactory.createIntGauge("currentClientConnections",
             "Number of sockets accepted and used for client to server messaging.", "sockets"),
-        f.createIntGauge("currentQueueConnections",
+        statisticsFactory.createIntGauge("currentQueueConnections",
             "Number of sockets accepted and used for server to client messaging.", "sockets"),
-        f.createIntGauge("currentClients", "Number of client virtual machines connected.",
-            "clients"),
-        f.createIntCounter("outOfOrderGatewayBatchIds", "Number of Out of order batch IDs.",
-            "batches"),
-        f.createIntCounter("abandonedWriteRequests",
+        statisticsFactory.createIntGauge("currentClients",
+            "Number of client virtual machines connected.", "clients"),
+        statisticsFactory.createIntCounter("outOfOrderGatewayBatchIds",
+            "Number of Out of order batch IDs.", "batches"),
+        statisticsFactory.createIntCounter("abandonedWriteRequests",
             "Number of write opertations abandond by clients", "requests"),
-        f.createIntCounter("abandonedReadRequests",
+        statisticsFactory.createIntCounter("abandonedReadRequests",
             "Number of read opertations abandond by clients", "requests"),
-        f.createLongCounter("receivedBytes", "Total number of bytes received from clients.",
+        statisticsFactory.createLongCounter("receivedBytes",
+            "Total number of bytes received from clients.", "bytes"),
+        statisticsFactory.createLongCounter("sentBytes", "Total number of bytes sent to clients.",
             "bytes"),
-        f.createLongCounter("sentBytes", "Total number of bytes sent to clients.", "bytes"),
-        f.createIntGauge("messagesBeingReceived",
+        statisticsFactory.createIntGauge("messagesBeingReceived",
             "Current number of message being received off the network or being processed after reception.",
             "messages"),
-        f.createLongGauge("messageBytesBeingReceived",
+        statisticsFactory.createLongGauge("messageBytesBeingReceived",
             "Current number of bytes consumed by messages being received or processed.", "bytes"),
-        f.createIntCounter("connectionsTimedOut",
+        statisticsFactory.createIntCounter("connectionsTimedOut",
             "Total number of connections that have been timed out by the server because of client inactivity",
             "connections"),
-        f.createIntGauge("threadQueueSize",
+        statisticsFactory.createIntGauge("threadQueueSize",
             "Current number of connections waiting for a thread to start processing their message.",
             "connections"),
-        f.createIntGauge("acceptsInProgress",
+        statisticsFactory.createIntGauge("acceptsInProgress",
             "Current number of server accepts that are attempting to do the initial handshake with the client.",
             "accepts"),
-        f.createIntCounter("acceptThreadStarts",
+        statisticsFactory.createIntCounter("acceptThreadStarts",
             "Total number of threads created to deal with an accepted socket. Note that this is not the current number of threads.",
             "starts"),
-        f.createIntCounter("connectionThreadStarts",
+        statisticsFactory.createIntCounter("connectionThreadStarts",
             "Total number of threads created to deal with a client connection. Note that this is not the current number of threads.",
             "starts"),
-        f.createIntGauge("connectionThreads",
+        statisticsFactory.createIntGauge("connectionThreads",
             "Current number of threads dealing with a client connection.", "threads"),
-        f.createDoubleGauge("connectionLoad",
+        statisticsFactory.createDoubleGauge("connectionLoad",
             "The load from client to server connections as reported by the load probe installed in this server",
             "load"),
-        f.createDoubleGauge("loadPerConnection",
+        statisticsFactory.createDoubleGauge("loadPerConnection",
             "The estimate of how much load is added for each new connection as reported by the load probe installed in this server",
             "load"),
-        f.createDoubleGauge("queueLoad",
+        statisticsFactory.createDoubleGauge("queueLoad",
             "The load from queues as reported by the load probe installed in this server", "load"),
-        f.createDoubleGauge("loadPerQueue",
+        statisticsFactory.createDoubleGauge("loadPerQueue",
             "The estimate of how much load is added for each new connection as reported by the load probe installed in this server",
             "load")};
     StatisticDescriptor[] alldescriptors = serverStatDescriptors;
-    if (descriptiors != null) {
-      alldescriptors = new StatisticDescriptor[descriptiors.length + serverStatDescriptors.length];
-      System.arraycopy(descriptiors, 0, alldescriptors, 0, descriptiors.length);
-      System.arraycopy(serverStatDescriptors, 0, alldescriptors, descriptiors.length,
+    if (descriptors != null) {
+      alldescriptors = new StatisticDescriptor[descriptors.length + serverStatDescriptors.length];
+      System.arraycopy(descriptors, 0, alldescriptors, 0, descriptors.length);
+      System.arraycopy(serverStatDescriptors, 0, alldescriptors, descriptors.length,
           serverStatDescriptors.length);
     }
-    statType = f.createType(typeName, typeName, alldescriptors);
-    try {
-      ownerName = SocketCreator.getLocalHost().getCanonicalHostName() + "-" + ownerName;
-    } catch (Exception e) {
-    }
-    this.stats = f.createAtomicStatistics(statType, ownerName);
+    statType = statisticsFactory.createType(typeName, typeName, alldescriptors);
+    this.stats = statisticsFactory.createAtomicStatistics(statType, ownerName);
 
     getRequestsId = this.stats.nameToId("getRequests");
     readGetRequestTimeId = this.stats.nameToId("readGetRequestTime");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index 0ced3aa..b8969e1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -19,6 +19,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+
 
 /**
  * This is an interface that other modules can implement to hook into
@@ -29,6 +32,10 @@ import java.io.OutputStream;
  * {@link GenericProtocolServerConnection}.
  */
 public interface ClientProtocolMessageHandler {
+  void initializeStatistics(String statisticsName, StatisticsFactory factory);
+
+  ClientProtocolStatistics getStatistics();
+
   void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException;
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
similarity index 56%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
index 0ced3aa..3c3acbe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
@@ -12,23 +12,13 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-
 /**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * Implementations of this interface record statistics for the corresponding client/server protocol
  */
-public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+public interface ClientProtocolStatistics {
+  public void clientConnected();
+
+  public void clientDisconnected();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index 6c81028..767b6c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -36,20 +36,23 @@ public class GenericProtocolServerConnection extends ServerConnection {
   private final ClientProtocolMessageHandler messageHandler;
   private final SecurityManager securityManager;
   private final Authenticator authenticator;
+  private boolean cleanedUp;
 
   /**
    * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received
    * from an edge client over a given <code>Socket</code>.
    */
-  public GenericProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
-      CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
-      byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
-      SecurityService securityService, Authenticator authenticator) {
-    super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode,
-        acceptor, securityService);
+  public GenericProtocolServerConnection(Socket socket, InternalCache cache,
+      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+      String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      ClientProtocolMessageHandler newClientProtocol, SecurityService securityService,
+      Authenticator authenticator) {
+    super(socket, cache, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
+        communicationMode, acceptor, securityService);
     securityManager = securityService.getSecurityManager();
     this.messageHandler = newClientProtocol;
     this.authenticator = authenticator;
+    this.messageHandler.getStatistics().clientConnected();
   }
 
   @Override
@@ -62,8 +65,8 @@ public class GenericProtocolServerConnection extends ServerConnection {
       if (!authenticator.isAuthenticated()) {
         authenticator.authenticate(inputStream, outputStream, securityManager);
       } else {
-        messageHandler.receiveMessage(inputStream, outputStream,
-            new MessageExecutionContext(this.getCache(), authenticator.getAuthorizer()));
+        messageHandler.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
+            this.getCache(), authenticator.getAuthorizer(), messageHandler.getStatistics()));
       }
     } catch (EOFException e) {
       this.setFlagProcessMessagesAsFalse();
@@ -76,6 +79,17 @@ public class GenericProtocolServerConnection extends ServerConnection {
   }
 
   @Override
+  public boolean cleanup() {
+    synchronized (this) { // guards the cleanedUp flag
+      if (!cleanedUp) {
+        cleanedUp = true; // report the disconnect to statistics at most once
+        messageHandler.getStatistics().clientDisconnected();
+      }
+    }
+    return super.cleanup();
+  }
+
+  @Override
   protected boolean doHandShake(byte epType, int qSize) {
     // no handshake for new client protocol.
     return true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
index 1130ce7..d1fc461 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
@@ -28,10 +28,14 @@ public class MessageExecutionContext {
   private Cache cache;
   private Locator locator;
   private Authorizer authorizer;
+  private ClientProtocolStatistics statistics;
 
-  public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer) {
+
+  public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer,
+      ClientProtocolStatistics statistics) {
     this.cache = cache;
     this.authorizer = streamAuthorizer;
+    this.statistics = statistics;
   }
 
   public MessageExecutionContext(InternalLocator locator) {
@@ -39,6 +43,7 @@ public class MessageExecutionContext {
     // set a no-op authorizer until such time as locators implement authentication
     // and authorization checks
     this.authorizer = new NoOpAuthorizer();
+    this.statistics = new NoOpStatistics();
   }
 
   /**
@@ -76,4 +81,16 @@ public class MessageExecutionContext {
   public Authorizer getAuthorizer() {
     return authorizer;
   }
+
+  /**
+   * Returns the statistics for recording operation stats. In a unit test environment this may not
+   * be a protocol-specific statistics implementation.
+   *
+   * @return the {@link ClientProtocolStatistics} held by this execution context
+   */
+  public ClientProtocolStatistics getStatistics() {
+    return statistics;
+  }
+
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
similarity index 56%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
index 0ced3aa..d04db47 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
@@ -12,23 +12,16 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public class NoOpStatistics implements ClientProtocolStatistics {
+  @Override
+  public void clientConnected() {
 
+  }
 
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+  @Override
+  public void clientDisconnected() {
+
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index f0348c3..31eab49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
 
+import org.apache.geode.StatisticsFactory;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
@@ -24,6 +25,7 @@ import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.server.Authenticator;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.Socket;
 import java.util.HashMap;
 import java.util.Map;
@@ -49,12 +51,14 @@ public class ServerConnectionFactory {
     }
   }
 
-  private synchronized ClientProtocolMessageHandler initializeMessageHandler() {
+  private synchronized ClientProtocolMessageHandler initializeMessageHandler(
+      StatisticsFactory statisticsFactory, String statisticsName) {
     if (protocolHandler != null) {
       return protocolHandler;
     }
 
     protocolHandler = new MessageHandlerFactory().makeMessageHandler();
+    protocolHandler.initializeStatistics(statisticsName, statisticsFactory);
 
     return protocolHandler;
   }
@@ -78,17 +82,18 @@ public class ServerConnectionFactory {
     }
   }
 
-  private ClientProtocolMessageHandler getClientProtocolMessageHandler() {
+  private ClientProtocolMessageHandler getOrCreateClientProtocolMessageHandler(
+      StatisticsFactory statisticsFactory, Acceptor acceptor) {
     if (protocolHandler == null) {
-      initializeMessageHandler();
+      return initializeMessageHandler(statisticsFactory, acceptor.getServerName());
     }
     return protocolHandler;
   }
 
-  public ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
-      CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
-      byte communicationMode, Acceptor acceptor, SecurityService securityService)
-      throws IOException {
+  public ServerConnection makeServerConnection(Socket socket, InternalCache cache,
+      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+      String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      SecurityService securityService, InetAddress bindAddress) throws IOException {
     if (communicationMode == ProtobufClientServerProtocol.getModeNumber()) {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Server received unknown communication mode: " + communicationMode);
@@ -96,12 +101,13 @@ public class ServerConnectionFactory {
         String authenticationMode =
             System.getProperty("geode.protocol-authentication-mode", "NOOP");
 
-        return new GenericProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
-            communicationModeStr, communicationMode, acceptor, getClientProtocolMessageHandler(),
+        return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+            socketBufferSize, communicationModeStr, communicationMode, acceptor,
+            getOrCreateClientProtocolMessageHandler(cache.getDistributedSystem(), acceptor),
             securityService, findStreamAuthenticator(authenticationMode));
       }
     } else {
-      return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
+      return new LegacyServerConnection(socket, cache, helper, stats, hsTimeout, socketBufferSize,
           communicationModeStr, communicationMode, acceptor, securityService);
     }
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 09c5949..56d3770 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -107,7 +107,7 @@ public class ServerConnectionFactoryTest {
 
     return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
-        mock(AcceptorImpl.class), mock(SecurityService.class));
+        mock(AcceptorImpl.class), mock(SecurityService.class), InetAddress.getLocalHost());
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index bd23223..a4ebbac 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -82,9 +82,9 @@ public class ServerConnectionTest {
     InternalCache cache = mock(InternalCache.class);
     SecurityService securityService = mock(SecurityService.class);
 
-    serverConnection =
-        new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0, 0, null,
-            CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
+    serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
+        0, 0, null, CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
+        securityService, InetAddress.getLocalHost());
     MockitoAnnotations.initMocks(this);
   }
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
index 7dee26b..e5b4ae4 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
@@ -19,6 +19,7 @@ import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.serialization.SerializationService;
 
@@ -49,6 +50,7 @@ public class ProtobufOpsProcessor {
         result = operationContext.getOperationHandler().process(serializationService,
             operationContext.getFromRequest().apply(request), context);
       } else {
+        recordAuthorizationViolation(context);
         result = Failure.of(ProtobufResponseUtilities.makeErrorResponse(
             ProtocolErrorCode.AUTHORIZATION_FAILED.codeValue,
             "User isn't authorized for this operation."));
@@ -63,4 +65,17 @@ public class ProtobufOpsProcessor {
         operationContext.getToErrorResponse());
     return builder.build();
   }
+
+  /**
+   * Counts an authorization violation when the context carries protobuf statistics.
+   *
+   * The context may hold statistics that are not a {@link ProtobufClientStatistics} (for example
+   * a no-op implementation), so the type is checked instead of cast unconditionally.
+   */
+  private void recordAuthorizationViolation(MessageExecutionContext context) {
+    Object statistics = context.getStatistics();
+    if (statistics instanceof ProtobufClientStatistics) {
+      ((ProtobufClientStatistics) statistics).incAuthorizationViolations();
+    }
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
index f28c310..717365b 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
@@ -19,13 +19,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.geode.StatisticsFactory;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
 import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 
@@ -38,6 +42,7 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
 public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   private final ProtobufProtocolSerializer protobufProtocolSerializer;
   private final ProtobufOpsProcessor protobufOpsProcessor;
+  private ProtobufClientStatistics statistics;
 
   public ProtobufStreamProcessor() throws CodecAlreadyRegisteredForTypeException {
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -46,6 +51,16 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   }
 
   @Override
+  public void initializeStatistics(String statisticsName, StatisticsFactory factory) {
+    statistics = new ProtobufClientStatisticsImpl(factory, statisticsName, "ProtobufServerStats");
+  }
+
+  @Override
+  public ClientProtocolStatistics getStatistics() {
+    return statistics;
+  }
+
+  @Override
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException {
     try {
@@ -62,6 +77,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
     if (message == null) {
       throw new EOFException("Tried to deserialize protobuf message at EOF");
     }
+    statistics.messageReceived(message.getSerializedSize());
 
     ClientProtocol.Request request = message.getRequest();
     ClientProtocol.Response response = protobufOpsProcessor.process(request, executionContext);
@@ -69,6 +85,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
         ProtobufUtilities.createMessageHeaderForRequest(message);
     ClientProtocol.Message responseMessage =
         ProtobufUtilities.createProtobufResponse(responseHeader, response);
+    statistics.messageSent(responseMessage.getSerializedSize());
     protobufProtocolSerializer.serialize(responseMessage, outputStream);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
similarity index 52%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
index 0ced3aa..f769c31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
@@ -12,23 +12,20 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.geode.protocol.protobuf.statistics;
 
-package org.apache.geode.internal.cache.tier.sockets;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public interface ProtobufClientStatistics extends ClientProtocolStatistics {
+  public void clientConnected();
 
+  public void clientDisconnected();
 
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+  public void messageReceived(int bytes);
+
+  public void messageSent(int bytes);
+
+  public void incAuthorizationViolations();
+
+  public void incAuthenticationFailures();
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java
new file mode 100644
index 0000000..92f0610
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.protobuf.statistics;
+
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
+
+/**
+ * {@link ProtobufClientStatistics} implementation that records its values in a {@link Statistics}
+ * instance created from the supplied {@link StatisticsFactory}.
+ */
+public class ProtobufClientStatisticsImpl implements ProtobufClientStatistics {
+  private final StatisticsType statType;
+  private final Statistics stats;
+  private final int currentClientConnectionsId;
+  private final int clientConnectionTerminationsId;
+  private final int clientConnectionStartsId;
+  private final int bytesReceivedId;
+  private final int bytesSentId;
+  private final int messagesReceivedId;
+  private final int messagesSentId;
+  private final int authorizationViolationsId;
+  private final int authenticationFailuresId;
+
+  /**
+   * Creates the statistics type and an atomic statistics instance and caches each statistic's id.
+   *
+   * @param statisticsFactory factory used to create the descriptors, the type and the instance
+   * @param statisticsName name passed to the factory for the created statistics instance
+   * @param typeName name of the statistics type to create
+   */
+  public ProtobufClientStatisticsImpl(StatisticsFactory statisticsFactory, String statisticsName,
+      String typeName) {
+    StatisticDescriptor[] serverStatDescriptors = new StatisticDescriptor[] {
+        statisticsFactory.createIntGauge("currentClientConnections",
+            "Number of sockets accepted and used for client to server messaging.", "sockets"),
+        statisticsFactory.createIntCounter("clientConnectionStarts",
+            "Number of sockets accepted and used for client to server messaging.", "sockets"),
+        statisticsFactory.createIntCounter("clientConnectionTerminations",
+            "Number of sockets that were used for client to server messaging.", "sockets"),
+        statisticsFactory.createLongCounter("authenticationFailures", "Authentication failures",
+            "attempts"),
+        statisticsFactory.createLongCounter("authorizationViolations",
+            "Operations not allowed to proceed", "operations"),
+        statisticsFactory.createLongCounter("bytesReceived",
+            "Bytes received from client messaging.", "bytes"),
+        statisticsFactory.createLongCounter("bytesSent", "Bytes sent for client messaging.",
+            "bytes"),
+        statisticsFactory.createLongCounter("messagesReceived", "Messages received from clients.",
+            "messages"),
+        statisticsFactory.createLongCounter("messagesSent", "Messages sent to clients.",
+            "messages")};
+    statType = statisticsFactory.createType(typeName, "Protobuf client/server statistics",
+        serverStatDescriptors);
+    this.stats = statisticsFactory.createAtomicStatistics(statType, statisticsName);
+    // Look up every id under its own descriptor name; the methods below update stats by id.
+    currentClientConnectionsId = this.stats.nameToId("currentClientConnections");
+    clientConnectionStartsId = this.stats.nameToId("clientConnectionStarts");
+    clientConnectionTerminationsId = this.stats.nameToId("clientConnectionTerminations");
+    authorizationViolationsId = this.stats.nameToId("authorizationViolations");
+    authenticationFailuresId = this.stats.nameToId("authenticationFailures");
+    bytesReceivedId = this.stats.nameToId("bytesReceived");
+    bytesSentId = this.stats.nameToId("bytesSent");
+    messagesReceivedId = this.stats.nameToId("messagesReceived");
+    messagesSentId = this.stats.nameToId("messagesSent");
+  }
+
+  /** Increments the current-connections gauge and the connection-starts counter. */
+  @Override
+  public void clientConnected() {
+    stats.incInt(currentClientConnectionsId, 1);
+    stats.incInt(clientConnectionStartsId, 1);
+  }
+
+  /** Decrements the current-connections gauge and increments the terminations counter. */
+  @Override
+  public void clientDisconnected() {
+    stats.incInt(currentClientConnectionsId, -1);
+    stats.incInt(clientConnectionTerminationsId, 1);
+  }
+
+  /** Adds {@code bytes} to the bytes-received counter and counts one received message. */
+  @Override
+  public void messageReceived(int bytes) {
+    stats.incLong(bytesReceivedId, bytes);
+    stats.incLong(messagesReceivedId, 1);
+  }
+
+  /** Adds {@code bytes} to the bytes-sent counter and counts one sent message. */
+  @Override
+  public void messageSent(int bytes) {
+    stats.incLong(bytesSentId, bytes);
+    stats.incLong(messagesSentId, 1);
+  }
+
+  /** Increments the authorization-violations counter by one. */
+  @Override
+  public void incAuthorizationViolations() {
+    stats.incLong(authorizationViolationsId, 1);
+  }
+
+  /** Increments the authentication-failures counter by one. */
+  @Override
+  public void incAuthenticationFailures() {
+    stats.incLong(authenticationFailuresId, 1);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
similarity index 88%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
rename to geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index be9c4a2..d52223c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -24,6 +24,7 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.security.server.NoOpAuthenticator;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -56,8 +57,10 @@ public class GenericProtocolServerConnectionTest {
     Socket socketMock = mock(Socket.class);
     when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
 
+    ClientProtocolMessageHandler mockHandler = mock(ClientProtocolMessageHandler.class);
+    when(mockHandler.getStatistics()).thenReturn(new NoOpProtobufStatistics());
     GenericProtocolServerConnection genericProtocolServerConnection =
-        getGenericProtocolServerConnection(socketMock, mock(ClientProtocolMessageHandler.class));
+        getGenericProtocolServerConnection(socketMock, mockHandler);
 
     genericProtocolServerConnection.emergencyClose();
 
@@ -69,6 +72,8 @@ public class GenericProtocolServerConnectionTest {
     when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
 
     ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+    ClientProtocolStatistics statisticsMock = mock(ClientProtocolStatistics.class);
+    when(clientProtocolMock.getStatistics()).thenReturn(statisticsMock);
     doThrow(new IOException()).when(clientProtocolMock).receiveMessage(any(), any(), any());
 
     return getGenericProtocolServerConnection(socketMock, clientProtocolMock);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index cc15e4f..4a6b44a 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -22,15 +22,18 @@ import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
 import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.disconnectAllFromDS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +67,7 @@ import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.internal.statistics.StatArchiveReader;
 import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -111,6 +115,7 @@ public class RoundTripCacheConnectionJUnitTest {
 
   @Rule
   public TestName testName = new TestName();
+  private File statisticsArchiveFile;
 
   @Before
   public void setup() throws Exception {
@@ -126,6 +131,12 @@ public class RoundTripCacheConnectionJUnitTest {
     cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
     cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
     cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
+    statisticsArchiveFile =
+        new File(getClass().getSimpleName() + "_" + testName.getMethodName() + ".gfs");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE,
+        statisticsArchiveFile.getName());
     cache = cacheFactory.create();
 
     CacheServer cacheServer = cache.addCacheServer();
@@ -208,6 +219,51 @@ public class RoundTripCacheConnectionJUnitTest {
         ProtobufUtilities.createProtobufRequestWithGetAllRequest(getAllRequest));
     protobufProtocolSerializer.serialize(getAllMessage, outputStream);
     validateGetAllResponse(socket, protobufProtocolSerializer);
+    long startTime = System.currentTimeMillis();
+    Thread.sleep(3000);
+
+    long endTime = System.currentTimeMillis();
+
+    disconnectAllFromDS();
+
+    StatArchiveReader.ValueFilter filter = new StatArchiveReader.ValueFilter() { // narrows by type only
+      @Override
+      public boolean archiveMatches(File archive) {
+        return true; // no restriction on which archive file is accepted
+      }
+
+      @Override
+      public boolean typeMatches(String type) {
+        return type.equals("ProtobufServerStats"); // exact, case-sensitive match on the type name
+      }
+
+      @Override
+      public boolean statMatches(String statName) {
+        return true; // every statistic name is accepted
+      }
+
+      @Override
+      public boolean instanceMatches(String textId, long numericId) {
+        return true; // both ids are ignored; every instance is accepted
+      }
+    };
+
+    StatArchiveReader reader = new StatArchiveReader(new File[] {statisticsArchiveFile},
+        new StatArchiveReader.ValueFilter[] {filter}, true);
+    List resourceInstList = reader.getResourceInstList();
+    // for (Object inst : resourceInstList) {
+    // StatArchiveReader.ResourceInst ri = (StatArchiveReader.ResourceInst) inst;
+    // String resourceName = ri.getName();
+    // String resourceTypeName = ri.getType().getName();
+    // System.out.println("===> resource name: " + resourceName + "; type name: " +
+    // resourceTypeName);
+    // }
+    assertEquals(1, resourceInstList.size());
+    StatArchiveReader.ResourceInst resourceInst =
+        (StatArchiveReader.ResourceInst) resourceInstList.iterator().next();
+    StatArchiveReader.StatValue statValue =
+        resourceInst.getStatValue("currentClientConnections").createTrimmed(startTime, endTime);
+    assertEquals(2.0, statValue.getSnapshotsMinimum(), 0.01);
   }
 
   @Test
@@ -297,6 +353,10 @@ public class RoundTripCacheConnectionJUnitTest {
     CacheFactory cacheFactory = new CacheFactory();
     cacheFactory.set(ConfigurationProperties.LOCATORS, "");
     cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
+    cacheFactory.set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE,
+        getClass().getSimpleName() + "_" + testName.getMethodName() + ".gfs");
     cache = cacheFactory.create();
 
     CacheServer cacheServer = cache.addCacheServer();
@@ -326,6 +386,7 @@ public class RoundTripCacheConnectionJUnitTest {
       assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
     }
 
+    Thread.sleep(15000);
     for (Socket currentSocket : sockets) {
       currentSocket.close();
     }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
index 5b627b8..9603caf 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
@@ -19,6 +19,7 @@ import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufCli
 import static org.junit.Assert.assertEquals;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.net.Socket;
 
@@ -29,7 +30,9 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.Config;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
@@ -51,6 +54,7 @@ import org.junit.experimental.categories.Category;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
+import java.util.Properties;
 
 @Category(DistributedTest.class)
 public class RoundTripLocatorConnectionDUnitTest extends JUnit4CacheTestCase {
@@ -129,6 +133,20 @@ public class RoundTripLocatorConnectionDUnitTest extends JUnit4CacheTestCase {
         messageResponse.getErrorResponse().getError().getErrorCode());
   }
 
+  @Override // layers statistics sampling and archive settings on top of the inherited properties
+  public Properties getDistributedSystemProperties() {
+    Properties properties = super.getDistributedSystemProperties();
+    properties.put(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+    properties.put(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100"); // presumably ms - confirm
+    String statFileName = getUniqueName() + ".gfs"; // archive name is built from getUniqueName()
+    properties.put(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statFileName);
+    File statFile = new File(statFileName);
+    if (statFile.exists()) { // a same-named file is already present (likely from an earlier run)
+      statFile.delete(); // return value is ignored, so a failed delete goes unreported
+    }
+    return properties;
+  }
+
   private Integer startCacheWithCacheServer() throws IOException {
     System.setProperty("geode.feature-protobuf-protocol", "true");
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
index 50d7b40..4806297 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
@@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -39,7 +40,7 @@ public class ProtobufStreamProcessorTest {
 
     ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
     InternalCache mockInternalCache = mock(InternalCache.class);
-    protobufStreamProcessor.receiveMessage(inputStream, outputStream,
-        new MessageExecutionContext(mockInternalCache, new NoOpAuthorizer()));
+    protobufStreamProcessor.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
+        mockInternalCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
index 0850f25..91a6336 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
@@ -39,6 +39,7 @@ import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
@@ -80,9 +81,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   public void processReturnsExpectedValuesForValidKeys()
       throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException,
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
-    Result<RegionAPI.GetAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(true, false),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(true, false),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
 
@@ -101,9 +102,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   @Test
   public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException,
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
-    Result<RegionAPI.GetAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(false, false),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(false, false),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
 
@@ -120,7 +121,8 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     RegionAPI.GetAllRequest getAllRequest =
         ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, testKeys);
     Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
-        getAllRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getAllRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
     RegionAPI.GetAllResponse message = result.getMessage();
@@ -134,9 +136,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   @Test
   public void multipleKeysWhereOneThrows() throws UnsupportedEncodingTypeException,
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
-    Result<RegionAPI.GetAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(true, true),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(true, true),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
index 0d09148..10bd5c6 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
@@ -68,7 +69,7 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process(
         serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
     Assert.assertTrue(result instanceof Success);
 
     RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
@@ -92,9 +93,10 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
     Cache emptyCache = mock(Cache.class);;
     when(emptyCache.rootRegions())
         .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
-    Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process(
-        serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(),
-        new MessageExecutionContext(emptyCache, new NoOpAuthorizer()));
+    Result<RegionAPI.GetRegionNamesResponse> result =
+        operationHandler.process(serializationServiceStub,
+            ProtobufRequestUtilities.createGetRegionNamesRequest(), new MessageExecutionContext(
+                emptyCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
     Assert.assertTrue(result instanceof Success);
 
     RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
index 3458e21..ee63cd2 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
@@ -14,6 +14,16 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
@@ -21,28 +31,20 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.protocol.MessageUtil;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.protocol.MessageUtil;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Collections;
-import java.util.HashSet;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -77,7 +79,7 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
 
     Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
         MessageUtil.makeGetRegionRequest(TEST_REGION1),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
     RegionAPI.GetRegionResponse response = result.getMessage();
     BasicTypes.Region region = response.getRegion();
     Assert.assertEquals(TEST_REGION1, region.getName());
@@ -102,8 +104,8 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
         .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
     String unknownRegionName = "UNKNOWN_REGION";
     Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
-        MessageUtil.makeGetRegionRequest(unknownRegionName),
-        new MessageExecutionContext(emptyCache, new NoOpAuthorizer()));
+        MessageUtil.makeGetRegionRequest(unknownRegionName), new MessageExecutionContext(emptyCache,
+            new NoOpAuthorizer(), new NoOpProtobufStatistics()));
     Assert.assertTrue(result instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
         result.getErrorMessage().getError().getErrorCode());
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
index 5bdd5d7..9b4109c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
@@ -14,16 +14,27 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
 import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
@@ -32,15 +43,6 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -73,7 +75,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, false, false);
     Result<RegionAPI.GetResponse> result = operationHandler.process(serializationServiceStub,
-        getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(result instanceof Success);
     Assert.assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT,
@@ -88,7 +91,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.GetRequest getRequest = generateTestRequest(true, false, false);
     Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(response instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -101,7 +105,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, true, false);
     Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(response instanceof Success);
   }
@@ -112,7 +117,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, false, true);
     Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(response instanceof Success);
   }
@@ -134,7 +140,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     RegionAPI.GetRequest getRequest =
         ProtobufRequestUtilities.createGetRequest(TEST_REGION, encodedKey).getGetRequest();
     Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        getRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(response instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
index 36a5cf4..1c735e1 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
@@ -14,6 +14,21 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
@@ -21,6 +36,7 @@ import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
@@ -29,20 +45,6 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -74,9 +76,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
       CodecAlreadyRegisteredForTypeException, InvalidExecutionContextException {
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
-    Result<RegionAPI.PutAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(false, true),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(false, true),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     Assert.assertTrue(result instanceof Success);
 
@@ -89,9 +91,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   public void processWithInvalidEntrySucceedsAndReturnsFailedKey() throws Exception {
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
-    Result<RegionAPI.PutAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(true, true),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(true, true),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
     verify(regionMock).put(TEST_KEY1, TEST_VALUE1);
@@ -109,9 +111,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   public void processWithNoEntriesPasses() throws Exception {
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
-    Result<RegionAPI.PutAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(false, false),
-            new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+    Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(false, false),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
index 5235a90..37aa379 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
@@ -14,16 +14,32 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
 import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
@@ -31,20 +47,6 @@ import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -69,7 +71,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
       CodecAlreadyRegisteredForTypeException, InvalidExecutionContextException {
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        generateTestRequest(),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
 
@@ -100,7 +103,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     RegionAPI.PutRequest putRequest =
         ProtobufRequestUtilities.createPutRequest(TEST_REGION, testEntry).getPutRequest();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        putRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        putRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
@@ -114,7 +118,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(null);
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        generateTestRequest(),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -129,7 +134,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
 
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        generateTestRequest(),
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.CONSTRAINT_VIOLATION.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
index a5bde46..6e04214 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
@@ -14,17 +14,31 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
 import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.security.server.NoOpAuthorizer;
@@ -32,18 +46,6 @@ import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -74,7 +76,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, false).getRemoveRequest();
     Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        removeRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
     verify(regionStub).remove(TEST_KEY);
@@ -86,7 +89,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(true, false).getRemoveRequest();
     Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        removeRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -99,7 +103,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
       CodecNotRegisteredForTypeException, InvalidExecutionContextException {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, true).getRemoveRequest();
     Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        removeRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Success);
   }
@@ -122,7 +127,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     RegionAPI.RemoveRequest removeRequest =
         ProtobufRequestUtilities.createRemoveRequest(TEST_REGION, encodedKey).getRemoveRequest();;
     Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+        removeRequest,
+        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
similarity index 52%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
index 0ced3aa..20ec40c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
@@ -12,23 +12,38 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
+package org.apache.geode.protocol.protobuf.statistics;
 
-package org.apache.geode.internal.cache.tier.sockets;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public class NoOpProtobufStatistics implements ProtobufClientStatistics {
+  @Override
+  public void clientConnected() {
 
+  }
 
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+  @Override
+  public void clientDisconnected() {
+
+  }
+
+  @Override
+  public void messageReceived(int bytes) {
+
+  }
+
+  @Override
+  public void messageSent(int bytes) {
+
+  }
+
+  @Override
+  public void incAuthorizationViolations() {
+
+  }
+
+  @Override
+  public void incAuthenticationFailures() {
+
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].