You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/03 19:46:30 UTC

[geode] 01/01: GEODE-3604: Completed rebase and refactor

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3604
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c37a6dc1bb5b10158d70b8b5d2725324d363e12f
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Tue Oct 3 12:42:12 2017 -0700

    GEODE-3604: Completed rebase and refactor
---
 .../distributed/internal/InternalLocator.java      |   2 +-
 .../distributed/internal/tcpserver/TcpServer.java  |  17 +-
 .../geode/internal/cache/CacheServerImpl.java      |  10 +-
 .../internal/cache/tier/sockets/AcceptorImpl.java  |  75 ++++----
 .../sockets/GenericProtocolServerConnection.java   |  30 ++-
 .../tier/sockets/ServerConnectionFactory.java      | 102 +++-------
 .../cache/tier/sockets/TcpServerFactory.java       |  14 +-
 .../ClientProtocolMessageHandler.java              |   8 +-
 .../MessageExecutionContext.java                   |  62 +++++--
 .../ProtocolMessageHandlerLookupService.java}      |  24 ++-
 .../protocol}/security/server/Authenticator.java   |  16 +-
 .../server/AuthenticatorLookupService.java         |  62 +++++++
 .../server/AuthorizationLookupService.java         |  60 ++++++
 .../protocol}/security/server/Authorizer.java      |  10 +-
 .../security/server/NoOpAuthenticator.java         |  13 +-
 .../protocol}/security/server/NoOpAuthorizer.java  |  11 +-
 .../protocol}/security/server/package.html         |   0
 ...internal.protocol.security.server.Authenticator |   1 +
 ...de.internal.protocol.security.server.Authorizer |   1 +
 .../org.apache.geode.security.server.Authenticator |   1 -
 .../tier/sockets/ServerConnectionFactoryTest.java  |  38 ++--
 .../cache/tier/sockets/ServerConnectionTest.java   |  31 ++--
 .../protocol/operations/OperationHandler.java      |   8 +-
 .../protocol/protobuf/EncodingTypeTranslator.java  |   7 +-
 .../protocol/protobuf/OperationContext.java        |   4 +-
 ...essor.java => ProtobufOperationsProcessor.java} |  19 +-
 ...or.java => ProtobufProtocolMessageHandler.java} |  60 +++---
 .../protobuf/ProtobufSimpleAuthenticator.java      |  78 --------
 .../operations/GetAllRequestOperationHandler.java  |  32 ++--
 .../GetAvailableServersOperationHandler.java       |  28 +--
 .../GetRegionNamesRequestOperationHandler.java     |   6 +-
 .../GetRegionRequestOperationHandler.java          |  10 +-
 .../operations/GetRequestOperationHandler.java     |   8 +-
 .../operations/PutAllRequestOperationHandler.java  |  17 +-
 .../operations/PutRequestOperationHandler.java     |   8 +-
 .../operations/RemoveRequestOperationHandler.java  |   8 +-
 .../server/ProtobufSimpleAuthenticator.java        |  75 ++++++++
 .../server}/ProtobufSimpleAuthorizer.java          |  18 +-
 .../protobuf/utilities/ProtobufPrimitiveTypes.java |   4 +
 .../protocol/{protobuf => responses}/Failure.java  |   3 +-
 .../protocol/{protobuf => responses}/Result.java   |   3 +-
 .../protocol/{protobuf => responses}/Success.java  |   3 +-
 ....internal.protocol.ClientProtocolMessageHandler |   1 +
 ...internal.protocol.security.server.Authenticator |   1 +
 ...de.internal.protocol.security.server.Authorizer |   1 +
 .../GenericProtocolServerConnectionTest.java       |   9 +-
 .../acceptance/CacheOperationsJUnitTest.java       |  69 +++++++
 ...ava => ProtobufProtocolMessageHandlerTest.java} |  15 +-
 .../ProtobufSimpleAuthenticatorJUnitTest.java      |  39 ++--
 .../GetAllRequestOperationHandlerJUnitTest.java    |  34 ++--
 ...tAvailableServersOperationHandlerJUnitTest.java |  29 +--
 ...egionNamesRequestOperationHandlerJUnitTest.java |  27 +--
 .../GetRegionRequestOperationHandlerJUnitTest.java |  24 +--
 .../GetRequestOperationHandlerJUnitTest.java       |  39 ++--
 .../PutAllRequestOperationHandlerJUnitTest.java    |  21 ++-
 .../PutRequestOperationHandlerJUnitTest.java       |  27 ++-
 .../RemoveRequestOperationHandlerJUnitTest.java    |  36 ++--
 .../utilities/ProtobufUtilitiesJUnitTest.java      |  15 +-
 .../server/AuthenticationIntegrationTest.java      | 146 +++++++++++++++
 .../server/AuthorizationIntegrationTest.java       | 206 +++++++++++++++++++++
 60 files changed, 1153 insertions(+), 573 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 489e647..389a47d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -62,7 +62,6 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
 import org.apache.geode.internal.admin.remote.DistributionLocatorId;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
 import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
 import org.apache.geode.internal.cache.wan.WANServiceProvider;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -74,6 +73,7 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogWriterAppenders;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
 import org.apache.geode.management.internal.JmxManagerLocator;
 import org.apache.geode.management.internal.JmxManagerLocatorRequest;
 import org.apache.geode.management.internal.cli.CliUtil;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 85b2ace..9a4126f 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -58,12 +58,12 @@ import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.VersionedDataOutputStream;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
 
 /**
@@ -97,12 +97,7 @@ public class TcpServer {
   // GossipServer.
   public final static int OLDGOSSIPVERSION = 1001;
 
-  private static/* GemStoneAddition */ final Map GOSSIP_TO_GEMFIRE_VERSION_MAP = new HashMap();
-
-  /**
-   * For the new client-server protocol, which ignores the usual handshake mechanism.
-   */
-  public static final byte PROTOBUF_CLIENT_SERVER_PROTOCOL = (byte) 110;
+  private static final Map GOSSIP_TO_GEMFIRE_VERSION_MAP = new HashMap();
 
   // For test purpose only
   public static boolean isTesting = false;
