You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/10/10 18:17:30 UTC

[geode] branch develop updated: GEODE-3751: a single place for client protocol loading, logic.

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 033757f  GEODE-3751: a single place for client protocol loading, logic.
033757f is described below

commit 033757f5569126ad84ee4badf2c283481ec205b4
Author: Brian Rowe <br...@pivotal.io>
AuthorDate: Wed Oct 4 14:13:13 2017 -0700

    GEODE-3751: a single place for client protocol loading, logic.
    
    * Load a single service, `ClientProtocolService`.
      - (well, authenticators are still separate).
    * Move several interfaces to `geode-protobuf` from core.
    * Client protocol state and statistics are now stored on the new
      `ClientProtocolPipeline` interface. This will allow us to keep
      connection logic for core separated from protocol logic.
    * implement pipelines for Cache and Locator.
    * some other small fixes and changes.
    
    Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
    
    This closes #873
    
    Signed-off-by: Brian Rowe <br...@pivotal.io>
---
 .../distributed/internal/InternalLocator.java      |  9 ++-
 .../distributed/internal/tcpserver/TcpServer.java  | 34 ++++++---
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  2 +-
 ...geHandler.java => ClientProtocolProcessor.java} | 32 ++++----
 ...sageHandler.java => ClientProtocolService.java} | 35 +++++----
 ...ctory.java => ClientProtocolServiceLoader.java} | 20 +++--
 .../sockets/GenericProtocolServerConnection.java   | 27 ++-----
 .../tier/sockets/ServerConnectionFactory.java      | 63 ++++++++++------
 .../cache/tier/sockets/TcpServerFactory.java       | 11 ++-
 .../tier/sockets/ServerConnectionFactoryTest.java  | 18 +++--
 .../cache/tier/sockets/ServerConnectionTest.java   |  6 +-
 .../tier/sockets/ClientProtocolMessageHandler.java |  4 -
 .../tier/sockets/MessageExecutionContext.java      | 14 ++--
 .../internal/protocol/ProtobufCachePipeline.java   | 66 ++++++++++++++++
 .../internal/protocol/ProtobufLocatorPipeline.java | 55 ++++++++++++++
 .../internal/protocol/ProtobufProtocolService.java | 61 +++++++++++++++
 .../protocol/protobuf/ProtobufStreamProcessor.java | 12 +--
 .../protobuf/statistics}/NoOpStatistics.java       | 24 +++++-
 .../statistics/ProtobufClientStatistics.java       | 14 ++--
 ...cache.tier.sockets.ClientProtocolMessageHandler |  1 -
 ...ternal.cache.tier.sockets.ClientProtocolService |  1 +
 .../GenericProtocolServerConnectionTest.java       | 39 +++++-----
 .../acceptance/CacheConnectionJUnitTest.java       |  9 ++-
 .../acceptance/LocatorConnectionDUnitTest.java     | 88 +++++++++-------------
 ...tAvailableServersOperationHandlerJUnitTest.java | 11 ++-
 25 files changed, 433 insertions(+), 223 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 489e647..26864d2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -62,7 +62,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
 import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
 import org.apache.geode.internal.cache.wan.WANServiceProvider;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -1335,9 +1335,10 @@ public class InternalLocator extends Locator implements ConnectListener {
     try {
       this.stats.hookupStats(sys,
           SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
-      ClientProtocolMessageHandler messageHandler = this.server.getClientProtocolMessageHandler();
-      if (messageHandler != null) {
-        messageHandler.initializeStatistics("LocatorStats", sys);
+
+      ClientProtocolService clientProtocolService = this.server.getClientProtocolService();
+      if (clientProtocolService != null) {
+        clientProtocolService.initializeStatistics("LocatorStats", sys);
       }
     } catch (UnknownHostException e) {
       logger.warn(e);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 85b2ace..b9d10f6 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -43,6 +43,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.distributed.internal.ClusterConfigurationService;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
@@ -58,9 +59,9 @@ import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.VersionedDataOutputStream;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -133,7 +134,7 @@ public class TcpServer {
   private final PoolStatHelper poolHelper;
   private final InternalLocator internalLocator;
   private final TcpHandler handler;
-  private ClientProtocolMessageHandler clientProtocolMessageHandler;
+  private final ClientProtocolService clientProtocolService;
 
 
   private PooledExecutorWithDMStats executor;
@@ -158,20 +159,20 @@ public class TcpServer {
   /**
    * returns the message handler used for client/locator communications processing
    */
-  public ClientProtocolMessageHandler getClientProtocolMessageHandler() {
-    return clientProtocolMessageHandler;
+  public ClientProtocolService getClientProtocolService() {
+    return clientProtocolService;
   }
 
   public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
       DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
       ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
-      ClientProtocolMessageHandler clientProtocolMessageHandler) {
+      ClientProtocolService clientProtocolService) {
     this.port = port;
     this.bind_address = bind_address;
     this.handler = handler;
     this.poolHelper = poolHelper;
     this.internalLocator = internalLocator;
-    this.clientProtocolMessageHandler = clientProtocolMessageHandler;
+    this.clientProtocolService = clientProtocolService;
     // register DSFID types first; invoked explicitly so that all message type
     // initializations do not happen in first deserialization on a possibly
     // "precious" thread
@@ -381,10 +382,21 @@ public class TcpServer {
         if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
           if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
               && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
-            clientProtocolMessageHandler.getStatistics().clientConnected();
-            clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(),
-                new MessageExecutionContext(internalLocator));
-            clientProtocolMessageHandler.getStatistics().clientDisconnected();
+            if (clientProtocolService == null) {
+              // this shouldn't happen.
+              log.error("Client protocol service not initialized but a request was received");
+              socket.close();
+              throw new IOException(
+                  "Client protocol service not initialized but a request was received");
+            } else {
+              try (ClientProtocolProcessor pipeline =
+                  clientProtocolService.createProcessorForLocator(internalLocator)) {
+                pipeline.processMessage(input, socket.getOutputStream());
+              } catch (IncompatibleVersionException e) {
+                // should not happen on the locator as there is no handshake.
+                log.error("Unexpected exception in client message processing", e);
+              }
+            }
           } else {
             rejectUnknownProtocolConnection(socket, gossipVersion);
           }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index c660b68..704369e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1463,7 +1463,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     ServerConnection serverConn =
         serverConnectionFactory.makeServerConnection(socket, this.cache, this.crHelper, this.stats,
             AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationMode.toString(),
-            communicationMode.getModeNumber(), this, this.securityService, this.getBindAddress());
+            communicationMode.getModeNumber(), this, this.securityService);
 
     synchronized (this.allSCsLock) {
       this.allSCs.add(serverConn);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
similarity index 55%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
index b8969e1..96d2a89 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
@@ -19,23 +19,25 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.apache.geode.Statistics;
-import org.apache.geode.StatisticsFactory;
-
+import org.apache.geode.cache.IncompatibleVersionException;
 
 /**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * An interface that does the message handling part of a protocol for a particular connection. It
+ * does not manage the socket.
  */
-public interface ClientProtocolMessageHandler {
-  void initializeStatistics(String statisticsName, StatisticsFactory factory);
-
-  ClientProtocolStatistics getStatistics();
+public interface ClientProtocolProcessor extends AutoCloseable {
+  /**
+   * @throws IncompatibleVersionException if a client tries to connect with version that is
+   *         incompatible with the current version of the server.
+   */
+  void processMessage(InputStream inputStream, OutputStream outputStream)
+      throws IOException, IncompatibleVersionException;
 
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+  /**
+   * Close the pipeline, incrementing stats and releasing any resources.
+   *
+   * This declaration narrows {@link AutoCloseable#close()} to throw no checked exceptions.
+   */
+  @Override
+  void close();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
similarity index 51%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
index b8969e1..64f5365 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
@@ -15,27 +15,30 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.geode.Statistics;
 import org.apache.geode.StatisticsFactory;
-
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
 
 /**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * Provides a convenient location for a client protocol service to be loaded into the system.
  */
-public interface ClientProtocolMessageHandler {
+public interface ClientProtocolService {
   void initializeStatistics(String statisticsName, StatisticsFactory factory);
 
-  ClientProtocolStatistics getStatistics();
+  /**
+   *
+   * The pipeline MUST use an available authenticator for authentication of all operations once the
+   * handshake has happened.
+   *
+   * @param availableAuthenticators The authenticator available for the current system.
+   */
+  ClientProtocolProcessor createProcessorForCache(Cache cache,
+      Authenticator availableAuthenticators, SecurityService securityService);
 
-  void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext) throws IOException;
+  /**
+   * Create a locator pipeline. The locator does not currently provide any authentication.
+   */
+  ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
similarity index 62%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
index 2aca8c2..7e1e3f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
@@ -18,17 +18,23 @@ package org.apache.geode.internal.cache.tier.sockets;
 import java.util.Iterator;
 import java.util.ServiceLoader;
 
-public class MessageHandlerFactory {
-  public ClientProtocolMessageHandler makeMessageHandler() {
-    ServiceLoader<ClientProtocolMessageHandler> loader =
-        ServiceLoader.load(ClientProtocolMessageHandler.class);
-    Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
+public class ClientProtocolServiceLoader {
+  public ClientProtocolService loadService() {
+    ServiceLoader<ClientProtocolService> loader = ServiceLoader.load(ClientProtocolService.class);
+    Iterator<ClientProtocolService> iterator = loader.iterator();
 
     if (!iterator.hasNext()) {
       throw new ServiceLoadingFailureException(
-          "There is no ClientProtocolMessageHandler implementation found in JVM");
+          "There is no ClientProtocolService implementation found in JVM");
     }
 
-    return iterator.next();
+    ClientProtocolService service = iterator.next();
+
+    if (iterator.hasNext()) {
+      throw new ServiceLoadingFailureException(
+          "There is more than one ClientProtocolService implementation found in JVM; aborting");
+    }
+
+    return service;
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index eb401b9..5be6cac 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ServerLocation;
@@ -31,17 +32,13 @@ import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.SecurityManager;
-import org.apache.geode.security.server.Authenticator;
 
 /**
  * Holds the socket and protocol handler for the new client protocol.
  */
 public class GenericProtocolServerConnection extends ServerConnection {
   // The new protocol lives in a separate module and gets loaded when this class is instantiated.
-  private final ClientProtocolMessageHandler messageHandler;
-  private final SecurityManager securityManager;
-  private final Authenticator authenticator;
+  private final ClientProtocolProcessor protocolPipeline;
   private boolean cleanedUp;
   private ClientProxyMembershipID clientProxyMembershipID;
 
@@ -51,14 +48,11 @@ public class GenericProtocolServerConnection extends ServerConnection {
    */
   public GenericProtocolServerConnection(Socket socket, InternalCache c, CachedRegionHelper helper,
       CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
-      byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
-      SecurityService securityService, Authenticator authenticator) {
+      byte communicationMode, Acceptor acceptor, ClientProtocolProcessor clientProtocolProcessor,
+      SecurityService securityService) {
     super(socket, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
         communicationMode, acceptor, securityService);
-    securityManager = securityService.getSecurityManager();
-    this.messageHandler = newClientProtocol;
-    this.authenticator = authenticator;
-    this.messageHandler.getStatistics().clientConnected();
+    this.protocolPipeline = clientProtocolProcessor;
 
     setClientProxyMembershipId();
 
@@ -72,17 +66,12 @@ public class GenericProtocolServerConnection extends ServerConnection {
       InputStream inputStream = socket.getInputStream();
       OutputStream outputStream = socket.getOutputStream();
 
-      if (!authenticator.isAuthenticated()) {
-        authenticator.authenticate(inputStream, outputStream, securityManager);
-      } else {
-        messageHandler.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
-            this.getCache(), authenticator.getAuthorizer(), messageHandler.getStatistics()));
-      }
+      protocolPipeline.processMessage(inputStream, outputStream);
     } catch (EOFException e) {
       this.setFlagProcessMessagesAsFalse();
       setClientDisconnectedException(e);
       logger.debug("Encountered EOF while processing message: {}", e);
-    } catch (IOException e) {
+    } catch (IOException | IncompatibleVersionException e) {
       logger.warn(e);
       this.setFlagProcessMessagesAsFalse();
       setClientDisconnectedException(e);
@@ -105,7 +94,7 @@ public class GenericProtocolServerConnection extends ServerConnection {
     synchronized (this) {
       if (!cleanedUp) {
         cleanedUp = true;
-        messageHandler.getStatistics().clientDisconnected();
+        protocolPipeline.close();
       }
     }
     return super.cleanup();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 8f3a8c3..e550700 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -18,7 +18,6 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.Socket;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,7 +34,7 @@ import org.apache.geode.security.server.Authenticator;
  * Creates instances of ServerConnection based on the connection mode provided.
  */
 public class ServerConnectionFactory {
-  private ClientProtocolMessageHandler protocolHandler;
+  private volatile ClientProtocolService clientProtocolService;
   private Map<String, Class<? extends Authenticator>> authenticators = null;
 
   public ServerConnectionFactory() {}
@@ -44,7 +43,7 @@ public class ServerConnectionFactory {
     if (authenticators != null) {
       return;
     }
-    HashMap tmp = new HashMap<>();
+    HashMap<String, Class<? extends Authenticator>> tmp = new HashMap<>();
 
     ServiceLoader<Authenticator> loader = ServiceLoader.load(Authenticator.class);
     for (Authenticator streamAuthenticator : loader) {
@@ -54,17 +53,18 @@ public class ServerConnectionFactory {
     authenticators = tmp;
   }
 
-  private synchronized ClientProtocolMessageHandler initializeMessageHandler(
+  private synchronized ClientProtocolService initializeClientProtocolService(
       StatisticsFactory statisticsFactory, String statisticsName) {
-    if (protocolHandler != null) {
-      return protocolHandler;
+    if (clientProtocolService != null) {
+      return clientProtocolService;
     }
 
-    ClientProtocolMessageHandler tempHandler = new MessageHandlerFactory().makeMessageHandler();
-    tempHandler.initializeStatistics(statisticsName, statisticsFactory);
+    // use a local so the service is only published to the volatile field once initialized.
+    ClientProtocolService tmp = new ClientProtocolServiceLoader().loadService();
+    tmp.initializeStatistics(statisticsName, statisticsFactory);
 
-    protocolHandler = tempHandler;
-    return protocolHandler;
+    clientProtocolService = tmp;
+    return clientProtocolService;
   }
 
   private Authenticator findStreamAuthenticator(String implementationID) {
@@ -86,34 +86,51 @@ public class ServerConnectionFactory {
     }
   }
 
-  private ClientProtocolMessageHandler getOrCreateClientProtocolMessageHandler(
+  private ClientProtocolService getOrCreateClientProtocolService(
       StatisticsFactory statisticsFactory, String serverName) {
-    if (protocolHandler == null) {
-      return initializeMessageHandler(statisticsFactory, serverName);
+    if (clientProtocolService == null) {
+      return initializeClientProtocolService(statisticsFactory, serverName);
     }
-    return protocolHandler;
+    return clientProtocolService;
   }
 
   public ServerConnection makeServerConnection(Socket socket, InternalCache cache,
       CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
       String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService, InetAddress bindAddress) throws IOException {
+      SecurityService securityService) throws IOException {
     if (communicationMode == ProtobufClientServerProtocol.getModeNumber()) {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Server received unknown communication mode: " + communicationMode);
       } else {
-        String authenticationMode =
-            System.getProperty("geode.protocol-authentication-mode", "NOOP");
-
-        return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
-            socketBufferSize, communicationModeStr, communicationMode, acceptor,
-            getOrCreateClientProtocolMessageHandler(cache.getDistributedSystem(),
-                acceptor.getServerName()),
-            securityService, findStreamAuthenticator(authenticationMode));
+        try {
+          String authenticationMode =
+              System.getProperty("geode.protocol-authentication-mode", "NOOP");
+
+          return createGenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+              socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService,
+              authenticationMode);
+        } catch (ServiceLoadingFailureException ex) {
+          throw new IOException("Could not load protobuf client protocol", ex);
+        }
       }
     } else {
       return new LegacyServerConnection(socket, cache, helper, stats, hsTimeout, socketBufferSize,
           communicationModeStr, communicationMode, acceptor, securityService);
     }
   }
+
+  private ServerConnection createGenericProtocolServerConnection(Socket socket, InternalCache cache,
+      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+      String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      SecurityService securityService, String authenticationMode) {
+    ClientProtocolService service =
+        getOrCreateClientProtocolService(cache.getDistributedSystem(), acceptor.getServerName());
+
+    ClientProtocolProcessor processor = service.createProcessorForCache(cache,
+        findStreamAuthenticator(authenticationMode), securityService);
+
+    return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+        socketBufferSize, communicationModeStr, communicationMode, acceptor, processor,
+        securityService);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
index 959e6e2..bf442f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
@@ -20,6 +20,7 @@ import java.util.Properties;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.PoolStatHelper;
@@ -28,15 +29,17 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
 import org.apache.geode.internal.logging.LogService;
 
 public class TcpServerFactory {
-  private ClientProtocolMessageHandler protocolHandler;
+  private final ClientProtocolService clientProtocolService;
   static final Logger logger = LogService.getLogger();
 
   public TcpServerFactory() {
+    ClientProtocolService service = null;
     try {
-      protocolHandler = new MessageHandlerFactory().makeMessageHandler();
+      service = new ClientProtocolServiceLoader().loadService();
     } catch (ServiceLoadingFailureException ex) {
-      logger.warn(ex.getMessage());
+      logger.warn("Could not load client protocol", ex);
     }
+    clientProtocolService = service;
   }
 
   public TcpServer makeTcpServer(int port, InetAddress bind_address, Properties sslConfig,
@@ -44,6 +47,6 @@ public class TcpServerFactory {
       ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) {
 
     return new TcpServer(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup,
-        threadName, internalLocator, protocolHandler);
+        threadName, internalLocator, clientProtocolService);
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 56d3770..a3f34b0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -20,6 +20,8 @@ import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.UnitTest;
+
+import org.assertj.core.api.Assertions;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -59,14 +61,16 @@ public class ServerConnectionFactoryTest {
   }
 
   /**
-   * @throws ServiceLoadingFailureException because the service is implemented in a different
-   *         module, and when this unit test is run, that module won't be present.
+   * @throws IOException caused by ServiceLoadingFailureException because the service is implemented
+   *         in a different module, and when this unit test is run, that module won't be present.
    */
-  @Test(expected = ServiceLoadingFailureException.class)
+  @Test
   public void newClientProtocolFailsWithSystemPropertySet() throws IOException {
-    System.setProperty("geode.feature-protobuf-protocol", "true");
-    ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
-        CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    Assertions.assertThatThrownBy(() -> {
+      System.setProperty("geode.feature-protobuf-protocol", "true");
+      ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
+          CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    }).hasRootCauseInstanceOf(ServiceLoadingFailureException.class);
   }
 
   @Test
@@ -107,7 +111,7 @@ public class ServerConnectionFactoryTest {
 
     return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
-        mock(AcceptorImpl.class), mock(SecurityService.class), InetAddress.getLocalHost());
+        mock(AcceptorImpl.class), mock(SecurityService.class));
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index a4ebbac..bd23223 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -82,9 +82,9 @@ public class ServerConnectionTest {
     InternalCache cache = mock(InternalCache.class);
     SecurityService securityService = mock(SecurityService.class);
 
-    serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
-        0, 0, null, CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
-        securityService, InetAddress.getLocalHost());
+    serverConnection =
+        new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0, 0, null,
+            CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
     MockitoAnnotations.initMocks(this);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
similarity index 92%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index b8969e1..1d86d70 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -32,10 +32,6 @@ import org.apache.geode.StatisticsFactory;
  * {@link GenericProtocolServerConnection}.
  */
 public interface ClientProtocolMessageHandler {
-  void initializeStatistics(String statisticsName, StatisticsFactory factory);
-
-  ClientProtocolStatistics getStatistics();
-
   void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException;
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
similarity index 85%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
index fd1ed4d..cce750c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
@@ -20,6 +20,8 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
 import org.apache.geode.security.server.Authorizer;
 import org.apache.geode.security.server.NoOpAuthorizer;
 
@@ -27,23 +29,23 @@ import org.apache.geode.security.server.NoOpAuthorizer;
 public class MessageExecutionContext {
   private Cache cache;
   private Locator locator;
-  private Authorizer authorizer;
-  private ClientProtocolStatistics statistics;
+  private final Authorizer authorizer;
+  private final ProtobufClientStatistics statistics;
 
 
   public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer,
-      ClientProtocolStatistics statistics) {
+      ProtobufClientStatistics statistics) {
     this.cache = cache;
     this.authorizer = streamAuthorizer;
     this.statistics = statistics;
   }
 
-  public MessageExecutionContext(InternalLocator locator) {
+  public MessageExecutionContext(InternalLocator locator, ProtobufClientStatistics statistics) {
     this.locator = locator;
     // set a no-op authorizer until such time as locators implement authentication
     // and authorization checks
     this.authorizer = new NoOpAuthorizer();
-    this.statistics = new NoOpStatistics();
+    this.statistics = statistics;
   }
 
   /**
@@ -86,7 +88,7 @@ public class MessageExecutionContext {
    * Returns the statistics for recording operation stats. In a unit test environment this may not
    * be a protocol-specific statistics implementation.
    */
-  public ClientProtocolStatistics getStatistics() {
+  public ProtobufClientStatistics getStatistics() {
     return statistics;
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java
new file mode 100644
index 0000000..36bb477
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
+
+@Experimental
+public final class ProtobufCachePipeline implements ClientProtocolProcessor {
+  private final ProtobufClientStatistics statistics;
+  private final Cache cache;
+  private final SecurityService securityService;
+  private final ProtobufStreamProcessor streamProcessor;
+  private final Authenticator authenticator;
+
+  ProtobufCachePipeline(ProtobufStreamProcessor protobufStreamProcessor,
+      ProtobufClientStatistics statistics, Cache cache, Authenticator authenticator,
+      SecurityService securityService) {
+    this.streamProcessor = protobufStreamProcessor;
+    this.statistics = statistics;
+    this.cache = cache;
+    this.authenticator = authenticator;
+    this.securityService = securityService;
+    this.statistics.clientConnected();
+  }
+
+  @Override
+  public void processMessage(InputStream inputStream, OutputStream outputStream)
+      throws IOException, IncompatibleVersionException {
+    if (!authenticator.isAuthenticated()) {
+      authenticator.authenticate(inputStream, outputStream, securityService.getSecurityManager());
+    } else {
+      streamProcessor.receiveMessage(inputStream, outputStream,
+          new MessageExecutionContext(cache, authenticator.getAuthorizer(), statistics));
+    }
+  }
+
+  @Override
+  public void close() {
+    this.statistics.clientDisconnected();
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java
new file mode 100644
index 0000000..f4ed9e2
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+
+@Experimental
+public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
+  private final ProtobufClientStatistics statistics;
+  private final InternalLocator locator;
+  private final ProtobufStreamProcessor streamProcessor;
+
+  ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
+      ProtobufClientStatistics statistics, InternalLocator locator) {
+    this.streamProcessor = protobufStreamProcessor;
+    this.statistics = statistics;
+    this.locator = locator;
+    this.statistics.clientConnected();
+  }
+
+  @Override
+  public void processMessage(InputStream inputStream, OutputStream outputStream)
+      throws IOException, IncompatibleVersionException {
+    streamProcessor.receiveMessage(inputStream, outputStream,
+        new MessageExecutionContext(locator, statistics));
+  }
+
+  @Override
+  public void close() {
+    this.statistics.clientDisconnected();
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java
new file mode 100644
index 0000000..25df248
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
+
+public class ProtobufProtocolService implements ClientProtocolService {
+  private volatile ProtobufClientStatistics statistics;
+  private final ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
+
+  @Override
+  public synchronized void initializeStatistics(String statisticsName, StatisticsFactory factory) {
+    statistics = new ProtobufClientStatisticsImpl(factory, statisticsName,
+        ProtobufClientStatistics.PROTOBUF_STATS_NAME);
+  }
+
+  /**
+   * For internal use. Falls back to no-op statistics when none have been initialized yet, which
+   * is necessary because the statistics may get initialized in another thread.
+   */
+  private ProtobufClientStatistics getStatistics() {
+    if (statistics == null) {
+      return new NoOpStatistics();
+    }
+    return statistics;
+  }
+
+  @Override
+  public ClientProtocolProcessor createProcessorForCache(Cache cache, Authenticator authenticator,
+      SecurityService securityService) {
+    return new ProtobufCachePipeline(protobufStreamProcessor, getStatistics(), cache, authenticator,
+        securityService);
+  }
+
+  @Override
+  public ClientProtocolProcessor createProcessorForLocator(InternalLocator locator) {
+    return new ProtobufLocatorPipeline(protobufStreamProcessor, getStatistics(), locator);
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
index 5e06875..3571821 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
@@ -43,7 +43,6 @@ import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
 public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   private final ProtobufProtocolSerializer protobufProtocolSerializer;
   private final ProtobufOpsProcessor protobufOpsProcessor;
-  private ProtobufClientStatistics statistics;
   private static final Logger logger = LogService.getLogger();
 
   public ProtobufStreamProcessor() {
@@ -53,16 +52,6 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   }
 
   @Override
-  public void initializeStatistics(String statisticsName, StatisticsFactory factory) {
-    statistics = new ProtobufClientStatisticsImpl(factory, statisticsName, "ProtobufServerStats");
-  }
-
-  @Override
-  public ClientProtocolStatistics getStatistics() {
-    return statistics;
-  }
-
-  @Override
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException {
     try {
@@ -81,6 +70,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
       logger.debug(errorMessage);
       throw new EOFException(errorMessage);
     }
+    ProtobufClientStatistics statistics = executionContext.getStatistics();
     statistics.messageReceived(message.getSerializedSize());
 
     ClientProtocol.Request request = message.getRequest();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
similarity index 70%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
index d04db47..e06ea8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
@@ -12,9 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.protocol.protobuf.statistics;
 
-public class NoOpStatistics implements ClientProtocolStatistics {
+public class NoOpStatistics implements ProtobufClientStatistics {
   @Override
   public void clientConnected() {
 
@@ -24,4 +24,24 @@ public class NoOpStatistics implements ClientProtocolStatistics {
   public void clientDisconnected() {
 
   }
+
+  @Override
+  public void messageReceived(int bytes) {
+
+  }
+
+  @Override
+  public void messageSent(int bytes) {
+
+  }
+
+  @Override
+  public void incAuthorizationViolations() {
+
+  }
+
+  @Override
+  public void incAuthenticationFailures() {
+
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
index 95f2180..afd9648 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
@@ -17,15 +17,17 @@ package org.apache.geode.internal.protocol.protobuf.statistics;
 import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
 
 public interface ProtobufClientStatistics extends ClientProtocolStatistics {
-  public void clientConnected();
+  String PROTOBUF_STATS_NAME = "ProtobufStats";
 
-  public void clientDisconnected();
+  void clientConnected();
 
-  public void messageReceived(int bytes);
+  void clientDisconnected();
 
-  public void messageSent(int bytes);
+  void messageReceived(int bytes);
 
-  public void incAuthorizationViolations();
+  void messageSent(int bytes);
 
-  public void incAuthenticationFailures();
+  void incAuthorizationViolations();
+
+  void incAuthenticationFailures();
 }
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
deleted file mode 100644
index a3a005e..0000000
--- a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor
\ No newline at end of file
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService
new file mode 100644
index 0000000..207426a
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.ProtobufProtocolService
\ No newline at end of file
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index b35d923..98aa1b9 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -33,13 +33,12 @@ import org.junit.experimental.categories.Category;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
-import org.apache.geode.security.server.NoOpAuthenticator;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -48,7 +47,7 @@ public class GenericProtocolServerConnectionTest {
   private ClientHealthMonitor clientHealthMonitorMock;
 
   @Test
-  public void testProcessFlag() throws IOException {
+  public void testProcessFlag() throws Exception {
     ServerConnection serverConnection = IOExceptionThrowingServerConnection();
     Assert.assertTrue(serverConnection.processMessages);
     serverConnection.doOneMessage();
@@ -61,9 +60,9 @@ public class GenericProtocolServerConnectionTest {
     when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
 
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
-    ClientProtocolMessageHandler mockHandler = mock(ClientProtocolMessageHandler.class);
+    ClientProtocolProcessor clientProtocolProcessorMock = mock(ClientProtocolProcessor.class);
     GenericProtocolServerConnection genericProtocolServerConnection =
-        getServerConnection(socketMock, mockHandler, acceptorStub);
+        getServerConnection(socketMock, clientProtocolProcessorMock, acceptorStub);
 
     genericProtocolServerConnection.emergencyClose();
 
@@ -74,9 +73,9 @@ public class GenericProtocolServerConnectionTest {
   public void testClientHealthMonitorRegistration() throws UnknownHostException {
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
 
-    ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+    ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
 
-    ServerConnection serverConnection = getServerConnection(clientProtocolMock, acceptorStub);
+    ServerConnection serverConnection = getServerConnection(clientProtocolProcessor, acceptorStub);
 
     ArgumentCaptor<ClientProxyMembershipID> registerCpmidArgumentCaptor =
         ArgumentCaptor.forClass(ClientProxyMembershipID.class);
@@ -96,9 +95,9 @@ public class GenericProtocolServerConnectionTest {
   @Test
   public void testDoOneMessageNotifiesClientHealthMonitor() throws UnknownHostException {
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
-    ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+    ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
 
-    ServerConnection serverConnection = getServerConnection(clientProtocolMock, acceptorStub);
+    ServerConnection serverConnection = getServerConnection(clientProtocolProcessor, acceptorStub);
     serverConnection.doOneMessage();
 
     ArgumentCaptor<ClientProxyMembershipID> clientProxyMembershipIDArgumentCaptor =
@@ -108,17 +107,15 @@ public class GenericProtocolServerConnectionTest {
         clientProxyMembershipIDArgumentCaptor.getValue().toString());
   }
 
-  private GenericProtocolServerConnection IOExceptionThrowingServerConnection() throws IOException {
-    ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
-    ClientProtocolStatistics statisticsMock = mock(ClientProtocolStatistics.class);
-    when(clientProtocolMock.getStatistics()).thenReturn(statisticsMock);
-    doThrow(new IOException()).when(clientProtocolMock).receiveMessage(any(), any(), any());
-
-    return getServerConnection(clientProtocolMock, mock(AcceptorImpl.class));
+  private GenericProtocolServerConnection IOExceptionThrowingServerConnection()
+      throws IOException, IncompatibleVersionException {
+    ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
+    doThrow(new IOException()).when(clientProtocolProcessor).processMessage(any(), any());
+    return getServerConnection(clientProtocolProcessor, mock(AcceptorImpl.class));
   }
 
   private GenericProtocolServerConnection getServerConnection(Socket socketMock,
-      ClientProtocolMessageHandler clientProtocolMock, AcceptorImpl acceptorStub)
+      ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl acceptorStub)
       throws UnknownHostException {
     clientHealthMonitorMock = mock(ClientHealthMonitor.class);
     when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
@@ -128,18 +125,16 @@ public class GenericProtocolServerConnectionTest {
     when(socketMock.getRemoteSocketAddress()).thenReturn(inetSocketAddressStub);
     when(socketMock.getInetAddress()).thenReturn(inetAddressStub);
 
-    when(clientProtocolMock.getStatistics()).thenReturn(new NoOpProtobufStatistics());
-
     return new GenericProtocolServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptorStub,
-        clientProtocolMock, mock(SecurityService.class), new NoOpAuthenticator());
+        clientProtocolProcessorMock, mock(SecurityService.class));
   }
 
   private GenericProtocolServerConnection getServerConnection(
-      ClientProtocolMessageHandler clientProtocolMock, AcceptorImpl acceptorStub)
+      ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl acceptorStub)
       throws UnknownHostException {
     Socket socketMock = mock(Socket.class);
-    return getServerConnection(socketMock, clientProtocolMock, acceptorStub);
+    return getServerConnection(socketMock, clientProtocolProcessorMock, acceptorStub);
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
index d4f90a5..99b0e1f 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
@@ -66,6 +66,7 @@ import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
@@ -172,10 +173,10 @@ public class CacheConnectionJUnitTest {
 
     InternalDistributedSystem distributedSystem =
         (InternalDistributedSystem) cache.getDistributedSystem();
-    Statistics[] protobufServerStats =
-        distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
-    assertEquals(1, protobufServerStats.length);
-    Statistics statistics = protobufServerStats[0];
+    Statistics[] protobufStats = distributedSystem.findStatisticsByType(
+        distributedSystem.findType(ProtobufClientStatistics.PROTOBUF_STATS_NAME));
+    assertEquals(1, protobufStats.length);
+    Statistics statistics = protobufStats[0];
     assertEquals(1, statistics.get("currentClientConnections"));
     assertEquals(2L, statistics.get("messagesReceived"));
     assertEquals(2L, statistics.get("messagesSent"));
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
index 1a3f654..1f4579c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -18,7 +18,6 @@ package org.apache.geode.internal.protocol.acceptance;
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -36,17 +35,17 @@ import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.ServerAPI;
 import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.protobuf.ServerAPI;
 import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -63,9 +62,9 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
 
   @Before
   public void setup() throws IOException {
-    startCacheWithCacheServer();
-
     Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true"));
+
+    startCacheWithCacheServer();
   }
 
   private Socket createSocket() throws IOException {
@@ -89,52 +88,35 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
             protobufRequestBuilder.setGetAvailableServersRequest(
                 ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
 
-    try {
-      ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+    testSocketWithStats(getAvailableServersRequestMessage, protobufProtocolSerializer);
+
+    testSocketWithStats(getAvailableServersRequestMessage, protobufProtocolSerializer);
+  }
+
+  private void testSocketWithStats(ClientProtocol.Message getAvailableServersRequestMessage,
+      ProtobufProtocolSerializer protobufProtocolSerializer)
+      throws IOException, InvalidProtocolMessageException {
+    try (Socket socket = createSocket()) {
+      long messagesReceived = getMessagesReceived();
+      long messagesSent = getMessagesSent();
+      long bytesReceived = getBytesReceived();
+      long bytesSent = getBytesSent();
+      int clientConnectionStarts = getClientConnectionStarts();
+      int clientConnectionTerminations = getClientConnectionTerminations();
 
-      try (Socket socket = createSocket()) {
-        long messagesReceived = getMessagesReceived();
-        long messagesSent = getMessagesSent();
-        long bytesReceived = getBytesReceived();
-        long bytesSent = getBytesSent();
-        int clientConnectionStarts = getClientConnectionStarts();
-        int clientConnectionTerminations = getClientConnectionTerminations();
-
-        protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
-            socket.getOutputStream());
-
-        ClientProtocol.Message getAvailableServersResponseMessage =
-            protobufProtocolSerializer.deserialize(socket.getInputStream());
-        validateGetAvailableServersResponse(getAvailableServersResponseMessage);
-
-        validateStats(messagesReceived + 1, messagesSent + 1,
-            bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
-            bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
-            clientConnectionStarts, clientConnectionTerminations + 1);
-      }
-
-      try (Socket socket = createSocket()) {
-        long messagesReceived = getMessagesReceived();
-        long messagesSent = getMessagesSent();
-        long bytesReceived = getBytesReceived();
-        long bytesSent = getBytesSent();
-        int clientConnectionStarts = getClientConnectionStarts();
-        int clientConnectionTerminations = getClientConnectionTerminations();
-
-        protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
-            socket.getOutputStream());
-
-        ClientProtocol.Message getAvailableServersResponseMessage =
-            protobufProtocolSerializer.deserialize(socket.getInputStream());
-        validateGetAvailableServersResponse(getAvailableServersResponseMessage);
-
-        validateStats(messagesReceived + 1, messagesSent + 1,
-            bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
-            bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
-            clientConnectionStarts, clientConnectionTerminations + 1);
-      }
-    } catch (RMIException e) {
-      throw e.getCause(); // so that assertions propagate properly.
+      protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+          socket.getOutputStream());
+
+      ClientProtocol.Message getAvailableServersResponseMessage =
+          protobufProtocolSerializer.deserialize(socket.getInputStream());
+      validateGetAvailableServersResponse(getAvailableServersResponseMessage);
+
+      validateStats(messagesReceived + 1, messagesSent + 1,
+          bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
+          bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
+          clientConnectionStarts, clientConnectionTerminations + 1);
     }
   }
 
@@ -187,8 +169,8 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
     InternalDistributedSystem distributedSystem =
         (InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
 
-    Statistics[] protobufServerStats =
-        distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+    Statistics[] protobufServerStats = distributedSystem.findStatisticsByType(
+        distributedSystem.findType(ProtobufClientStatistics.PROTOBUF_STATS_NAME));
     assertEquals(1, protobufServerStats.length);
     return protobufServerStats[0];
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
index 9b96dab..5f724d6 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.protocol.protobuf.Result;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI.GetAvailableServersResponse;
 import org.apache.geode.internal.protocol.protobuf.Success;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.test.junit.categories.UnitTest;
 import org.junit.Before;
@@ -74,8 +75,9 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl
 
     ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
         ProtobufRequestUtilities.createGetAvailableServersRequest();
-    Result operationHandlerResult = operationHandler.process(serializationServiceStub,
-        getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock));
+    Result operationHandlerResult =
+        operationHandler.process(serializationServiceStub, getAvailableServersRequest,
+            new MessageExecutionContext(internalLocatorMock, new NoOpStatistics()));
     assertTrue(operationHandlerResult instanceof Success);
     ValidateGetAvailableServersResponse(
         (GetAvailableServersResponse) operationHandlerResult.getMessage());
@@ -88,8 +90,9 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl
 
     ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
         ProtobufRequestUtilities.createGetAvailableServersRequest();
-    Result operationHandlerResult = operationHandler.process(serializationServiceStub,
-        getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock));
+    Result operationHandlerResult =
+        operationHandler.process(serializationServiceStub, getAvailableServersRequest,
+            new MessageExecutionContext(internalLocatorMock, new NoOpStatistics()));
     assertTrue(operationHandlerResult instanceof Success);
     GetAvailableServersResponse availableServersResponse =
         (GetAvailableServersResponse) operationHandlerResult.getMessage();

-- 
To stop receiving notification emails like this one, please contact
commits@geode.apache.org.