@@ -117,7 +112,7 @@ public class TcpServer {
 
   private static final Logger log = LogService.getLogger();
 
-  protected/* GemStoneAddition */ final/* GemStoneAddition */ static int READ_TIMEOUT =
+  protected final static int READ_TIMEOUT =
       Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.READ_TIMEOUT", 60 * 1000);
   // This is for backwards compatibility. The p2p.backlog flag used to be the only way to configure
   // the locator backlog.
@@ -379,8 +374,8 @@ public class TcpServer {
 
         short versionOrdinal;
         if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
-          if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
-              && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
+          if (input.readUnsignedByte() == CommunicationMode.ProtobufClientServerProtocol
+              .getModeNumber() && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
             clientProtocolMessageHandler.getStatistics().clientConnected();
             clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(),
                 new MessageExecutionContext(internalLocator));
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index bcd8b32..98af40b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -14,7 +14,9 @@
  */
 package org.apache.geode.internal.cache;
 
-import static java.lang.Integer.*;
+import static java.lang.Integer.MAX_VALUE;
+import static java.lang.Integer.getInteger;
+import static java.lang.Integer.valueOf;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,7 +29,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.geode.internal.cache.tier.sockets.ServerConnectionFactory;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -37,7 +38,6 @@ import org.apache.geode.InvalidValueException;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.ClientSession;
 import org.apache.geode.cache.DataPolicy;
-import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.DynamicRegionFactory;
 import org.apache.geode.cache.EvictionAction;
@@ -70,6 +70,8 @@ import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnectionFactory;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -95,7 +97,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
   /**
    * The server connection factory, that provides either a
    * {@link org.apache.geode.internal.cache.tier.sockets.LegacyServerConnection} or a new
-   * {@link org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection}
+   * {@link GenericProtocolServerConnection}
    */
   private final ServerConnectionFactory serverConnectionFactory = new ServerConnectionFactory();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index c660b68..9c6a14a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -18,42 +18,6 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ClientToServerForQueue;
 
-import org.apache.geode.CancelException;
-import org.apache.geode.SystemFailure;
-import org.apache.geode.ToDataException;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.RegionDestroyedException;
-import org.apache.geode.cache.client.internal.PoolImpl;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.wan.GatewayTransportFilter;
-import org.apache.geode.distributed.internal.DM;
-import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.LonerDistributionManager;
-import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
-import org.apache.geode.distributed.internal.ReplyProcessor21;
-import org.apache.geode.internal.SystemTimer;
-import org.apache.geode.internal.cache.BucketAdvisor;
-import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.partitioned.AllBucketProfilesUpdateMessage;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
-import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.LoggingThreadGroup;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCreator;
-import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.internal.tcp.ConnectionTable;
-import org.apache.geode.internal.util.ArrayUtils;
-import org.apache.logging.log4j.Logger;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -92,6 +56,43 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.ToDataException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.LonerDistributionManager;
+import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.SystemTimer;
+import org.apache.geode.internal.cache.BucketAdvisor;
+import org.apache.geode.internal.cache.BucketAdvisor.BucketProfile;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.partitioned.AllBucketProfilesUpdateMessage;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThreadGroup;
+import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.internal.tcp.ConnectionTable;
+import org.apache.geode.internal.util.ArrayUtils;
+
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
@@ -1463,7 +1464,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     ServerConnection serverConn =
         serverConnectionFactory.makeServerConnection(socket, this.cache, this.crHelper, this.stats,
             AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationMode.toString(),
-            communicationMode.getModeNumber(), this, this.securityService, this.getBindAddress());
+            communicationMode.getModeNumber(), this, this.securityService);
 
     synchronized (this.allSCsLock) {
       this.allSCs.add(serverConn);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index eb401b9..7bc33ea 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -30,9 +30,13 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.security.server.Authenticator;
+import org.apache.geode.internal.protocol.security.server.Authorizer;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.SecurityManager;
-import org.apache.geode.security.server.Authenticator;
 
 /**
  * Holds the socket and protocol handler for the new client protocol.
@@ -42,6 +46,7 @@ public class GenericProtocolServerConnection extends ServerConnection {
   private final ClientProtocolMessageHandler messageHandler;
   private final SecurityManager securityManager;
   private final Authenticator authenticator;
+  private final Authorizer authorizer;
   private boolean cleanedUp;
   private ClientProxyMembershipID clientProxyMembershipID;
 
@@ -51,13 +56,15 @@ public class GenericProtocolServerConnection extends ServerConnection {
    */
   public GenericProtocolServerConnection(Socket socket, InternalCache c, CachedRegionHelper helper,
       CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
-      byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
-      SecurityService securityService, Authenticator authenticator) {
+      byte communicationMode, Acceptor acceptor, SecurityService securityService,
+      ClientProtocolMessageHandler clientProtocolMessageHandler, Authenticator authenticator,
+      Authorizer authorizer) {
     super(socket, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
         communicationMode, acceptor, securityService);
     securityManager = securityService.getSecurityManager();
-    this.messageHandler = newClientProtocol;
+    this.messageHandler = clientProtocolMessageHandler;
     this.authenticator = authenticator;
+    this.authorizer = authorizer;
     this.messageHandler.getStatistics().clientConnected();
 
     setClientProxyMembershipId();
@@ -72,12 +79,15 @@ public class GenericProtocolServerConnection extends ServerConnection {
       InputStream inputStream = socket.getInputStream();
       OutputStream outputStream = socket.getOutputStream();
 
-      if (!authenticator.isAuthenticated()) {
-        authenticator.authenticate(inputStream, outputStream, securityManager);
-      } else {
-        messageHandler.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
-            this.getCache(), authenticator.getAuthorizer(), messageHandler.getStatistics()));
-      }
+      Object authenticationToken =
+          authenticator.authenticate(inputStream, outputStream, securityManager);
+      MessageExecutionContext messageExecutionContext = new MessageExecutionContext(this.getCache(),
+          authenticationToken, securityManager, messageHandler.getStatistics(), authorizer);
+      messageHandler.receiveMessage(inputStream, outputStream, messageExecutionContext);
+    } catch (AuthenticationFailedException e) {
+      logger.warn(e);
+      this.setFlagProcessMessagesAsFalse();
+      setClientDisconnectedException(e);
     } catch (EOFException e) {
       this.setFlagProcessMessagesAsFalse();
       setClientDisconnectedException(e);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 8f3a8c3..6779c09 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -18,102 +18,56 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.Socket;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.ServiceLoader;
 
-import org.apache.geode.StatisticsFactory;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
+import org.apache.geode.internal.protocol.ProtocolMessageHandlerLookupService;
+import org.apache.geode.internal.protocol.security.server.AuthenticatorLookupService;
+import org.apache.geode.internal.protocol.security.server.AuthorizationLookupService;
 import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.server.Authenticator;
 
 /**
  * Creates instances of ServerConnection based on the connection mode provided.
  */
 public class ServerConnectionFactory {
-  private ClientProtocolMessageHandler protocolHandler;
-  private Map<String, Class<? extends Authenticator>> authenticators = null;
-
-  public ServerConnectionFactory() {}
-
-  private synchronized void initializeAuthenticatorsMap() {
-    if (authenticators != null) {
-      return;
-    }
-    HashMap tmp = new HashMap<>();
-
-    ServiceLoader<Authenticator> loader = ServiceLoader.load(Authenticator.class);
-    for (Authenticator streamAuthenticator : loader) {
-      tmp.put(streamAuthenticator.implementationID(), streamAuthenticator.getClass());
-    }
-
-    authenticators = tmp;
-  }
-
-  private synchronized ClientProtocolMessageHandler initializeMessageHandler(
-      StatisticsFactory statisticsFactory, String statisticsName) {
-    if (protocolHandler != null) {
-      return protocolHandler;
-    }
-
-    ClientProtocolMessageHandler tempHandler = new MessageHandlerFactory().makeMessageHandler();
-    tempHandler.initializeStatistics(statisticsName, statisticsFactory);
-
-    protocolHandler = tempHandler;
-    return protocolHandler;
-  }
-
-  private Authenticator findStreamAuthenticator(String implementationID) {
-    if (authenticators == null) {
-      initializeAuthenticatorsMap();
-    }
-    Class<? extends Authenticator> streamAuthenticatorClass = authenticators.get(implementationID);
-    if (streamAuthenticatorClass == null) {
-      throw new ServiceLoadingFailureException(
-          "Could not find implementation for Authenticator with implementation ID "
-              + implementationID);
-    } else {
-      try {
-        return streamAuthenticatorClass.newInstance();
-      } catch (InstantiationException | IllegalAccessException e) {
-        throw new ServiceLoadingFailureException(
-            "Unable to instantiate authenticator for ID " + implementationID, e);
-      }
-    }
-  }
-
-  private ClientProtocolMessageHandler getOrCreateClientProtocolMessageHandler(
-      StatisticsFactory statisticsFactory, String serverName) {
-    if (protocolHandler == null) {
-      return initializeMessageHandler(statisticsFactory, serverName);
-    }
-    return protocolHandler;
+  private final ProtocolMessageHandlerLookupService protocolMessageHandlerLookupService;
+  private final AuthenticatorLookupService authenticatorLookupService;
+  private final AuthorizationLookupService authorizationLookupService;
+
+  public ServerConnectionFactory() {
+    protocolMessageHandlerLookupService = new ProtocolMessageHandlerLookupService();
+    authenticatorLookupService = new AuthenticatorLookupService();
+    authorizationLookupService = new AuthorizationLookupService();
   }
 
   public ServerConnection makeServerConnection(Socket socket, InternalCache cache,
-      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
-      String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService, InetAddress bindAddress) throws IOException {
-    if (communicationMode == ProtobufClientServerProtocol.getModeNumber()) {
+      CachedRegionHelper cachedRegionHelper, CacheServerStats cacheServerStats, int hsTimeout,
+      int socketBufferSize, String communicationModeStr, byte communicationMode, Acceptor acceptor,
+      SecurityService securityService) throws IOException {
+    if (ProtobufClientServerProtocol.getModeNumber() == communicationMode) {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Server received unknown communication mode: " + communicationMode);
       } else {
         String authenticationMode =
             System.getProperty("geode.protocol-authentication-mode", "NOOP");
 
-        return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
-            socketBufferSize, communicationModeStr, communicationMode, acceptor,
-            getOrCreateClientProtocolMessageHandler(cache.getDistributedSystem(),
-                acceptor.getServerName()),
-            securityService, findStreamAuthenticator(authenticationMode));
+        ClientProtocolMessageHandler clientProtocolMessageHandler =
+            protocolMessageHandlerLookupService.lookupProtocolHandler("PROTOBUF");
+        clientProtocolMessageHandler.initializeStatistics(acceptor.getServerName(),
+            cache.getDistributedSystem());
+        return new GenericProtocolServerConnection(socket, cache, cachedRegionHelper,
+            cacheServerStats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode,
+            acceptor, securityService, clientProtocolMessageHandler,
+            authenticatorLookupService.getAuthenticator(authenticationMode),
+            authorizationLookupService.getAuthorizer(authenticationMode));
       }
     } else {
-      return new LegacyServerConnection(socket, cache, helper, stats, hsTimeout, socketBufferSize,
-          communicationModeStr, communicationMode, acceptor, securityService);
+      return new LegacyServerConnection(socket, cache, cachedRegionHelper, cacheServerStats,
+          hsTimeout, socketBufferSize, communicationModeStr, communicationMode, acceptor,
+          securityService);
     }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
index 959e6e2..15dff3b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
@@ -16,7 +16,9 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import java.net.InetAddress;
+import java.util.Iterator;
 import java.util.Properties;
+import java.util.ServiceLoader;
 
 import org.apache.logging.log4j.Logger;
 
@@ -26,6 +28,7 @@ import org.apache.geode.distributed.internal.PoolStatHelper;
 import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
 import org.apache.geode.distributed.internal.tcpserver.TcpServer;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
 
 public class TcpServerFactory {
   private ClientProtocolMessageHandler protocolHandler;
@@ -33,7 +36,16 @@ public class TcpServerFactory {
 
   public TcpServerFactory() {
     try {
-      protocolHandler = new MessageHandlerFactory().makeMessageHandler();
+      ServiceLoader<ClientProtocolMessageHandler> loader =
+          ServiceLoader.load(ClientProtocolMessageHandler.class);
+      Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
+
+      if (!iterator.hasNext()) {
+        throw new ServiceLoadingFailureException(
+            "There is no ClientProtocolMessageHandler implementation found in JVM");
+      }
+
+      protocolHandler = iterator.next();
     } catch (ServiceLoadingFailureException ex) {
       logger.warn(ex.getMessage());
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/ClientProtocolMessageHandler.java
similarity index 83%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/ClientProtocolMessageHandler.java
index b8969e1..52ed1a3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/ClientProtocolMessageHandler.java
@@ -13,14 +13,16 @@
  * the License.
  */
 
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.protocol;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.apache.geode.Statistics;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
+import org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnectionFactory;
 
 
 /**
@@ -38,4 +40,6 @@ public interface ClientProtocolMessageHandler {
 
   void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException;
+
+  String getMessageHandlerProtocolName();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
similarity index 63%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
index bedb2c7..38f41bb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/MessageExecutionContext.java
@@ -13,40 +13,47 @@
  * the License.
  */
 
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.protocol;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
+import org.apache.geode.internal.cache.tier.sockets.NoOpStatistics;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.security.server.Authorizer;
-import org.apache.geode.security.server.NoOpAuthorizer;
+import org.apache.geode.internal.protocol.security.server.Authorizer;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
+import org.apache.geode.security.SecurityManager;
 
 @Experimental
 public class MessageExecutionContext {
   private Cache cache;
   private Locator locator;
-  private Authorizer authorizer;
-  private ClientProtocolStatistics statistics;
+  private final Object authenticationToken;
+  private final SecurityManager securityManager;
+  private final ClientProtocolStatistics statistics;
+  private final Authorizer authorizer;
 
-
-  public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer,
-      ClientProtocolStatistics statistics) {
+  public MessageExecutionContext(Cache cache, Object authenticationToken,
+      SecurityManager securityManager, ClientProtocolStatistics statistics, Authorizer authorizer) {
     this.cache = cache;
-    this.authorizer = streamAuthorizer;
+    this.authenticationToken = authenticationToken;
+    this.securityManager = securityManager;
     this.statistics = statistics;
+    this.authorizer = authorizer;
   }
 
-  public MessageExecutionContext(InternalLocator locator) {
+
+  public MessageExecutionContext(Locator locator) {
     this.locator = locator;
-    // set a no-op authorizer until such time as locators implement authentication
-    // and authorization checks
-    this.authorizer = new NoOpAuthorizer();
+    this.authenticationToken = null;
+    this.securityManager = null;
     this.statistics = new NoOpStatistics();
+    this.authorizer = new NoOpAuthorizer();
   }
 
   /**
+   *
    * Returns the cache associated with this execution
    * <p>
    *
@@ -75,19 +82,38 @@ public class MessageExecutionContext {
   }
 
   /**
-   * Returns the Authorizer associated with this execution. This can be used to perform
+   * Returns the authentication token associated with this execution. This can be used to perform
    * authorization checks for the user associated with this thread.
    */
-  public Authorizer getAuthorizer() {
-    return authorizer;
+  public Object getAuthenticationToken() {
+    return authenticationToken;
+  }
+
+  /**
+   * Returns the security manager.
+   *
+   * @return SecurityManager
+   */
+  public SecurityManager getSecurityManager() {
+    return securityManager;
   }
 
   /**
    * Returns the statistics for recording operation stats. In a unit test environment this may not
    * be a protocol-specific statistics implementation.
-   *
+   * 
+   * 
    */
   public ClientProtocolStatistics getStatistics() {
     return statistics;
   }
+
+  /**
+   * Returns the associated Authorizer for this MessageExecutionContext.
+   * 
+   * @return Authorizer
+   */
+  public Authorizer getAuthorizer() {
+    return authorizer;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/ProtocolMessageHandlerLookupService.java
similarity index 59%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/ProtocolMessageHandlerLookupService.java
index 2aca8c2..4196fd6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/ProtocolMessageHandlerLookupService.java
@@ -13,22 +13,26 @@
  * the License.
  */
 
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.protocol;
 
-import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.ServiceLoader;
 
-public class MessageHandlerFactory {
-  public ClientProtocolMessageHandler makeMessageHandler() {
+public class ProtocolMessageHandlerLookupService {
+  private Map<String, ClientProtocolMessageHandler> protocolMessageHandlers = new HashMap<>();
+
+  public ProtocolMessageHandlerLookupService() {
     ServiceLoader<ClientProtocolMessageHandler> loader =
         ServiceLoader.load(ClientProtocolMessageHandler.class);
-    Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
-
-    if (!iterator.hasNext()) {
-      throw new ServiceLoadingFailureException(
-          "There is no ClientProtocolMessageHandler implementation found in JVM");
+    for (ClientProtocolMessageHandler clientProtocolMessageHandler : loader) {
+      protocolMessageHandlers.put(clientProtocolMessageHandler.getMessageHandlerProtocolName(),
+          clientProtocolMessageHandler);
     }
 
-    return iterator.next();
+  }
+
+  public ClientProtocolMessageHandler lookupProtocolHandler(String protocolType) {
+    return protocolMessageHandlers.get(protocolType);
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/security/server/Authenticator.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authenticator.java
similarity index 81%
rename from geode-core/src/main/java/org/apache/geode/security/server/Authenticator.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authenticator.java
index 7893b4b..7d68aa0 100644
--- a/geode-core/src/main/java/org/apache/geode/security/server/Authenticator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authenticator.java
@@ -12,13 +12,13 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.security.server;
+package org.apache.geode.internal.protocol.security.server;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.apache.geode.security.AuthenticationRequiredException;
+import org.apache.geode.annotations.Experimental;
 import org.apache.geode.security.SecurityManager;
 
 /**
@@ -28,15 +28,17 @@ import org.apache.geode.security.SecurityManager;
  * If authentication fails, an implementor may continue to wait for another valid authentication
  * exchange.
  */
+@Experimental
 public interface Authenticator {
   /**
    *
    * @param inputStream to read auth messages from.
    * @param outputStream to send messages to.
    * @param securityManager can be used for validating credentials against.
+   * @return an authentication token that proves that the remote entity was authenticated
    * @throws IOException if EOF or if invalid input is received.
    */
-  void authenticate(InputStream inputStream, OutputStream outputStream,
+  Object authenticate(InputStream inputStream, OutputStream outputStream,
       SecurityManager securityManager) throws IOException;
 
   /**
@@ -47,13 +49,7 @@ public interface Authenticator {
   boolean isAuthenticated();
 
   /**
-   * Return an authorization object which can be used to determine which permissions this stream has
-   * according to the provided securityManager.
-   */
-  Authorizer getAuthorizer() throws AuthenticationRequiredException;
-
-  /**
    * @return a unique identifier for this particular implementation (NOOP, PASSTHROUGH, etc.)
    */
-  String implementationID();
+  String getImplementationID();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthenticatorLookupService.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthenticatorLookupService.java
new file mode 100644
index 0000000..4e0e41c
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthenticatorLookupService.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.security.server;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+
+public class AuthenticatorLookupService {
+  private Map<String, Class<? extends Authenticator>> authenticators = null;
+
+  public AuthenticatorLookupService() {
+    if (authenticators == null) {
+      initializeAuthenticatorsMap();
+    }
+  }
+
+  private synchronized void initializeAuthenticatorsMap() {
+    if (authenticators != null) {
+      return;
+    }
+    Map<String, Class<? extends Authenticator>> tempAuthenticators = new HashMap<>();
+    ServiceLoader<Authenticator> loader = ServiceLoader.load(Authenticator.class);
+    for (Authenticator streamAuthenticator : loader) {
+      tempAuthenticators.put(streamAuthenticator.getImplementationID(),
+          streamAuthenticator.getClass());
+    }
+    authenticators = tempAuthenticators;
+  }
+
+  public Authenticator getAuthenticator(String authenticationMode) {
+    Class<? extends Authenticator> streamAuthenticatorClass =
+        authenticators.get(authenticationMode);
+    if (streamAuthenticatorClass == null) {
+      throw new GemFireConfigException(
+          "Could not find implementation for Authenticator with implementation ID "
+              + authenticationMode);
+    } else {
+      try {
+        return streamAuthenticatorClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new ServiceLoadingFailureException(
+            "Unable to instantiate authenticator for ID " + authenticationMode, e);
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthorizationLookupService.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthorizationLookupService.java
new file mode 100644
index 0000000..7555c91
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/AuthorizationLookupService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.security.server;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.apache.geode.GemFireConfigException;
+import org.apache.geode.internal.cache.tier.sockets.ServiceLoadingFailureException;
+
+public class AuthorizationLookupService {
+  private Map<String, Class<? extends Authorizer>> authorizers = null;
+
+  public AuthorizationLookupService() {
+    if (authorizers == null) {
+      initializeAuthenticatorsMap();
+    }
+  }
+
+  private synchronized void initializeAuthenticatorsMap() {
+    if (authorizers != null) {
+      return;
+    }
+
+    Map<String, Class<? extends Authorizer>> tempAuthorizers = new HashMap<>();
+    ServiceLoader<Authorizer> loader = ServiceLoader.load(Authorizer.class);
+    for (Authorizer authorizer : loader) {
+      tempAuthorizers.put(authorizer.getImplementationID(), authorizer.getClass());
+    }
+    authorizers = tempAuthorizers;
+  }
+
+  public Authorizer getAuthorizer(String authorizationMode) {
+    Class<? extends Authorizer> authorizationClass = authorizers.get(authorizationMode);
+    if (authorizationClass == null) {
+      throw new GemFireConfigException(
+          "Could not find implementation for Authorizer with ID " + authorizationMode);
+    } else {
+      try {
+        return authorizationClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new ServiceLoadingFailureException(
+            "Unable to instantiate authenticator for ID " + authorizationMode, e);
+      }
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/security/server/Authorizer.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authorizer.java
similarity index 72%
rename from geode-core/src/main/java/org/apache/geode/security/server/Authorizer.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authorizer.java
index fea2198..5d92208 100644
--- a/geode-core/src/main/java/org/apache/geode/security/server/Authorizer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/Authorizer.java
@@ -12,10 +12,16 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.security.server;
+package org.apache.geode.internal.protocol.security.server;
 
+import org.apache.geode.annotations.Experimental;
 import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
 
+@Experimental
 public interface Authorizer {
-  boolean authorize(ResourcePermission permissionRequested);
+  boolean authorize(Object authenticatedToken, ResourcePermission permissionRequested,
+      SecurityManager securityManager);
+
+  String getImplementationID();
 }
diff --git a/geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthenticator.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthenticator.java
similarity index 79%
rename from geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthenticator.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthenticator.java
index bf435d2..d68415f 100644
--- a/geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthenticator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthenticator.java
@@ -12,7 +12,7 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.security.server;
+package org.apache.geode.internal.protocol.security.server;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,9 +26,9 @@ import org.apache.geode.security.SecurityManager;
  */
 public class NoOpAuthenticator implements Authenticator {
   @Override
-  public void authenticate(InputStream inputStream, OutputStream outputStream,
+  public Object authenticate(InputStream inputStream, OutputStream outputStream,
       SecurityManager securityManager) throws IOException {
-    // this method needs to do nothing as it is a pass-through implementation
+    return null;
   }
 
   @Override
@@ -37,12 +37,7 @@ public class NoOpAuthenticator implements Authenticator {
   }
 
   @Override
-  public Authorizer getAuthorizer() {
-    return new NoOpAuthorizer();
-  }
-
-  @Override
-  public String implementationID() {
+  public String getImplementationID() {
     return "NOOP";
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthorizer.java b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthorizer.java
similarity index 76%
rename from geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthorizer.java
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthorizer.java
index 1491f04..d3cdab4 100644
--- a/geode-core/src/main/java/org/apache/geode/security/server/NoOpAuthorizer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/NoOpAuthorizer.java
@@ -12,16 +12,23 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.security.server;
+package org.apache.geode.internal.protocol.security.server;
 
 import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
 
 /**
  * An implementation of {@link Authorizer} that doesn't use its parameters and always returns true.
  */
 public class NoOpAuthorizer implements Authorizer {
   @Override
-  public boolean authorize(ResourcePermission permissionRequested) {
+  public boolean authorize(Object authenticatedToken, ResourcePermission permissionRequested,
+      SecurityManager securityManager) {
     return true;
   }
+
+  @Override
+  public String getImplementationID() {
+    return "NOOP";
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/security/server/package.html b/geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/package.html
similarity index 100%
rename from geode-core/src/main/java/org/apache/geode/security/server/package.html
rename to geode-core/src/main/java/org/apache/geode/internal/protocol/security/server/package.html
diff --git a/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator b/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator
new file mode 100644
index 0000000..e115aae
--- /dev/null
+++ b/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.security.server.NoOpAuthenticator
\ No newline at end of file
diff --git a/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer b/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer
new file mode 100644
index 0000000..85fcb4a
--- /dev/null
+++ b/geode-core/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.security.server.NoOpAuthorizer
\ No newline at end of file
diff --git a/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.server.Authenticator b/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.server.Authenticator
deleted file mode 100644
index 4f34d2a..0000000
--- a/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.server.Authenticator
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.geode.security.server.NoOpAuthenticator
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 56d3770..568571d 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -15,23 +15,24 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.Socket;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.UnitTest;
 
 /**
  * We don't test the path where the service providing protobufProtocolHandler is actually present,
@@ -58,17 +59,6 @@ public class ServerConnectionFactoryTest {
 
   }
 
-  /**
-   * @throws ServiceLoadingFailureException because the service is implemented in a different
-   *         module, and when this unit test is run, that module won't be present.
-   */
-  @Test(expected = ServiceLoadingFailureException.class)
-  public void newClientProtocolFailsWithSystemPropertySet() throws IOException {
-    System.setProperty("geode.feature-protobuf-protocol", "true");
-    ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
-        CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
-  }
-
   @Test
   public void makeServerConnection() throws Exception {
     CommunicationMode[] communicationModes = new CommunicationMode[] {
@@ -107,7 +97,7 @@ public class ServerConnectionFactoryTest {
 
     return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
-        mock(AcceptorImpl.class), mock(SecurityService.class), InetAddress.getLocalHost());
+        mock(AcceptorImpl.class), mock(SecurityService.class));
   }
 
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index a4ebbac..dbda3d7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -25,14 +25,11 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import org.apache.geode.i18n.StringId;
-import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.AuthenticationRequiredException;
-import org.apache.geode.test.junit.categories.UnitTest;
-import org.apache.geode.test.junit.rules.RestoreLocaleRule;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.Locale;
+
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,10 +38,14 @@ import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.util.Locale;
+import org.apache.geode.i18n.StringId;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.AuthenticationRequiredException;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.RestoreLocaleRule;
 
 @Category(UnitTest.class)
 public class ServerConnectionTest {
@@ -82,9 +83,9 @@ public class ServerConnectionTest {
     InternalCache cache = mock(InternalCache.class);
     SecurityService securityService = mock(SecurityService.class);
 
-    serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
-        0, 0, null, CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
-        securityService, InetAddress.getLocalHost());
+    serverConnection =
+        new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0, 0, null,
+            CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
     MockitoAnnotations.initMocks(this);
   }
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/operations/OperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/operations/OperationHandler.java
index 9d2d482..0221c26 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/operations/OperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/operations/OperationHandler.java
@@ -15,17 +15,17 @@
 package org.apache.geode.internal.protocol.operations;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.protobuf.ProtobufOpsProcessor;
-import org.apache.geode.internal.protocol.protobuf.Result;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.ProtobufOperationsProcessor;
+import org.apache.geode.internal.protocol.responses.Result;
 import org.apache.geode.internal.serialization.SerializationService;
 
 /**
  * This interface is implemented by a object capable of handling request types 'Req' and returning
  * an a response of type 'Resp'
  *
- * See {@link ProtobufOpsProcessor}
+ * See {@link ProtobufOperationsProcessor}
  */
 @Experimental
 public interface OperationHandler<Req, Resp> {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/EncodingTypeTranslator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/EncodingTypeTranslator.java
index 1868ced..8d5c7a8 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/EncodingTypeTranslator.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/EncodingTypeTranslator.java
@@ -17,17 +17,18 @@ package org.apache.geode.internal.protocol.protobuf;
 import java.util.HashMap;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.pdx.JSONFormatter;
-import org.apache.geode.pdx.PdxInstance;
 import org.apache.geode.internal.serialization.SerializationType;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
+import org.apache.geode.pdx.JSONFormatter;
+import org.apache.geode.pdx.PdxInstance;
 
 /**
  * This class maps protobuf specific encoding types and the corresponding serialization types.
  */
 @Experimental
 public abstract class EncodingTypeTranslator {
-  static final HashMap<Class, BasicTypes.EncodingType> typeToEncodingMap = intializeTypeMap();
+  private static final HashMap<Class, BasicTypes.EncodingType> typeToEncodingMap =
+      intializeTypeMap();
 
   private static HashMap<Class, BasicTypes.EncodingType> intializeTypeMap() {
     HashMap<Class, BasicTypes.EncodingType> result = new HashMap<>();
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/OperationContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/OperationContext.java
index 12c50aa..fc59ec0 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/OperationContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/OperationContext.java
@@ -36,11 +36,11 @@ public class OperationContext<OperationRequest, OperationResponse> {
     this.operationHandler = operationHandler;
     this.fromRequest = fromRequest;
     this.toResponse = toResponse;
-    this.toErrorResponse = OperationContext::makeErrorBuilder;
+    this.toErrorResponse = this::makeErrorBuilder;
     accessPermissionRequired = permissionRequired;
   }
 
-  public static ClientProtocol.Response.Builder makeErrorBuilder(
+  private ClientProtocol.Response.Builder makeErrorBuilder(
       ClientProtocol.ErrorResponse errorResponse) {
     return ClientProtocol.Response.newBuilder().setErrorResponse(errorResponse);
   }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOperationsProcessor.java
similarity index 83%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOperationsProcessor.java
index 9b497c5..13a1c77 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufOperationsProcessor.java
@@ -17,12 +17,14 @@ package org.apache.geode.internal.protocol.protobuf;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.registry.OperationContextRegistry;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
 import org.apache.geode.internal.serialization.SerializationService;
 
 /**
@@ -30,27 +32,30 @@ import org.apache.geode.internal.serialization.SerializationService;
  * it to the appropriate handler.
  */
 @Experimental
-public class ProtobufOpsProcessor {
+public class ProtobufOperationsProcessor {
 
   private final OperationContextRegistry operationContextRegistry;
   private final SerializationService serializationService;
-  private static final Logger logger = LogService.getLogger(ProtobufOpsProcessor.class);
+  private static final Logger logger = LogService.getLogger(ProtobufOperationsProcessor.class);
 
-  public ProtobufOpsProcessor(SerializationService serializationService,
+  ProtobufOperationsProcessor(SerializationService serializationService,
       OperationContextRegistry operationContextRegistry) {
     this.serializationService = serializationService;
     this.operationContextRegistry = operationContextRegistry;
   }
 
-  public ClientProtocol.Response process(ClientProtocol.Request request,
-      MessageExecutionContext context) {
+  public ClientProtocol.Response process(final ClientProtocol.Request request,
+      final MessageExecutionContext context) {
     ClientProtocol.Request.RequestAPICase requestType = request.getRequestAPICase();
     logger.debug("Processing request of type {}", requestType);
     OperationContext operationContext = operationContextRegistry.getOperationContext(requestType);
     ClientProtocol.Response.Builder builder;
     Result result;
+
     try {
-      if (context.getAuthorizer().authorize(operationContext.getAccessPermissionRequired())) {
+      boolean authorized = context.getAuthorizer().authorize(context.getAuthenticationToken(),
+          operationContext.getAccessPermissionRequired(), context.getSecurityManager());
+      if (authorized) {
         result = operationContext.getOperationHandler().process(serializationService,
             operationContext.getFromRequest().apply(request), context);
       } else {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandler.java
similarity index 63%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandler.java
index de34a4a..f6cc3be 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandler.java
@@ -23,10 +23,10 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.StatisticsFactory;
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
 import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.internal.protocol.protobuf.registry.OperationContextRegistry;
 import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -40,16 +40,21 @@ import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
  * and then pushes it to the output stream.
  */
 @Experimental
-public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
+public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHandler {
   private final ProtobufProtocolSerializer protobufProtocolSerializer;
-  private final ProtobufOpsProcessor protobufOpsProcessor;
+  private final ProtobufOperationsProcessor protobufOperationProcessor;
+  private final OperationContextRegistry operationContextRegistry;
+  private final ProtobufSerializationService protobufSerializationService;
   private ProtobufClientStatistics statistics;
   private static final Logger logger = LogService.getLogger();
 
-  public ProtobufStreamProcessor() {
+  public ProtobufProtocolMessageHandler() {
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
-    protobufOpsProcessor = new ProtobufOpsProcessor(new ProtobufSerializationService(),
-        new OperationContextRegistry());
+    operationContextRegistry = new OperationContextRegistry();
+    protobufSerializationService = new ProtobufSerializationService();
+    protobufOperationProcessor =
+        new ProtobufOperationsProcessor(protobufSerializationService, operationContextRegistry);
+
   }
 
   @Override
@@ -66,30 +71,43 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
       MessageExecutionContext executionContext) throws IOException {
     try {
-      processOneMessage(inputStream, outputStream, executionContext);
+      ClientProtocol.Message incomingMessage = deserializeMessageFromInputStream(inputStream);
+      statistics.messageReceived(incomingMessage.getSerializedSize());
+      ClientProtocol.Message outgoingMessage = processMessage(incomingMessage, executionContext);
+      statistics.messageSent(outgoingMessage.getSerializedSize());
+      serializeMessageToOutputStream(outgoingMessage, outputStream);
     } catch (InvalidProtocolMessageException e) {
       throw new IOException(e);
     }
   }
 
-  private void processOneMessage(InputStream inputStream, OutputStream outputStream,
-      MessageExecutionContext executionContext)
-      throws InvalidProtocolMessageException, IOException {
+  @Override
+  public String getMessageHandlerProtocolName() {
+    return "PROTOBUF";
+  }
+
+  private ClientProtocol.Message processMessage(ClientProtocol.Message message,
+      MessageExecutionContext executionContext) {
+    ClientProtocol.Response response =
+        protobufOperationProcessor.process(message.getRequest(), executionContext);
+    ClientProtocol.MessageHeader responseHeader =
+        ProtobufUtilities.createMessageHeaderForRequest(message);
+    return ProtobufUtilities.createProtobufResponse(responseHeader, response);
+  }
+
+  private void serializeMessageToOutputStream(ClientProtocol.Message responseMessage,
+      OutputStream outputStream) throws IOException {
+    protobufProtocolSerializer.serialize(responseMessage, outputStream);
+  }
+
+  private ClientProtocol.Message deserializeMessageFromInputStream(InputStream inputStream)
+      throws InvalidProtocolMessageException, EOFException {
     ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream);
     if (message == null) {
       String errorMessage = "Tried to deserialize protobuf message at EOF";
       logger.warn(errorMessage);
       throw new EOFException(errorMessage);
     }
-    statistics.messageReceived(message.getSerializedSize());
-
-    ClientProtocol.Request request = message.getRequest();
-    ClientProtocol.Response response = protobufOpsProcessor.process(request, executionContext);
-    ClientProtocol.MessageHeader responseHeader =
-        ProtobufUtilities.createMessageHeaderForRequest(message);
-    ClientProtocol.Message responseMessage =
-        ProtobufUtilities.createProtobufResponse(responseHeader, response);
-    statistics.messageSent(responseMessage.getSerializedSize());
-    protobufProtocolSerializer.serialize(responseMessage, outputStream);
+    return message;
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticator.java
deleted file mode 100644
index 640243b..0000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf;
-
-import org.apache.geode.management.internal.security.ResourceConstants;
-import org.apache.geode.security.AuthenticationRequiredException;
-import org.apache.geode.security.server.Authenticator;
-import org.apache.geode.security.AuthenticationFailedException;
-import org.apache.geode.security.SecurityManager;
-import org.apache.geode.security.server.Authorizer;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Properties;
-
-public class ProtobufSimpleAuthenticator implements Authenticator {
-  private ProtobufSimpleAuthorizer authorizer = null;
-
-  @Override
-  public void authenticate(InputStream inputStream, OutputStream outputStream,
-      SecurityManager securityManager) throws IOException {
-    AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
-        AuthenticationAPI.SimpleAuthenticationRequest.parseDelimitedFrom(inputStream);
-    if (authenticationRequest == null) {
-      throw new EOFException();
-    }
-
-    Properties properties = new Properties();
-    properties.setProperty(ResourceConstants.USER_NAME, authenticationRequest.getUsername());
-    properties.setProperty(ResourceConstants.PASSWORD, authenticationRequest.getPassword());
-
-    authorizer = null; // authenticating a new user clears current authorizer
-    try {
-      Object principal = securityManager.authenticate(properties);
-      if (principal != null) {
-        authorizer = new ProtobufSimpleAuthorizer(principal, securityManager);
-      }
-    } catch (AuthenticationFailedException e) {
-      authorizer = null;
-    }
-
-    AuthenticationAPI.SimpleAuthenticationResponse.newBuilder().setAuthenticated(isAuthenticated())
-        .build().writeDelimitedTo(outputStream);
-  }
-
-  @Override
-  public boolean isAuthenticated() {
-    // note: an authorizer is only created if the user has been authenticated
-    return authorizer != null;
-  }
-
-  @Override
-  public Authorizer getAuthorizer() throws AuthenticationRequiredException {
-    if (authorizer == null) {
-      throw new AuthenticationRequiredException("Not yet authenticated");
-    }
-    return authorizer;
-  }
-
-  @Override
-  public String implementationID() {
-    return "SIMPLE";
-  }
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandler.java
index 6947204..21a02b9 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandler.java
@@ -14,10 +14,6 @@
  */
 package org.apache.geode.internal.protocol.protobuf.operations;
 
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
@@ -25,18 +21,18 @@ import org.apache.geode.cache.CacheLoaderException;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.TimeoutException;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
@@ -58,18 +54,16 @@ public class GetAllRequestOperationHandler
           .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));
     }
 
-    Map<Boolean, List<Object>> resultsCollection = request.getKeyList().stream()
-        .map((key) -> processOneMessage(serializationService, region, key))
-        .collect(Collectors.partitioningBy(x -> x instanceof BasicTypes.Entry));
     RegionAPI.GetAllResponse.Builder responseBuilder = RegionAPI.GetAllResponse.newBuilder();
 
-    for (Object entry : resultsCollection.get(true)) {
-      responseBuilder.addEntries((BasicTypes.Entry) entry);
-    }
-
-    for (Object entry : resultsCollection.get(false)) {
-      responseBuilder.addFailures((BasicTypes.KeyedError) entry);
-    }
+    request.getKeyList().stream().map((key) -> processOneMessage(serializationService, region, key))
+        .forEach(entry -> {
+          if (entry instanceof BasicTypes.Entry) {
+            responseBuilder.addEntries((BasicTypes.Entry) entry);
+          } else if (entry instanceof BasicTypes.KeyedError) {
+            responseBuilder.addFailures((BasicTypes.KeyedError) entry);
+          }
+        });
 
     return Success.of(responseBuilder.build());
   }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
index c8f663f..d92a41f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
@@ -14,20 +14,19 @@
  */
 package org.apache.geode.internal.protocol.protobuf.operations;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Result;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI;
-import org.apache.geode.internal.protocol.protobuf.Success;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 
 @Experimental
@@ -40,18 +39,19 @@ public class GetAvailableServersOperationHandler implements
       MessageExecutionContext executionContext) throws InvalidExecutionContextException {
 
     InternalLocator internalLocator = (InternalLocator) executionContext.getLocator();
-    ArrayList serversFromSnapshot =
+    List serversFromSnapshot =
         internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
     if (serversFromSnapshot == null) {
-      serversFromSnapshot = new ArrayList();
+      serversFromSnapshot = Collections.EMPTY_LIST;
     }
 
-    Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot
-        .stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
-        .collect(Collectors.toList());
-    ServerAPI.GetAvailableServersResponse.Builder builder =
-        ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers);
-    return Success.of(builder.build());
+    ServerAPI.GetAvailableServersResponse.Builder serverResponseBuilder =
+        ServerAPI.GetAvailableServersResponse.newBuilder();
+
+    serversFromSnapshot.stream()
+        .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation)).forEach(
+            serverMessage -> serverResponseBuilder.addServers((BasicTypes.Server) serverMessage));
+    return Success.of(serverResponseBuilder.build());
   }
 
   private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
index 7ffdcd8..04604c9 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
@@ -18,13 +18,13 @@ import java.util.Set;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 
 @Experimental
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
index 48c52c8..ae3a346 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
@@ -18,18 +18,18 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
-import org.apache.geode.internal.protocol.protobuf.Failure;
+import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 
 @Experimental
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandler.java
index 9ded016..d958b17 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandler.java
@@ -18,18 +18,18 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandler.java
index ca454bb..c2e106d 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandler.java
@@ -15,24 +15,23 @@
 package org.apache.geode.internal.protocol.protobuf.operations;
 
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
@@ -56,10 +55,10 @@ public class PutAllRequestOperationHandler
               "Region passed does not exist: " + regionName, logger, null));
     }
 
-    RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder()
-        .addAllFailedKeys(putAllRequest.getEntryList().stream()
-            .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull)
-            .collect(Collectors.toList()));
+    RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder();
+    putAllRequest.getEntryList().stream()
+        .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull)
+        .forEach(failedKey -> builder.addFailedKeys(failedKey));
     return Success.of(builder.build());
   }
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandler.java
index 72fbf3d..7fadca2 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandler.java
@@ -18,18 +18,18 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandler.java
index d1e6f49..9954175 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandler.java
@@ -19,16 +19,16 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.operations.OperationHandler;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.internal.serialization.SerializationService;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthenticator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthenticator.java
new file mode 100644
index 0000000..a1f8639
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthenticator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.security.server;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
+import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
+import org.apache.geode.internal.protocol.security.server.Authenticator;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.SecurityManager;
+
+public class ProtobufSimpleAuthenticator implements Authenticator {
+  private boolean authenticationSuccessfull = false;
+  private Object principal;
+
+  @Override
+  public Object authenticate(InputStream inputStream, OutputStream outputStream,
+      SecurityManager securityManager) throws IOException {
+    if (authenticationSuccessfull) {
+      return principal;
+    } else {
+      AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
+          AuthenticationAPI.SimpleAuthenticationRequest.parseDelimitedFrom(inputStream);
+      if (authenticationRequest == null) {
+        // TODO Why are we throwing an EOFException here?
+        throw new EOFException();
+      }
+
+      Properties properties = new Properties();
+      properties.setProperty(ResourceConstants.USER_NAME, authenticationRequest.getUsername());
+      properties.setProperty(ResourceConstants.PASSWORD, authenticationRequest.getPassword());
+
+      try {
+        principal = securityManager.authenticate(properties);
+        authenticationSuccessfull = true;
+        AuthenticationAPI.SimpleAuthenticationResponse.newBuilder()
+            .setAuthenticated(isAuthenticated()).build().writeDelimitedTo(outputStream);
+        return principal;
+      } catch (AuthenticationFailedException e) {
+        // TODO: we need to add some logging here, e.g.:
+        // logger.error(e);
+        AuthenticationAPI.SimpleAuthenticationResponse.newBuilder().setAuthenticated(false).build()
+            .writeDelimitedTo(outputStream);
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public boolean isAuthenticated() {
+    return authenticationSuccessfull;
+  }
+
+  @Override
+  public String getImplementationID() {
+    return "SIMPLE";
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthorizer.java
similarity index 69%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthorizer.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthorizer.java
index 703b6a2..64ff4ff 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthorizer.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/security/server/ProtobufSimpleAuthorizer.java
@@ -12,23 +12,21 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.protobuf;
+package org.apache.geode.internal.protocol.protobuf.security.server;
 
+import org.apache.geode.internal.protocol.security.server.Authorizer;
 import org.apache.geode.security.ResourcePermission;
 import org.apache.geode.security.SecurityManager;
-import org.apache.geode.security.server.Authorizer;
 
 public class ProtobufSimpleAuthorizer implements Authorizer {
-  private final Object authenticatedPrincipal;
-  private final SecurityManager securityManager;
-
-  public ProtobufSimpleAuthorizer(Object authenticatedPrincipal, SecurityManager securityManager) {
-    this.authenticatedPrincipal = authenticatedPrincipal;
-    this.securityManager = securityManager;
+  @Override
+  public boolean authorize(Object authenticatedPrincipal, ResourcePermission permissionRequested,
+      SecurityManager securityManager) {
+    return securityManager.authorize(authenticatedPrincipal, permissionRequested);
   }
 
   @Override
-  public boolean authorize(ResourcePermission permissionRequested) {
-    return securityManager.authorize(authenticatedPrincipal, permissionRequested);
+  public String getImplementationID() {
+    return "SIMPLE";
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufPrimitiveTypes.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufPrimitiveTypes.java
index f958353..58f5ddf 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufPrimitiveTypes.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufPrimitiveTypes.java
@@ -17,6 +17,10 @@ package org.apache.geode.internal.protocol.protobuf.utilities;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.utilities.exception.UnknownProtobufPrimitiveType;
 
+
+/**
+ * This enum's primary use is to serialize Java primitive types with Protobuf.
+ */
 @Experimental
 public enum ProtobufPrimitiveTypes {
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Failure.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Failure.java
similarity index 93%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Failure.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Failure.java
index 6ac73de..02261f8 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Failure.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Failure.java
@@ -12,11 +12,12 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.protobuf;
+package org.apache.geode.internal.protocol.responses;
 
 import java.util.function.Function;
 
 import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 
 @Experimental
 public class Failure<SuccessType> implements Result<SuccessType> {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Result.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Result.java
similarity index 90%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Result.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Result.java
index bada098..53160e7 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Result.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Result.java
@@ -12,11 +12,12 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.protobuf;
+package org.apache.geode.internal.protocol.responses;
 
 import java.util.function.Function;
 
 import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 
 @Experimental
 public interface Result<SuccessType> {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Success.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Success.java
similarity index 92%
rename from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Success.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Success.java
index 224b21c..88ef498 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/Success.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/responses/Success.java
@@ -12,11 +12,12 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-package org.apache.geode.internal.protocol.protobuf;
+package org.apache.geode.internal.protocol.responses;
 
 import java.util.function.Function;
 
 import org.apache.geode.annotations.Experimental;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
 
 @Experimental
 public class Success<SuccessType> implements Result<SuccessType> {
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.ClientProtocolMessageHandler b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.ClientProtocolMessageHandler
new file mode 100644
index 0000000..c2cb14c
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.ClientProtocolMessageHandler
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.protobuf.ProtobufProtocolMessageHandler
\ No newline at end of file
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator
new file mode 100644
index 0000000..e94d535
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authenticator
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.protobuf.security.server.ProtobufSimpleAuthenticator
\ No newline at end of file
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer
new file mode 100644
index 0000000..e7a7a80
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.protocol.security.server.Authorizer
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.protobuf.security.server.ProtobufSimpleAuthorizer
\ No newline at end of file
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index b35d923..82c7ad7 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -37,9 +37,11 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.internal.protocol.ClientProtocolMessageHandler;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
-import org.apache.geode.security.server.NoOpAuthenticator;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthenticator;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
+import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -133,7 +135,8 @@ public class GenericProtocolServerConnectionTest {
     return new GenericProtocolServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptorStub,
-        clientProtocolMock, mock(SecurityService.class), new NoOpAuthenticator());
+        mock(SecurityService.class), clientProtocolMock, new NoOpAuthenticator(),
+        new NoOpAuthorizer());
   }
 
   private GenericProtocolServerConnection getServerConnection(
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
index 266c658..2b521bd 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheOperationsJUnitTest.java
@@ -55,7 +55,9 @@ import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.internal.admin.SSLConfig;
+import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.internal.protocol.MessageUtil;
@@ -255,6 +257,73 @@ public class CacheOperationsJUnitTest {
   }
 
   @Test
+  public void testConnectionCountIsProperlyDecremented() throws Exception {
+    CacheServer cacheServer = this.cache.getCacheServers().get(0);
+    AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> acceptor.getClientServerCnxCount() == 1);
+    // run another test that creates a connection to the server
+    testNewProtocolGetRegionNamesCallSucceeds();
+    assertFalse(socket.isClosed());
+    socket.close();
+    Awaitility.await().atMost(30, TimeUnit.SECONDS)
+        .until(() -> acceptor.getClientServerCnxCount() == 0);
+  }
+
+  @Test
+  public void testNewProtocolRespectsMaxConnectionLimit() throws IOException, InterruptedException {
+    cache.getCacheServers().get(0).stop();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    final int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.setMaxConnections(16);
+    cacheServer.setMaxThreads(16);
+    cacheServer.start();
+
+    AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+
+    // Start 16 sockets, which is exactly the maximum that the server will support.
+    Socket[] sockets = new Socket[16];
+    for (int i = 0; i < 16; i++) {
+      Socket socket = new Socket("localhost", cacheServerPort);
+      sockets[i] = socket;
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    }
+
+    // try to start a new socket, expecting it to be disconnected.
+    try (Socket socket = new Socket("localhost", cacheServerPort)) {
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+      assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
+    }
+
+    for (Socket currentSocket : sockets) {
+      currentSocket.close();
+    }
+
+    // Once all connections are closed, the acceptor should have a connection count of 0.
+    Awaitility.await().atMost(5, TimeUnit.SECONDS)
+        .until(() -> acceptor.getClientServerCnxCount() == 0);
+
+    // Try to start 16 new connections, again at the limit.
+    for (int i = 0; i < 16; i++) {
+      Socket socket = new Socket("localhost", cacheServerPort);
+      sockets[i] = socket;
+      Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+      socket.getOutputStream()
+          .write(CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+    }
+
+    for (Socket currentSocket : sockets) {
+      currentSocket.close();
+    }
+  }
+
+  @Test
   public void testNewProtocolGetRegionNamesCallSucceeds() throws Exception {
     int correlationId = TEST_GET_CORRELATION_ID; // reuse this value for this test
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandlerTest.java
similarity index 73%
rename from geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessorTest.java
rename to geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandlerTest.java
index 169e64e..37a4ccb 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessorTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufProtocolMessageHandlerTest.java
@@ -26,21 +26,24 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
-import org.apache.geode.security.server.NoOpAuthorizer;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
-public class ProtobufStreamProcessorTest {
+public class ProtobufProtocolMessageHandlerTest {
   @Test(expected = EOFException.class)
   public void receiveMessage() throws Exception {
     InputStream inputStream = new ByteArrayInputStream(new byte[0]);
     OutputStream outputStream = new ByteArrayOutputStream(2);
 
-    ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
+    ProtobufProtocolMessageHandler protobufProtocolMessageHandler =
+        new ProtobufProtocolMessageHandler();
     InternalCache mockInternalCache = mock(InternalCache.class);
-    protobufStreamProcessor.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
-        mockInternalCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    MessageExecutionContext messageExecutionContext = new MessageExecutionContext(mockInternalCache,
+        null, null, new NoOpProtobufStatistics(), new NoOpAuthorizer());
+    protobufProtocolMessageHandler.receiveMessage(inputStream, outputStream,
+        messageExecutionContext);
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
index c8df327..0e8aedb 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
@@ -15,27 +15,30 @@
 
 package org.apache.geode.internal.protocol.protobuf;
 
-import org.apache.geode.examples.security.ExampleSecurityManager;
-import org.apache.geode.management.internal.security.ResourceConstants;
-import org.apache.geode.security.AuthenticationFailedException;
-import org.apache.geode.security.SecurityManager;
-import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Properties;
-
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.examples.security.ExampleSecurityManager;
+import org.apache.geode.internal.protocol.protobuf.security.server.ProtobufSimpleAuthenticator;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.UnitTest;
+
 @Category(UnitTest.class)
 public class ProtobufSimpleAuthenticatorJUnitTest {
   private static final String TEST_USERNAME = "user1";
@@ -95,8 +98,12 @@ public class ProtobufSimpleAuthenticatorJUnitTest {
     when(mockSecurityManager.authenticate(expectedAuthProperties))
         .thenThrow(new AuthenticationFailedException("BOOM!"));
 
-    protobufSimpleAuthenticator.authenticate(byteArrayInputStream, byteArrayOutputStream,
-        mockSecurityManager);
+    try {
+      protobufSimpleAuthenticator.authenticate(byteArrayInputStream, byteArrayOutputStream,
+          mockSecurityManager);
+      fail("This should throw an AuthenticationFailedException.");
+    } catch (AuthenticationFailedException e) {
+    }
 
     AuthenticationAPI.SimpleAuthenticationResponse simpleAuthenticationResponse =
         getSimpleAuthenticationResponse(byteArrayOutputStream);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
index 8b8a689..1e12d83 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
@@ -33,18 +33,18 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheLoaderException;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -59,6 +59,7 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private static final String TEST_INVALID_KEY = "I'm a naughty key!";
   private static final String NO_VALUE_PRESENT_FOR_THIS_KEY = "no value present for this key";
   private Region regionStub;
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -74,13 +75,15 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
 
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionStub);
     operationHandler = new GetAllRequestOperationHandler();
+
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
   public void processReturnsExpectedValuesForValidKeys() throws Exception {
     Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(true, false),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(true, false), messageExecutionContext);
 
     assertTrue(result instanceof Success);
 
@@ -97,11 +100,10 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   }
 
   @Test
-  public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException,
-      CodecNotRegisteredForTypeException, InvalidExecutionContextException {
+  public void processReturnsNoEntriesForNoKeysRequested() throws InvalidExecutionContextException,
+      UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException {
     Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(false, false),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(false, false), messageExecutionContext);
 
     assertTrue(result instanceof Success);
 
@@ -117,9 +119,8 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
         NO_VALUE_PRESENT_FOR_THIS_KEY));
     RegionAPI.GetAllRequest getAllRequest =
         ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, testKeys);
-    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
-        getAllRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetAllResponse> result =
+        operationHandler.process(serializationServiceStub, getAllRequest, messageExecutionContext);
 
     assertTrue(result instanceof Success);
     RegionAPI.GetAllResponse message = result.getMessage();
@@ -131,11 +132,10 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   }
 
   @Test
-  public void multipleKeysWhereOneThrows() throws UnsupportedEncodingTypeException,
-      CodecNotRegisteredForTypeException, InvalidExecutionContextException {
+  public void multipleKeysWhereOneThrows() throws InvalidExecutionContextException,
+      UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException {
     Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(true, true),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(true, true), messageExecutionContext);
 
     assertTrue(result instanceof Success);
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
index 9b96dab..23e5327 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
@@ -14,30 +14,31 @@
  */
 package org.apache.geode.internal.protocol.protobuf.operations;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.LocatorLoadSnapshot;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.ServerLocator;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Result;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI.GetAvailableServersResponse;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
 import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.ArrayList;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
index a5525ee..f822a97 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
@@ -28,16 +28,14 @@ import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
-import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
-import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -45,6 +43,7 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
   private final String TEST_REGION1 = "test region 1";
   private final String TEST_REGION2 = "test region 2";
   private final String TEST_REGION3 = "test region 3";
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -60,14 +59,15 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
     when(cacheStub.rootRegions()).thenReturn(Collections
         .unmodifiableSet(new HashSet<>(Arrays.asList(region1Stub, region2Stub, region3Stub))));
     operationHandler = new GetRegionNamesRequestOperationHandler();
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
-  public void processReturnsCacheRegions() throws UnsupportedEncodingTypeException,
-      CodecNotRegisteredForTypeException, InvalidExecutionContextException {
-    Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process(
-        serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+  public void processReturnsCacheRegions() throws InvalidExecutionContextException {
+    Result<RegionAPI.GetRegionNamesResponse> result =
+        operationHandler.process(serializationServiceStub,
+            ProtobufRequestUtilities.createGetRegionNamesRequest(), messageExecutionContext);
     Assert.assertTrue(result instanceof Success);
 
     RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
@@ -89,10 +89,11 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
     Cache emptyCache = mock(Cache.class);;
     when(emptyCache.rootRegions())
         .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
+    messageExecutionContext = new MessageExecutionContext(emptyCache, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
     Result<RegionAPI.GetRegionNamesResponse> result =
         operationHandler.process(serializationServiceStub,
-            ProtobufRequestUtilities.createGetRegionNamesRequest(), new MessageExecutionContext(
-                emptyCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+            ProtobufRequestUtilities.createGetRegionNamesRequest(), messageExecutionContext);
     Assert.assertTrue(result instanceof Success);
 
     RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
index df2c878..5e87068 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
@@ -30,16 +30,15 @@ import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.MessageUtil;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
-import org.apache.geode.security.server.NoOpAuthorizer;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -70,10 +69,10 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
     when(regionAttributesStub.getValueConstraint()).thenReturn(Integer.class);
     when(regionAttributesStub.getScope()).thenReturn(Scope.DISTRIBUTED_ACK);
 
-
+    MessageExecutionContext messageExecutionContext = new MessageExecutionContext(cacheStub, null,
+        null, new NoOpProtobufStatistics(), new NoOpAuthorizer());
     Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
-        MessageUtil.makeGetRegionRequest(TEST_REGION1),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        MessageUtil.makeGetRegionRequest(TEST_REGION1), messageExecutionContext);
     RegionAPI.GetRegionResponse response = result.getMessage();
     BasicTypes.Region region = response.getRegion();
     Assert.assertEquals(TEST_REGION1, region.getName());
@@ -85,19 +84,16 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
     Assert.assertEquals(10, region.getSize());
   }
 
-  private ClientProtocol.Request createRequestMessage(RegionAPI.GetRegionRequest getRegionRequest) {
-    return ClientProtocol.Request.newBuilder().setGetRegionRequest(getRegionRequest).build();
-  }
-
   @Test
   public void processReturnsNoCacheRegions() throws Exception {
     Cache emptyCache = mock(Cache.class);
     when(emptyCache.rootRegions())
         .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
     String unknownRegionName = "UNKNOWN_REGION";
+    MessageExecutionContext messageExecutionContext = new MessageExecutionContext(emptyCache, null,
+        null, new NoOpProtobufStatistics(), new NoOpAuthorizer());
     Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
-        MessageUtil.makeGetRegionRequest(unknownRegionName), new MessageExecutionContext(emptyCache,
-            new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        MessageUtil.makeGetRegionRequest(unknownRegionName), messageExecutionContext);
     Assert.assertTrue(result instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
         result.getErrorMessage().getError().getErrorCode());
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
index 57282d3..c240ac8 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
@@ -25,19 +25,19 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -48,6 +48,7 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   private final String MISSING_REGION = "missing region";
   private final String MISSING_KEY = "missing key";
   private final String NULLED_KEY = "nulled key";
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -63,14 +64,16 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionStub);
     when(cacheStub.getRegion(MISSING_REGION)).thenReturn(null);
     operationHandler = new GetRequestOperationHandler();
+
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
   public void processReturnsTheEncodedValueFromTheRegion() throws Exception {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, false, false);
-    Result<RegionAPI.GetResponse> result = operationHandler.process(serializationServiceStub,
-        getRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetResponse> result =
+        operationHandler.process(serializationServiceStub, getRequest, messageExecutionContext);
 
     Assert.assertTrue(result instanceof Success);
     Assert.assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT,
@@ -82,9 +85,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   @Test
   public void processReturnsUnsucessfulResponseForInvalidRegion() throws Exception {
     RegionAPI.GetRequest getRequest = generateTestRequest(true, false, false);
-    Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetResponse> response =
+        operationHandler.process(serializationServiceStub, getRequest, messageExecutionContext);
 
     Assert.assertTrue(response instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -94,9 +96,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   @Test
   public void processReturnsKeyNotFoundWhenKeyIsNotFound() throws Exception {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, true, false);
-    Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetResponse> response =
+        operationHandler.process(serializationServiceStub, getRequest, messageExecutionContext);
 
     Assert.assertTrue(response instanceof Success);
   }
@@ -104,9 +105,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   @Test
   public void processReturnsLookupFailureWhenKeyFoundWithNoValue() throws Exception {
     RegionAPI.GetRequest getRequest = generateTestRequest(false, false, true);
-    Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetResponse> response =
+        operationHandler.process(serializationServiceStub, getRequest, messageExecutionContext);
 
     Assert.assertTrue(response instanceof Success);
   }
@@ -124,9 +124,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
         .setCustomEncodedValue(customEncodedValueBuilder).build();
     RegionAPI.GetRequest getRequest =
         ProtobufRequestUtilities.createGetRequest(TEST_REGION, encodedKey).getGetRequest();
-    Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
-        getRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.GetResponse> response =
+        operationHandler.process(serializationServiceStub, getRequest, messageExecutionContext);
 
     Assert.assertTrue(response instanceof Failure);
     Assert.assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
index 70f7b66..449249b 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
@@ -30,17 +30,17 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
 
@@ -57,6 +57,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private final String TEST_REGION = "test region";
   private final String EXCEPTION_TEXT = "Simulating put failure";
   private Region regionMock;
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -66,6 +67,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
         .thenThrow(new ClassCastException(EXCEPTION_TEXT));
 
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionMock);
+
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
@@ -73,8 +77,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
     Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(false, true),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(false, true), messageExecutionContext);
 
     Assert.assertTrue(result instanceof Success);
 
@@ -88,8 +91,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
     Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(true, true),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(true, true), messageExecutionContext);
 
     assertTrue(result instanceof Success);
     verify(regionMock).put(TEST_KEY1, TEST_VALUE1);
@@ -108,8 +110,7 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
 
     Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(false, false),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(false, false), messageExecutionContext);
 
     assertTrue(result instanceof Success);
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
index 81b16ce..f475bca 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
@@ -29,19 +29,19 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -50,6 +50,7 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   private final String TEST_VALUE = "99";
   private final String TEST_REGION = "test region";
   private Region regionMock;
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -59,14 +60,15 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     when(regionMock.put(TEST_KEY, TEST_VALUE)).thenReturn(1);
 
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionMock);
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
   public void test_puttingTheEncodedEntryIntoRegion() throws Exception {
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(), messageExecutionContext);
 
     assertTrue(result instanceof Success);
 
@@ -94,9 +96,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     BasicTypes.Entry testEntry = ProtobufUtilities.createEntry(encodedKey, testValue);
     RegionAPI.PutRequest putRequest =
         ProtobufRequestUtilities.createPutRequest(TEST_REGION, testEntry).getPutRequest();
-    Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        putRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.PutResponse> result =
+        operationHandler.process(serializationServiceStub, putRequest, messageExecutionContext);
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
@@ -108,8 +109,7 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(null);
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(), messageExecutionContext);
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -122,8 +122,7 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
 
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
     Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
-        generateTestRequest(),
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+        generateTestRequest(), messageExecutionContext);
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.CONSTRAINT_VIOLATION.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
index b5fc6b7..cfb411d 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
@@ -27,20 +27,20 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.Failure;
 import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.internal.protocol.protobuf.RegionAPI;
-import org.apache.geode.internal.protocol.protobuf.Result;
-import org.apache.geode.internal.protocol.protobuf.Success;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.internal.protocol.responses.Failure;
+import org.apache.geode.internal.protocol.responses.Result;
+import org.apache.geode.internal.protocol.responses.Success;
+import org.apache.geode.internal.protocol.security.server.NoOpAuthorizer;
 import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.internal.serialization.registry.exception.CodecNotRegisteredForTypeException;
-import org.apache.geode.security.server.NoOpAuthorizer;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -51,6 +51,7 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private final String MISSING_REGION = "missing region";
   private final String MISSING_KEY = "missing key";
   private Region regionStub;
+  private MessageExecutionContext messageExecutionContext;
 
   @Before
   public void setUp() throws Exception {
@@ -64,25 +65,26 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionStub);
     when(cacheStub.getRegion(MISSING_REGION)).thenReturn(null);
     operationHandler = new RemoveRequestOperationHandler();
+    messageExecutionContext = new MessageExecutionContext(cacheStub, null, null,
+        new NoOpProtobufStatistics(), new NoOpAuthorizer());
   }
 
   @Test
   public void processValidKeyRemovesTheEntryAndReturnSuccess() throws Exception {
+
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, false).getRemoveRequest();
-    Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.RemoveResponse> result =
+        operationHandler.process(serializationServiceStub, removeRequest, messageExecutionContext);
 
     assertTrue(result instanceof Success);
     verify(regionStub).remove(TEST_KEY);
   }
 
   @Test
   public void processReturnsUnsucessfulResponseForInvalidRegion() throws Exception {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(true, false).getRemoveRequest();
-    Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.RemoveResponse> result =
+        operationHandler.process(serializationServiceStub, removeRequest, messageExecutionContext);
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -92,9 +94,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   @Test
   public void processReturnsSuccessWhenKeyIsNotFound() throws Exception {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, true).getRemoveRequest();
-    Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.RemoveResponse> result =
+        operationHandler.process(serializationServiceStub, removeRequest, messageExecutionContext);
 
     assertTrue(result instanceof Success);
   }
@@ -113,9 +114,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
 
     RegionAPI.RemoveRequest removeRequest =
         ProtobufRequestUtilities.createRemoveRequest(TEST_REGION, encodedKey).getRemoveRequest();;
-    Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
-        removeRequest,
-        new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
+    Result<RegionAPI.RemoveResponse> result =
+        operationHandler.process(serializationServiceStub, removeRequest, messageExecutionContext);
 
     assertTrue(result instanceof Failure);
     assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufUtilitiesJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufUtilitiesJUnitTest.java
index 7404694..fbeacfe 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufUtilitiesJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/utilities/ProtobufUtilitiesJUnitTest.java
@@ -15,17 +15,18 @@
 
 package org.apache.geode.internal.protocol.protobuf.utilities;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.Charset;
+
 import com.google.protobuf.ByteString;
-import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.utilities.exception.UnknownProtobufPrimitiveType;
-import org.apache.geode.test.junit.categories.UnitTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.nio.charset.Charset;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.utilities.exception.UnknownProtobufPrimitiveType;
+import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class ProtobufUtilitiesJUnitTest {
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthenticationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthenticationIntegrationTest.java
new file mode 100644
index 0000000..00ffa12
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthenticationIntegrationTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.security.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Integration tests that start a cache server backed by a mocked {@link SecurityManager} and talk
+ * to it over a plain socket using the protobuf client protocol, once per authentication mode.
+ */
+@Category(IntegrationTest.class)
+public class AuthenticationIntegrationTest {
+
+  private static final String TEST_USERNAME = "bob";
+  private static final String TEST_PASSWORD = "bobspassword";
+  private Cache cache;
+
+  /** Restores the System properties changed by {@link #setUp(String)} after each test. */
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  // Client connection to the cache server; a field so that tearDown() can close it.
+  private Socket socket;
+  private OutputStream outputStream;
+  private InputStream inputStream;
+  private ProtobufProtocolSerializer protobufProtocolSerializer;
+
+  /**
+   * Starts a cache server secured by a mocked {@link SecurityManager} and opens a client socket to
+   * it. This is deliberately not a {@code @Before} method: each test calls it with the
+   * authentication mode it wants to exercise.
+   *
+   * @param authenticationMode value for the {@code geode.protocol-authentication-mode} System
+   *        property
+   * @throws IOException if starting the cache server or opening the client connection fails
+   */
+  public void setUp(String authenticationMode) throws IOException {
+    Properties expectedAuthProperties = new Properties();
+    expectedAuthProperties.setProperty(ResourceConstants.USER_NAME, TEST_USERNAME);
+    expectedAuthProperties.setProperty(ResourceConstants.PASSWORD, TEST_PASSWORD);
+
+    // The mock hands out a principal only for the test credentials and authorizes every
+    // operation requested for that principal.
+    Object securityPrincipal = new Object();
+    SecurityManager mockSecurityManager = mock(SecurityManager.class);
+    when(mockSecurityManager.authenticate(expectedAuthProperties)).thenReturn(securityPrincipal);
+    when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(true);
+
+    Properties properties = new Properties();
+    CacheFactory cacheFactory = new CacheFactory(properties);
+    // Set mcast-port to 0 explicitly; sometimes it isn't 0 due to other tests.
+    cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+    cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+    cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
+
+    cacheFactory.setSecurityManager(mockSecurityManager);
+    cache = cacheFactory.create();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+    // NOTE(review): these are set after the server has started, so they are presumably read when
+    // a client connects rather than at server start-up -- confirm.
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+    System.setProperty("geode.protocol-authentication-mode", authenticationMode);
+    socket = new Socket("localhost", cacheServerPort);
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+    outputStream = socket.getOutputStream();
+    inputStream = socket.getInputStream();
+    // NOTE(review): 110 is presumably the connection-mode byte that selects the protobuf
+    // protocol -- confirm against the server's acceptor.
+    outputStream.write(110);
+
+    protobufProtocolSerializer = new ProtobufProtocolSerializer();
+  }
+
+  /**
+   * Closes the client socket and then the cache. Both are null-checked because
+   * {@link #setUp(String)} may have failed before assigning them, and the cache is closed even if
+   * closing the socket throws.
+   *
+   * @throws IOException if closing the client socket fails
+   */
+  @After
+  public void tearDown() throws IOException {
+    try {
+      if (socket != null) {
+        socket.close();
+        socket = null;
+      }
+    } finally {
+      if (cache != null) {
+        cache.close();
+        cache = null;
+      }
+    }
+  }
+
+  /** With mode {@code NOOP} a request is answered without any authentication handshake. */
+  @Test
+  public void noopAuthenticationSucceeds() throws Exception {
+    setUp("NOOP");
+    ClientProtocol.Message getRegionsMessage =
+        ClientProtocol.Message.newBuilder().setRequest(ClientProtocol.Request.newBuilder()
+            .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder())).build();
+    protobufProtocolSerializer.serialize(getRegionsMessage, outputStream);
+
+    ClientProtocol.Message regionsResponse = protobufProtocolSerializer.deserialize(inputStream);
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE,
+        regionsResponse.getResponse().getResponseAPICase());
+  }
+
+  /**
+   * With mode {@code SIMPLE} the client first sends its credentials as a delimited
+   * {@code SimpleAuthenticationRequest}; once the server reports it as authenticated, a regular
+   * request is answered.
+   */
+  @Test
+  public void simpleAuthenticationSucceeds() throws Exception {
+    setUp("SIMPLE");
+    AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
+        AuthenticationAPI.SimpleAuthenticationRequest.newBuilder().setUsername(TEST_USERNAME)
+            .setPassword(TEST_PASSWORD).build();
+    authenticationRequest.writeDelimitedTo(outputStream);
+
+    AuthenticationAPI.SimpleAuthenticationResponse authenticationResponse =
+        AuthenticationAPI.SimpleAuthenticationResponse.parseDelimitedFrom(inputStream);
+    assertTrue(authenticationResponse.getAuthenticated());
+
+    ClientProtocol.Message getRegionsMessage =
+        ClientProtocol.Message.newBuilder().setRequest(ClientProtocol.Request.newBuilder()
+            .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder())).build();
+    protobufProtocolSerializer.serialize(getRegionsMessage, outputStream);
+
+    ClientProtocol.Message regionsResponse = protobufProtocolSerializer.deserialize(inputStream);
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE,
+        regionsResponse.getResponse().getResponseAPICase());
+  }
+}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthorizationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthorizationIntegrationTest.java
new file mode 100644
index 0000000..5d9a99a
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/security/server/AuthorizationIntegrationTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.security.server;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.protocol.protobuf.AuthenticationAPI;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+/**
+ * Integration tests that authenticate against a cache server in {@code SIMPLE} mode and then
+ * check that region operations sent over the protobuf client protocol succeed or fail according to
+ * the permissions granted by a mocked {@link SecurityManager}.
+ */
+@Category(IntegrationTest.class)
+public class AuthorizationIntegrationTest {
+
+  private static final String TEST_USERNAME = "bob";
+  private static final String TEST_PASSWORD = "bobspassword";
+  public static final String TEST_REGION = "testRegion";
+
+  /** Restores the System properties changed by {@link #setUp()} after each test. */
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  private Cache cache;
+  private int cacheServerPort;
+  private CacheServer cacheServer;
+  private Socket socket;
+  private OutputStream outputStream;
+  private ProtobufSerializationService serializationService;
+  private InputStream inputStream;
+  private ProtobufProtocolSerializer protobufProtocolSerializer;
+  private Object securityPrincipal;
+  private SecurityManager mockSecurityManager;
+  public static final ResourcePermission READ_PERMISSION =
+      new ResourcePermission(ResourcePermission.Resource.DATA, ResourcePermission.Operation.READ);
+  public static final ResourcePermission WRITE_PERMISSION =
+      new ResourcePermission(ResourcePermission.Resource.DATA, ResourcePermission.Operation.WRITE);
+
+  /**
+   * Starts a cache server secured by a mocked {@link SecurityManager}, creates
+   * {@link #TEST_REGION}, connects a client socket and authenticates it with the test credentials.
+   * Every authorization request for the test principal is initially stubbed to be denied; the
+   * individual tests then grant the permissions they need.
+   *
+   * @throws IOException if starting the cache server or talking to it over the socket fails
+   */
+  @Before
+  public void setUp() throws IOException {
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+    System.setProperty("geode.protocol-authentication-mode", "SIMPLE");
+
+    Properties expectedAuthProperties = new Properties();
+    expectedAuthProperties.setProperty(ResourceConstants.USER_NAME, TEST_USERNAME);
+    expectedAuthProperties.setProperty(ResourceConstants.PASSWORD, TEST_PASSWORD);
+
+    securityPrincipal = new Object();
+    mockSecurityManager = mock(SecurityManager.class);
+    when(mockSecurityManager.authenticate(expectedAuthProperties)).thenReturn(securityPrincipal);
+
+    Properties properties = new Properties();
+    CacheFactory cacheFactory = new CacheFactory(properties);
+    // Set mcast-port to 0 explicitly; sometimes it isn't 0 due to other tests.
+    cacheFactory.set("mcast-port", "0");
+
+    cacheFactory.setSecurityManager(mockSecurityManager);
+    cache = cacheFactory.create();
+
+    cacheServer = cache.addCacheServer();
+    cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+    cache.createRegionFactory().create(TEST_REGION);
+
+    socket = new Socket("localhost", cacheServerPort);
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+    outputStream = socket.getOutputStream();
+    inputStream = socket.getInputStream();
+    // NOTE(review): 110 is presumably the connection-mode byte that selects the protobuf
+    // protocol -- confirm against the server's acceptor.
+    outputStream.write(110);
+
+    serializationService = new ProtobufSerializationService();
+    protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+    // Deny every permission by default; each test overrides this for specific permissions.
+    when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(false);
+    AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
+        AuthenticationAPI.SimpleAuthenticationRequest.newBuilder().setUsername(TEST_USERNAME)
+            .setPassword(TEST_PASSWORD).build();
+    authenticationRequest.writeDelimitedTo(outputStream);
+
+    AuthenticationAPI.SimpleAuthenticationResponse authenticationResponse =
+        AuthenticationAPI.SimpleAuthenticationResponse.parseDelimitedFrom(inputStream);
+    assertTrue(authenticationResponse.getAuthenticated());
+  }
+
+  /**
+   * Closes the cache and then the client socket. Both are null-checked because {@link #setUp()}
+   * may have failed before assigning them, and the socket is closed even if closing the cache
+   * throws.
+   *
+   * @throws IOException if closing the client socket fails
+   */
+  @After
+  public void shutDown() throws IOException {
+    try {
+      if (cache != null) {
+        cache.close();
+      }
+    } finally {
+      if (socket != null) {
+        socket.close();
+      }
+    }
+  }
+
+  @Test
+  public void validateNoPermissions() throws Exception {
+    when(mockSecurityManager.authorize(securityPrincipal, READ_PERMISSION)).thenReturn(false);
+    when(mockSecurityManager.authorize(securityPrincipal, WRITE_PERMISSION)).thenReturn(false);
+
+    verifyOperations(false, false);
+  }
+
+  @Test
+  public void validateWritePermission() throws Exception {
+    when(mockSecurityManager.authorize(securityPrincipal, READ_PERMISSION)).thenReturn(false);
+    when(mockSecurityManager.authorize(securityPrincipal, WRITE_PERMISSION)).thenReturn(true);
+
+    verifyOperations(false, true);
+  }
+
+  @Test
+  public void validateReadPermission() throws Exception {
+    when(mockSecurityManager.authorize(securityPrincipal, READ_PERMISSION)).thenReturn(true);
+    when(mockSecurityManager.authorize(securityPrincipal, WRITE_PERMISSION)).thenReturn(false);
+
+    verifyOperations(true, false);
+  }
+
+  @Test
+  public void validateReadAndWritePermission() throws Exception {
+    when(mockSecurityManager.authorize(securityPrincipal, READ_PERMISSION)).thenReturn(true);
+    when(mockSecurityManager.authorize(securityPrincipal, WRITE_PERMISSION)).thenReturn(true);
+
+    verifyOperations(true, true);
+  }
+
+  /**
+   * Sends a get-region-names, a put and a remove request and checks each response.
+   *
+   * @param readAllowed whether the get-region-names request is expected to succeed
+   * @param writeAllowed whether the put and remove requests are expected to succeed
+   */
+  private void verifyOperations(boolean readAllowed, boolean writeAllowed) throws Exception {
+    ClientProtocol.Message getRegionsMessage =
+        ClientProtocol.Message.newBuilder().setRequest(ClientProtocol.Request.newBuilder()
+            .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder())).build();
+    validateOperationAuthorized(getRegionsMessage, inputStream, outputStream,
+        readAllowed ? ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE
+            : ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE);
+
+    ClientProtocol.Message putMessage = ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setPutRequest(RegionAPI.PutRequest.newBuilder().setRegionName(TEST_REGION).setEntry(
+                ProtobufUtilities.createEntry(serializationService, "TEST_KEY", "TEST_VALUE"))))
+        .build();
+    validateOperationAuthorized(putMessage, inputStream, outputStream,
+        writeAllowed ? ClientProtocol.Response.ResponseAPICase.PUTRESPONSE
+            : ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE);
+
+    ClientProtocol.Message removeMessage = ClientProtocol.Message.newBuilder()
+        .setRequest(ClientProtocol.Request.newBuilder()
+            .setRemoveRequest(RegionAPI.RemoveRequest.newBuilder().setRegionName(TEST_REGION)
+                .setKey(ProtobufUtilities.createEncodedValue(serializationService, "TEST_KEY"))))
+        .build();
+    validateOperationAuthorized(removeMessage, inputStream, outputStream,
+        writeAllowed ? ClientProtocol.Response.ResponseAPICase.REMOVERESPONSE
+            : ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE);
+  }
+
+  /**
+   * Sends {@code message} and asserts that the response is of the expected type. When an error
+   * response is expected, its error code must be {@code AUTHORIZATION_FAILED}.
+   *
+   * @param message the request to send
+   * @param inputStream stream the response is read from
+   * @param outputStream stream the request is written to
+   * @param expectedResponseType the response case the server is expected to answer with
+   */
+  private void validateOperationAuthorized(ClientProtocol.Message message, InputStream inputStream,
+      OutputStream outputStream, ClientProtocol.Response.ResponseAPICase expectedResponseType)
+      throws Exception {
+    protobufProtocolSerializer.serialize(message, outputStream);
+    ClientProtocol.Message response = protobufProtocolSerializer.deserialize(inputStream);
+    assertEquals(expectedResponseType, response.getResponse().getResponseAPICase());
+    if (expectedResponseType == ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE) {
+      assertEquals(ProtocolErrorCode.AUTHORIZATION_FAILED.codeValue,
+          response.getResponse().getErrorResponse().getError().getErrorCode());
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.