You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/23 20:55:06 UTC

[02/25] geode git commit: GEODE-3406: Locator accepts Protobuf requests

GEODE-3406: Locator accepts Protobuf requests

Also addresses GEODE-3400, GEODE-3399
This allows the locator to respond to Protobuf requests. Currently it
will only be able to respond to getAvailableServers.

To enable this we are introducing a new value of "0" that will be sent
in place of the Gossip version. After it we expect the same magic byte
("110") as in AcceptorImpl.

This is also gated by the `geode.feature-protobuf-protocol` system
property.

The getAvailableServers request handler now uses the locator directly,
since we are on the locator.

Signed-off-by: Brian Rowe <br...@pivotal.io>


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/530f48f3
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/530f48f3
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/530f48f3

Branch: refs/heads/feature/GEODE-3503
Commit: 530f48f35a96c4f8af7e51ed03b1ee2e5e150ebd
Parents: be45511
Author: Alexander Murmann <am...@pivotal.io>
Authored: Mon Aug 14 15:08:14 2017 -0700
Committer: Hitesh Khamesra <hk...@pivotal.io>
Committed: Tue Aug 22 10:52:50 2017 -0700

----------------------------------------------------------------------
 .../distributed/internal/InternalLocator.java   |   4 +-
 .../distributed/internal/ServerLocator.java     |   4 +
 .../internal/tcpserver/TcpServer.java           |  77 +++++++----
 .../geode/internal/cache/InternalCache.java     |   4 +-
 .../ClientProtoclMessageHandlerLoader.java      |  64 +++++++++
 .../sockets/ClientProtocolMessageHandler.java   |   7 +-
 .../ClientProtocolMessageHandlerLoader.java     |  64 +++++++++
 .../cache/tier/sockets/ExecutionContext.java    |  54 ++++++++
 .../GenericProtocolServerConnection.java        |   3 +-
 .../InvalidExecutionContextException.java       |  33 +++++
 .../AutoConnectionSourceImplJUnitTest.java      |   8 +-
 .../tcpserver/TCPServerSSLJUnitTest.java        |   2 +-
 .../internal/tcpserver/TcpServerJUnitTest.java  |   2 +-
 .../protocol/operations/OperationHandler.java   |   6 +-
 .../protocol/protobuf/ProtobufOpsProcessor.java |  17 ++-
 .../protobuf/ProtobufStreamProcessor.java       |  29 ++--
 .../protocol/protobuf/ProtocolErrorCode.java    |   1 +
 .../GetAllRequestOperationHandler.java          |   8 +-
 .../GetAvailableServersOperationHandler.java    |  65 ++-------
 .../GetRegionNamesRequestOperationHandler.java  |   8 +-
 .../GetRegionRequestOperationHandler.java       |   8 +-
 .../operations/GetRequestOperationHandler.java  |   8 +-
 .../PutAllRequestOperationHandler.java          |   8 +-
 .../operations/PutRequestOperationHandler.java  |   8 +-
 .../RemoveRequestOperationHandler.java          |   9 +-
 .../protocol/GetAvailableServersDUnitTest.java  | 108 ---------------
 .../RoundTripLocatorConnectionJUnitTest.java    | 132 +++++++++++++++++++
 .../protobuf/ProtobufStreamProcessorTest.java   |   4 +-
 .../GetAllRequestOperationHandlerJUnitTest.java |  18 +--
 ...ailableServersOperationHandlerJUnitTest.java |  97 ++++----------
 ...onNamesRequestOperationHandlerJUnitTest.java |  26 ++--
 ...tRegionRequestOperationHandlerJUnitTest.java |  16 ++-
 .../GetRequestOperationHandlerJUnitTest.java    |  33 ++---
 .../PutAllRequestOperationHandlerJUnitTest.java |  13 +-
 .../PutRequestOperationHandlerJUnitTest.java    |  39 +++---
 .../RemoveRequestOperationHandlerJUnitTest.java |  27 ++--
 36 files changed, 618 insertions(+), 396 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 4725518..8d2daf6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -315,7 +315,6 @@ public class InternalLocator extends Locator implements ConnectListener {
 
       // TODO:GEODE-1243: this.server is now a TcpServer and it should store or return its non-zero
       // port in a variable to use here
-
       try {
         newLocator.startPeerLocation(startDistributedSystem);
         if (startDistributedSystem) {
@@ -500,7 +499,7 @@ public class InternalLocator extends Locator implements ConnectListener {
     this.stats = new LocatorStats();
 
     this.server = new TcpServer(port, this.bindAddress, null, this.config, this.handler,
-        new DelayedPoolStatHelper(), group, this.toString());
+        new DelayedPoolStatHelper(), group, this.toString(), this);
   }
 
   // Reset the file names with the correct port number if startLocatorAndDS was called with port
@@ -636,7 +635,6 @@ public class InternalLocator extends Locator implements ConnectListener {
    */
   private void startDistributedSystem() throws UnknownHostException {
     InternalDistributedSystem existing = InternalDistributedSystem.getConnectedInstance();
-
     if (existing != null) {
       // LOG: changed from config to info
       logger.info(LocalizedMessage

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
index fb66b4c..27c557c 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
@@ -103,6 +103,10 @@ public class ServerLocator implements TcpHandler, DistributionAdvisee {
     this.stats = null;
   }
 
+  public LocatorLoadSnapshot getLoadSnapshot() {
+    return loadSnapshot;
+  }
+
   public ServerLocator(int port, InetAddress bindAddress, String hostNameForClients, File logFile,
       ProductUseLog productUseLogWriter, String memberName, InternalDistributedSystem ds,
       LocatorStats stats) throws IOException {

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 976f504..c3d51c1 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -22,6 +22,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.PoolStatHelper;
 import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
 import org.apache.geode.internal.DSFIDFactory;
@@ -31,6 +32,10 @@ import org.apache.geode.internal.VersionedDataInputStream;
 import org.apache.geode.internal.VersionedDataOutputStream;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandlerLoader;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
 import org.apache.geode.internal.cache.tier.sockets.HandShake;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.net.SocketCreator;
@@ -49,7 +54,6 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.URL;
-import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -78,6 +82,7 @@ public class TcpServer {
    * <p>
    * This should be incremented if the gossip message structures change
    * <p>
+   * 0 - special indicator of a non-gossip message from a client<br>
    * 1000 - gemfire 5.5 - using java serialization<br>
    * 1001 - 5.7 - using DataSerializable and supporting server locator messages.<br>
    * 1002 - 7.1 - sending GemFire version along with GOSSIP_VERSION in each request.
@@ -86,6 +91,7 @@ public class TcpServer {
    * version number
    */
   public final static int GOSSIPVERSION = 1002;
+  public final static int NON_GOSSIP_REQUEST_VERSION = 0;
   // Don't change it ever. We did NOT send GemFire version in a Gossip request till 1001 version.
   // This GOSSIPVERSION is used in _getVersionForAddress request for getting GemFire version of a
   // GossipServer.
@@ -120,6 +126,7 @@ public class TcpServer {
   private InetAddress bind_address;
   private volatile boolean shuttingDown = false; // GemStoneAddition
   private final PoolStatHelper poolHelper;
+  private InternalLocator internalLocator;
   private final TcpHandler handler;
 
   private PooledExecutorWithDMStats executor;
@@ -143,11 +150,12 @@ public class TcpServer {
 
   public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
       DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
-      ThreadGroup threadGroup, String threadName) {
+      ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) {
     this.port = port;
     this.bind_address = bind_address;
     this.handler = handler;
     this.poolHelper = poolHelper;
+    this.internalLocator = internalLocator;
     // register DSFID types first; invoked explicitly so that all message type
     // initializations do not happen in first deserialization on a possibly
     // "precious" thread
@@ -334,42 +342,46 @@ public class TcpServer {
    * fix for bug 33711 - client requests are spun off to another thread for processing. Requests are
    * synchronized in processGossip.
    */
-  private void processRequest(final Socket sock) {
+  private void processRequest(final Socket socket) {
     executor.execute(() -> {
       long startTime = DistributionStats.getStatTime();
       DataInputStream input = null;
       Object request, response;
       try {
 
-        sock.setSoTimeout(READ_TIMEOUT);
-        getSocketCreator().configureServerSSLSocket(sock);
+        socket.setSoTimeout(READ_TIMEOUT);
+        getSocketCreator().configureServerSSLSocket(socket);
 
         try {
-          input = new DataInputStream(sock.getInputStream());
+          input = new DataInputStream(socket.getInputStream());
         } catch (StreamCorruptedException e) {
           // Some garbage can be left on the socket stream
           // if a peer disappears at exactly the wrong moment.
           log.debug("Discarding illegal request from "
-              + (sock.getInetAddress().getHostAddress() + ":" + sock.getPort()), e);
+              + (socket.getInetAddress().getHostAddress() + ":" + socket.getPort()), e);
           return;
         }
-        int gossipVersion = readGossipVersion(sock, input);
+        int gossipVersion = readGossipVersion(socket, input);
 
         short versionOrdinal;
+        if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
+          if (input.readUnsignedByte() == AcceptorImpl.PROTOBUF_CLIENT_SERVER_PROTOCOL
+              && Boolean.getBoolean("geode.feature-protobuf-protocol")) {
+            ClientProtocolMessageHandler messageHandler = ClientProtocolMessageHandlerLoader.load();
+            messageHandler.receiveMessage(input, socket.getOutputStream(),
+                new ExecutionContext(internalLocator));
+          } else {
+            rejectUnknownProtocolConnection(socket, gossipVersion);
+            return;
+          }
+        }
         if (gossipVersion <= getCurrentGossipVersion()
             && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
           // Create a versioned stream to remember sender's GemFire version
           versionOrdinal = (short) GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
         } else {
           // Close the socket. We can not accept requests from a newer version
-          try {
-            sock.getOutputStream().write("unknown protocol version".getBytes());
-            sock.getOutputStream().flush();
-          } catch (IOException e) {
-            log.debug(
-                "exception in sending reply to process using unknown protocol " + gossipVersion, e);
-          }
-          sock.close();
+          rejectUnknownProtocolConnection(socket, gossipVersion);
           return;
         }
         if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
@@ -378,13 +390,13 @@ public class TcpServer {
         }
 
         if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
-          log.debug("Locator reading request from " + sock.getInetAddress() + " with version "
+          log.debug("Locator reading request from " + socket.getInetAddress() + " with version "
               + Version.fromOrdinal(versionOrdinal, false));
         }
         input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false));
         request = DataSerializer.readObject(input);
         if (log.isDebugEnabled()) {
-          log.debug("Locator received request " + request + " from " + sock.getInetAddress());
+          log.debug("Locator received request " + request + " from " + socket.getInetAddress());
         }
         if (request instanceof ShutdownRequest) {
           shuttingDown = true;
@@ -405,7 +417,7 @@ public class TcpServer {
 
         startTime = DistributionStats.getStatTime();
         if (response != null) {
-          DataOutputStream output = new DataOutputStream(sock.getOutputStream());
+          DataOutputStream output = new DataOutputStream(socket.getOutputStream());
           if (versionOrdinal != Version.CURRENT_ORDINAL) {
             output =
                 new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false));
@@ -422,19 +434,19 @@ public class TcpServer {
         // ignore
       } catch (ClassNotFoundException ex) {
         String sender = null;
-        if (sock != null) {
-          sender = sock.getInetAddress().getHostAddress();
+        if (socket != null) {
+          sender = socket.getInetAddress().getHostAddress();
         }
         log.info("Unable to process request from " + sender + " exception=" + ex.getMessage());
       } catch (Exception ex) {
         String sender = null;
-        if (sock != null) {
-          sender = sock.getInetAddress().getHostAddress();
+        if (socket != null) {
+          sender = socket.getInetAddress().getHostAddress();
         }
         if (ex instanceof IOException) {
           // IOException could be caused by a client failure. Don't
           // log with severe.
-          if (!sock.isClosed()) {
+          if (!socket.isClosed()) {
             log.info("Exception in processing request from " + sender, ex);
           }
         } else {
@@ -447,8 +459,8 @@ public class TcpServer {
       } catch (Throwable ex) {
         SystemFailure.checkFailure();
         String sender = null;
-        if (sock != null) {
-          sender = sock.getInetAddress().getHostAddress();
+        if (socket != null) {
+          sender = socket.getInetAddress().getHostAddress();
         }
         try {
           log.fatal("Exception in processing request from " + sender, ex);
@@ -461,7 +473,7 @@ public class TcpServer {
         }
       } finally {
         try {
-          sock.close();
+          socket.close();
         } catch (IOException ignore) {
           // ignore
         }
@@ -469,6 +481,17 @@ public class TcpServer {
     });
   }
 
+  private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion)
+      throws IOException {
+    try {
+      socket.getOutputStream().write("unknown protocol version".getBytes());
+      socket.getOutputStream().flush();
+    } catch (IOException e) {
+      log.debug("exception in sending reply to process using unknown protocol " + gossipVersion, e);
+    }
+    socket.close();
+  }
+
   private int readGossipVersion(Socket sock, DataInputStream input) throws Exception {
     // read the first byte & check for an improperly configured client pool trying
     // to contact a cache server

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 84aa66e..4c7a6ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -75,7 +75,9 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  */
 public interface InternalCache extends Cache, Extensible<Cache>, CacheTime {
 
-  InternalDistributedMember getMyId();
+  default InternalDistributedMember getMyId() {
+    return null;
+  }
 
   Collection<DiskStore> listDiskStores();
 

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java
new file mode 100644
index 0000000..6654757
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtoclMessageHandlerLoader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+
+/**
+ * Loads the single ClientProtocolMessageHandler implementation via ServiceLoader.
+ */
+public class ClientProtoclMessageHandlerLoader {
+  private static ClientProtocolMessageHandler protobufProtocolHandler;
+  private static final Object protocolLoadLock = new Object();
+
+  public static ClientProtocolMessageHandler load() {
+    if (protobufProtocolHandler != null) {
+      return protobufProtocolHandler;
+    }
+
+    synchronized (protocolLoadLock) {
+      if (protobufProtocolHandler != null) {
+        return protobufProtocolHandler;
+      }
+
+      ServiceLoader<ClientProtocolMessageHandler> loader =
+          ServiceLoader.load(ClientProtocolMessageHandler.class);
+      Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
+
+      if (!iterator.hasNext()) {
+        throw new ServiceLoadingFailureException(
+            "ClientProtocolMessageHandler implementation not found in JVM");
+      }
+
+      ClientProtocolMessageHandler returnValue = iterator.next();
+
+      if (iterator.hasNext()) {
+        throw new ServiceLoadingFailureException(
+            "Multiple service implementations found for ClientProtocolMessageHandler");
+      }
+
+      return returnValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index 32e9e4b..38ab73e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -15,12 +15,11 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.internal.cache.InternalCache;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+
 /**
  * This is an interface that other modules can implement to hook into
  * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
@@ -30,6 +29,6 @@ import java.io.OutputStream;
  * {@link GenericProtocolServerConnection}.
  */
 public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream, InternalCache cache)
-      throws IOException;
+  void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      ExecutionContext executionContext) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java
new file mode 100644
index 0000000..1dc6129
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandlerLoader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.CachedRegionHelper;
+import org.apache.geode.internal.security.SecurityService;
+
+/**
+ * Loads the single ClientProtocolMessageHandler implementation via ServiceLoader.
+ */
+public class ClientProtocolMessageHandlerLoader {
+  private static ClientProtocolMessageHandler protobufProtocolHandler;
+  private static final Object protocolLoadLock = new Object();
+
+  public static ClientProtocolMessageHandler load() {
+    if (protobufProtocolHandler != null) {
+      return protobufProtocolHandler;
+    }
+
+    synchronized (protocolLoadLock) {
+      if (protobufProtocolHandler != null) {
+        return protobufProtocolHandler;
+      }
+
+      ServiceLoader<ClientProtocolMessageHandler> loader =
+          ServiceLoader.load(ClientProtocolMessageHandler.class);
+      Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
+
+      if (!iterator.hasNext()) {
+        throw new ServiceLoadingFailureException(
+            "ClientProtocolMessageHandler implementation not found in JVM");
+      }
+
+      ClientProtocolMessageHandler returnValue = iterator.next();
+
+      if (iterator.hasNext()) {
+        throw new ServiceLoadingFailureException(
+            "Multiple service implementations found for ClientProtocolMessageHandler");
+      }
+
+      return returnValue;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java
new file mode 100644
index 0000000..27da205
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ExecutionContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.internal.InternalLocator;
+
+public class ExecutionContext {
+  private Cache cache;
+  private InternalLocator locator;
+
+  public ExecutionContext(Cache cache) {
+    this.cache = cache;
+  }
+
+  public ExecutionContext(InternalLocator locator) {
+    this.locator = locator;
+  }
+
+  // This throws if the cache isn't present because we know that none of the callers can take any
+  // reasonable action if the cache is not present
+  public Cache getCache() throws InvalidExecutionContextException {
+    if (cache != null) {
+      return cache;
+    } else {
+      throw new InvalidExecutionContextException(
+          "Execution context's cache was accessed but isn't present. Did this happen on a locator? Operations on the locator should not try to operate on a cache");
+    }
+  }
+
+  // This throws if the locator isn't present because we know that none of the callers can take any
+  // reasonable action if the locator is not present
+  public InternalLocator getLocator() throws InvalidExecutionContextException {
+    if (locator != null) {
+      return locator;
+    } else {
+      throw new InvalidExecutionContextException(
+          "Execution context's locator was accessed but isn't present. Did this happen on a server? Operations on the locator should not try to operate on a cache");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index 93a7f6f..8f6720e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -61,7 +61,8 @@ public class GenericProtocolServerConnection extends ServerConnection {
       if (!authenticator.isAuthenticated()) {
         authenticator.receiveMessage(inputStream, outputStream, securityManager);
       } else {
-        messageHandler.receiveMessage(inputStream, outputStream, this.getCache());
+        messageHandler.receiveMessage(inputStream, outputStream,
+            new ExecutionContext(this.getCache()));
       }
     } catch (IOException e) {
       logger.warn(e);

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java
new file mode 100644
index 0000000..919e301
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/InvalidExecutionContextException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.annotations.Experimental;
+
+/*
+ * Indicates that ExecutionContext was missing required data. This will typically happen if an
+ * operation that is supposed to run on a server runs on a locator and receives a locator in its
+ * context instead of a cache. The reverse case applies as well.
+ */
+@Experimental
+public class InvalidExecutionContextException extends Exception {
+  public InvalidExecutionContextException(String message) {
+    super(message);
+  }
+
+  public InvalidExecutionContextException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java
index 5c33468..802620c 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/AutoConnectionSourceImplJUnitTest.java
@@ -18,11 +18,8 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.*;
 import org.apache.geode.cache.client.NoAvailableLocatorsException;
 import org.apache.geode.cache.client.SubscriptionNotEnabledException;
-import org.apache.geode.cache.client.internal.AutoConnectionSourceImpl.UpdateLocatorListTask;
-import org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
 import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest;
 import org.apache.geode.cache.client.internal.locator.ClientConnectionResponse;
-import org.apache.geode.cache.client.internal.locator.LocatorListRequest;
 import org.apache.geode.cache.client.internal.locator.LocatorListResponse;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.distributed.DistributedSystem;
@@ -63,7 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -308,7 +304,7 @@ public class AutoConnectionSourceImplJUnitTest {
     startFakeLocator();
     int secondPort = AvailablePortHelper.getRandomAvailableTCPPort();
     TcpServer server2 = new TcpServer(secondPort, InetAddress.getLocalHost(), null, null, handler,
-        new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server");
+        new FakeHelper(), Thread.currentThread().getThreadGroup(), "tcp server", null);
     server2.start();
 
     try {
@@ -392,7 +388,7 @@ public class AutoConnectionSourceImplJUnitTest {
 
   private void startFakeLocator() throws UnknownHostException, IOException, InterruptedException {
     server = new TcpServer(port, InetAddress.getLocalHost(), null, null, handler, new FakeHelper(),
-        Thread.currentThread().getThreadGroup(), "Tcp Server");
+        Thread.currentThread().getThreadGroup(), "Tcp Server", null);
     server.start();
     Thread.sleep(500);
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java
index 8a25aaf..229fbb9 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TCPServerSSLJUnitTest.java
@@ -138,7 +138,7 @@ public class TCPServerSSLJUnitTest {
     public DummyTcpServer(int port, InetAddress bind_address, Properties sslConfig,
         DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
         ThreadGroup threadGroup, String threadName) {
-      super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName);
+      super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup, threadName, null);
       if (cfg == null) {
         cfg = new DistributionConfigImpl(sslConfig);
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
index eda0641..9d20e8c 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/tcpserver/TcpServerJUnitTest.java
@@ -69,7 +69,7 @@ public class TcpServerJUnitTest {
 
     stats = new SimpleStats();
     server = new TcpServer(port, localhost, new Properties(), null, handler, stats,
-        Thread.currentThread().getThreadGroup(), "server thread");
+        Thread.currentThread().getThreadGroup(), "server thread", null);
     server.start();
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java
index aa6d79e..5d9012f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/operations/OperationHandler.java
@@ -15,7 +15,8 @@
 package org.apache.geode.protocol.operations;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.protobuf.ProtobufOpsProcessor;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.serialization.SerializationService;
@@ -32,6 +33,7 @@ public interface OperationHandler<Req, Resp> {
    * Decode the message, deserialize contained values using the serialization service, do the work
    * indicated on the provided cache, and return a response.
    */
-  Result<Resp> process(SerializationService serializationService, Req request, Cache cache);
+  Result<Resp> process(SerializationService serializationService, Req request,
+      ExecutionContext executionContext) throws InvalidExecutionContextException;
 }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
index 7d75b4a..76f81e7 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
@@ -15,8 +15,10 @@
 package org.apache.geode.protocol.protobuf;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
+import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.serialization.SerializationService;
 
 /**
@@ -35,12 +37,19 @@ public class ProtobufOpsProcessor {
     this.operationContextRegistry = operationContextRegistry;
   }
 
-  public ClientProtocol.Response process(ClientProtocol.Request request, Cache cache) {
+  public ClientProtocol.Response process(ClientProtocol.Request request, ExecutionContext context) {
     ClientProtocol.Request.RequestAPICase requestType = request.getRequestAPICase();
     OperationContext operationContext = operationContextRegistry.getOperationContext(requestType);
     ClientProtocol.Response.Builder builder;
-    Result result = operationContext.getOperationHandler().process(serializationService,
-        operationContext.getFromRequest().apply(request), cache);
+    Result result;
+    try {
+      result = operationContext.getOperationHandler().process(serializationService,
+          operationContext.getFromRequest().apply(request), context);
+    } catch (InvalidExecutionContextException e) {
+      result = Failure.of(ProtobufResponseUtilities.makeErrorResponse(
+          ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+          "Invalid execution context found for operation."));
+    }
 
     builder = (ClientProtocol.Response.Builder) result.map(operationContext.getToResponse(),
         operationContext.getToErrorResponse());

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
index 648ab3c..d04e49e 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
@@ -20,9 +20,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
 import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
 import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
 import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -45,29 +44,29 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
         new OperationContextRegistry());
   }
 
-  public void processOneMessage(InputStream inputStream, OutputStream outputStream, Cache cache)
-      throws InvalidProtocolMessageException, IOException {
+  @Override
+  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      ExecutionContext executionContext) throws IOException {
+    try {
+      processOneMessage(inputStream, outputStream, executionContext);
+    } catch (InvalidProtocolMessageException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private void processOneMessage(InputStream inputStream, OutputStream outputStream,
+      ExecutionContext executionContext) throws InvalidProtocolMessageException, IOException {
     ClientProtocol.Message message = protobufProtocolSerializer.deserialize(inputStream);
     if (message == null) {
       throw new EOFException("Tried to deserialize protobuf message at EOF");
     }
 
     ClientProtocol.Request request = message.getRequest();
-    ClientProtocol.Response response = protobufOpsProcessor.process(request, cache);
+    ClientProtocol.Response response = protobufOpsProcessor.process(request, executionContext);
     ClientProtocol.MessageHeader responseHeader =
         ProtobufUtilities.createMessageHeaderForRequest(message);
     ClientProtocol.Message responseMessage =
         ProtobufUtilities.createProtobufResponse(responseHeader, response);
     protobufProtocolSerializer.serialize(responseMessage, outputStream);
   }
-
-  @Override
-  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      InternalCache cache) throws IOException {
-    try {
-      processOneMessage(inputStream, outputStream, cache);
-    } catch (InvalidProtocolMessageException e) {
-      throw new IOException(e);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java
index e3b262d..6a6f605 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtocolErrorCode.java
@@ -18,6 +18,7 @@ public enum ProtocolErrorCode {
   GENERIC_FAILURE(1000),
   VALUE_ENCODING_ERROR(1100),
   UNSUPPORTED_VERSION(1101),
+  UNSUPPORTED_OPERATION(1102),
   AUTHENTICATION_FAILED(1200),
   AUTHORIZATION_FAILED(1201),
   UNAUTHORIZED_REQUEST(1202),

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
index 607d1d2..75274c1 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
@@ -19,8 +19,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -40,9 +41,10 @@ public class GetAllRequestOperationHandler
 
   @Override
   public Result<RegionAPI.GetAllResponse> process(SerializationService serializationService,
-      RegionAPI.GetAllRequest request, Cache cache) {
+      RegionAPI.GetAllRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
     String regionName = request.getRegionName();
-    Region region = cache.getRegion(regionName);
+    Region region = executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(ProtobufResponseUtilities
           .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
index 239d9f7..e7c18cd 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
@@ -14,33 +14,20 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.StringTokenizer;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.client.internal.locator.GetAllServersRequest;
-import org.apache.geode.cache.client.internal.locator.GetAllServersResponse;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.distributed.internal.tcpserver.TcpClient;
-import org.apache.geode.internal.admin.remote.DistributionLocatorId;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.Failure;
-import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.ServerAPI;
 import org.apache.geode.protocol.protobuf.Success;
-import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities;
 import org.apache.geode.serialization.SerializationService;
 
 @Experimental
@@ -50,51 +37,19 @@ public class GetAvailableServersOperationHandler implements
   @Override
   public Result<ServerAPI.GetAvailableServersResponse> process(
       SerializationService serializationService, ServerAPI.GetAvailableServersRequest request,
-      Cache cache) {
-
-    InternalDistributedSystem distributedSystem =
-        (InternalDistributedSystem) cache.getDistributedSystem();
-    Properties properties = distributedSystem.getProperties();
-    String locatorsString = properties.getProperty(ConfigurationProperties.LOCATORS);
-
-    HashSet<DistributionLocatorId> locators = new HashSet();
-    StringTokenizer stringTokenizer = new StringTokenizer(locatorsString, ",");
-    while (stringTokenizer.hasMoreTokens()) {
-      String locator = stringTokenizer.nextToken();
-      if (StringUtils.isNotEmpty(locator)) {
-        locators.add(new DistributionLocatorId(locator));
-      }
-    }
+      ExecutionContext executionContext) throws InvalidExecutionContextException {
 
-    TcpClient tcpClient = getTcpClient();
-    for (DistributionLocatorId locator : locators) {
-      try {
-        return getGetAvailableServersFromLocator(tcpClient, locator.getHost());
-      } catch (IOException | ClassNotFoundException e) {
-        // try the next locator
-      }
-    }
-    return Failure.of(ProtobufResponseUtilities.makeErrorResponse(
-        ProtocolErrorCode.DATA_UNREACHABLE.codeValue, "Unable to find a locator"));
-  }
+    InternalLocator locator = executionContext.getLocator();
+    ArrayList servers2 = locator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
 
-  private Result<ServerAPI.GetAvailableServersResponse> getGetAvailableServersFromLocator(
-      TcpClient tcpClient, InetSocketAddress address) throws IOException, ClassNotFoundException {
-    GetAllServersResponse getAllServersResponse = (GetAllServersResponse) tcpClient
-        .requestToServer(address, new GetAllServersRequest(), 1000, true);
-    Collection<BasicTypes.Server> servers =
-        (Collection<BasicTypes.Server>) getAllServersResponse.getServers().stream()
-            .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
-            .collect(Collectors.toList());
+    Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) servers2.stream()
+        .map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
+        .collect(Collectors.toList());
     ServerAPI.GetAvailableServersResponse.Builder builder =
         ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers);
     return Success.of(builder.build());
   }
 
-  protected TcpClient getTcpClient() {
-    return new TcpClient();
-  }
-
   private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) {
     BasicTypes.Server.Builder serverBuilder = BasicTypes.Server.newBuilder();
     serverBuilder.setHostname(serverLocation.getHostName()).setPort(serverLocation.getPort());

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
index e5d216a..53898ed 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandler.java
@@ -17,8 +17,9 @@ package org.apache.geode.protocol.protobuf.operations;
 import java.util.Set;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -32,8 +33,9 @@ public class GetRegionNamesRequestOperationHandler
 
   @Override
   public Result<RegionAPI.GetRegionNamesResponse> process(SerializationService serializationService,
-      RegionAPI.GetRegionNamesRequest request, Cache cache) {
-    Set<Region<?, ?>> regions = cache.rootRegions();
+      RegionAPI.GetRegionNamesRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
+    Set<Region<?, ?>> regions = executionContext.getCache().rootRegions();
     return Success.of(ProtobufResponseUtilities.createGetRegionNamesResponse(regions));
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
index b563a5d..007f96b 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
@@ -15,8 +15,9 @@
 package org.apache.geode.protocol.protobuf.operations;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -34,10 +35,11 @@ public class GetRegionRequestOperationHandler
 
   @Override
   public Result<RegionAPI.GetRegionResponse> process(SerializationService serializationService,
-      RegionAPI.GetRegionRequest request, Cache cache) {
+      RegionAPI.GetRegionRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
     String regionName = request.getRegionName();
 
-    Region region = cache.getRegion(regionName);
+    Region region = executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(
           ProtobufResponseUtilities.makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
index 96c0282..8f0fef7 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
@@ -15,8 +15,9 @@
 package org.apache.geode.protocol.protobuf.operations;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -36,9 +37,10 @@ public class GetRequestOperationHandler
 
   @Override
   public Result<RegionAPI.GetResponse> process(SerializationService serializationService,
-      RegionAPI.GetRequest request, Cache cache) {
+      RegionAPI.GetRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
     String regionName = request.getRegionName();
-    Region region = cache.getRegion(regionName);
+    Region region = executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(ProtobufResponseUtilities
           .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
index 253a95d..e0ebc41 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
@@ -21,8 +21,9 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -43,8 +44,9 @@ public class PutAllRequestOperationHandler
 
   @Override
   public Result<RegionAPI.PutAllResponse> process(SerializationService serializationService,
-      RegionAPI.PutAllRequest putAllRequest, Cache cache) {
-    Region region = cache.getRegion(putAllRequest.getRegionName());
+      RegionAPI.PutAllRequest putAllRequest, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
+    Region region = executionContext.getCache().getRegion(putAllRequest.getRegionName());
 
     if (region == null) {
       return Failure.of(ProtobufResponseUtilities.createAndLogErrorResponse(

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
index c24fb29..cf5afb4 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
@@ -15,8 +15,9 @@
 package org.apache.geode.protocol.protobuf.operations;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -36,9 +37,10 @@ public class PutRequestOperationHandler
 
   @Override
   public Result<RegionAPI.PutResponse> process(SerializationService serializationService,
-      RegionAPI.PutRequest request, Cache cache) {
+      RegionAPI.PutRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
     String regionName = request.getRegionName();
-    Region region = cache.getRegion(regionName);
+    Region region = executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(
           ProtobufResponseUtilities.makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
index 59236be..052efcf 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
@@ -18,10 +18,10 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.operations.OperationHandler;
-import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
 import org.apache.geode.protocol.protobuf.RegionAPI;
@@ -40,10 +40,11 @@ public class RemoveRequestOperationHandler
 
   @Override
   public Result<RegionAPI.RemoveResponse> process(SerializationService serializationService,
-      RegionAPI.RemoveRequest request, Cache cache) {
+      RegionAPI.RemoveRequest request, ExecutionContext executionContext)
+      throws InvalidExecutionContextException {
 
     String regionName = request.getRegionName();
-    Region region = cache.getRegion(regionName);
+    Region region = executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(ProtobufResponseUtilities
           .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java
deleted file mode 100644
index 4d6390b..0000000
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/GetAvailableServersDUnitTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.protocol;
-
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
-import org.apache.geode.protocol.protobuf.ServerAPI;
-import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
-import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
-import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
-import org.apache.geode.test.dunit.DistributedTestUtils;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.net.Socket;
-
-import static org.junit.Assert.assertEquals;
-
-@Category(DistributedTest.class)
-public class GetAvailableServersDUnitTest extends JUnit4CacheTestCase {
-
-  @Rule
-  public DistributedRestoreSystemProperties distributedRestoreSystemProperties =
-      new DistributedRestoreSystemProperties();
-
-  @Before
-  public void setup() {
-
-  }
-
-  @Test
-  public void testGetAllAvailableServersRequest()
-      throws IOException, InvalidProtocolMessageException {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-
-    int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
-
-    // int cacheServer1Port = vm0.invoke("Start Cache1", () -> startCacheWithCacheServer());
-    int cacheServer1Port = startCacheWithCacheServer();
-    int cacheServer2Port = vm1.invoke("Start Cache2", () -> startCacheWithCacheServer());
-    int cacheServer3Port = vm2.invoke("Start Cache3", () -> startCacheWithCacheServer());
-
-    vm0.invoke(() -> {
-      Socket socket = new Socket(host.getHostName(), cacheServer1Port);
-      socket.getOutputStream().write(110);
-
-      ClientProtocol.Request.Builder protobufRequestBuilder =
-          ProtobufUtilities.createProtobufRequestBuilder();
-      ClientProtocol.Message getAvailableServersRequestMessage =
-          ProtobufUtilities.createProtobufMessage(ProtobufUtilities.createMessageHeader(1233445),
-              protobufRequestBuilder.setGetAvailableServersRequest(
-                  ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
-
-      ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
-      protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
-          socket.getOutputStream());
-
-      ClientProtocol.Message getAvailableServersResponseMessage =
-          protobufProtocolSerializer.deserialize(socket.getInputStream());
-      assertEquals(1233445,
-          getAvailableServersResponseMessage.getMessageHeader().getCorrelationId());
-      assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
-          getAvailableServersResponseMessage.getMessageTypeCase());
-      ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
-      assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
-          messageResponse.getResponseAPICase());
-      ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
-          messageResponse.getGetAvailableServersResponse();
-      assertEquals(3, getAvailableServersResponse.getServersCount());
-    });
-  }
-
-  private Integer startCacheWithCacheServer() throws IOException {
-    System.setProperty("geode.feature-protobuf-protocol", "true");
-
-    InternalCache cache = getCache();
-    CacheServer cacheServer = cache.addCacheServer();
-    cacheServer.setPort(0);
-    cacheServer.start();
-    return cacheServer.getPort();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java
new file mode 100644
index 0000000..799c55c
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionJUnitTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
+import org.apache.geode.protocol.protobuf.ServerAPI;
+import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
+import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * Verifies that the locator answers Protobuf requests sent over a raw socket. Every test
+ * connects to the DUnit locator, writes 0 in place of the Gossip version followed by the magic
+ * byte 110, and then exchanges one request/response pair.
+ */
+@Category(DistributedTest.class)
+public class RoundTripLocatorConnectionJUnitTest extends JUnit4CacheTestCase {
+
+  /** Written in place of the Gossip version to request the Protobuf protocol. */
+  private static final int PROTOBUF_VERSION_MARKER = 0;
+  /** Magic byte written right after the version marker. */
+  private static final int PROTOBUF_MAGIC_BYTE = 110;
+  /** Correlation id sent with every request; the response header must echo it. */
+  private static final int CORRELATION_ID = 1233445;
+  /** System property that gates the Protobuf protocol. */
+  private static final String PROTOBUF_FEATURE_PROPERTY = "geode.feature-protobuf-protocol";
+
+  private Socket socket;
+  private DataOutputStream dataOutputStream;
+
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  /**
+   * Starts a cache server in this VM, sets the Protobuf feature property in the locator VM, and
+   * opens a socket to the locator on which the handshake bytes are written.
+   */
+  @Before
+  public void setup() throws IOException {
+    Host host = Host.getHost(0);
+    int locatorPort = DistributedTestUtils.getDUnitLocatorPort();
+    // Only the side effect of starting the server is needed; its port is not used.
+    startCacheWithCacheServer();
+
+    Host.getLocator().invoke(() -> System.setProperty(PROTOBUF_FEATURE_PROPERTY, "true"));
+
+    socket = new Socket(host.getHostName(), locatorPort);
+    dataOutputStream = new DataOutputStream(socket.getOutputStream());
+    dataOutputStream.writeInt(PROTOBUF_VERSION_MARKER);
+    dataOutputStream.writeByte(PROTOBUF_MAGIC_BYTE);
+  }
+
+  /** Closes the locator connection opened by {@link #setup()}. */
+  @After
+  public void closeLocatorSocket() throws IOException {
+    // setup() can fail before the socket is created, so guard against null.
+    if (socket != null) {
+      socket.close();
+    }
+  }
+
+  /** A getAvailableServers request must report the one cache server started in setup(). */
+  @Test
+  public void testEchoProtobufMessageFromLocator()
+      throws IOException, InvalidProtocolMessageException {
+    ClientProtocol.Request request = ProtobufUtilities.createProtobufRequestBuilder()
+        .setGetAvailableServersRequest(ProtobufRequestUtilities.createGetAvailableServersRequest())
+        .build();
+
+    ClientProtocol.Response response = sendAndReceive(request);
+
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
+        response.getResponseAPICase());
+    ServerAPI.GetAvailableServersResponse getAvailableServersResponse =
+        response.getGetAvailableServersResponse();
+    assertEquals(1, getAvailableServersResponse.getServersCount());
+  }
+
+  /** A getRegionNames request sent to the locator must yield an UNSUPPORTED_OPERATION error. */
+  @Test
+  public void testInvalidOperationReturnsFailure()
+      throws IOException, InvalidProtocolMessageException {
+    ClientProtocol.Request request = ProtobufUtilities.createProtobufRequestBuilder()
+        .setGetRegionNamesRequest(ProtobufRequestUtilities.createGetRegionNamesRequest())
+        .build();
+
+    ClientProtocol.Response response = sendAndReceive(request);
+
+    assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE,
+        response.getResponseAPICase());
+    assertEquals(ProtocolErrorCode.UNSUPPORTED_OPERATION.codeValue,
+        response.getErrorResponse().getError().getErrorCode());
+  }
+
+  /**
+   * Sends {@code request} to the locator and returns the response payload, after asserting that
+   * the reply echoes the correlation id and is a response message.
+   */
+  private ClientProtocol.Response sendAndReceive(ClientProtocol.Request request)
+      throws IOException, InvalidProtocolMessageException {
+    ClientProtocol.Message requestMessage = ProtobufUtilities
+        .createProtobufMessage(ProtobufUtilities.createMessageHeader(CORRELATION_ID), request);
+
+    ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+    protobufProtocolSerializer.serialize(requestMessage, socket.getOutputStream());
+
+    ClientProtocol.Message responseMessage =
+        protobufProtocolSerializer.deserialize(socket.getInputStream());
+    assertEquals(CORRELATION_ID, responseMessage.getMessageHeader().getCorrelationId());
+    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
+        responseMessage.getMessageTypeCase());
+    return responseMessage.getResponse();
+  }
+
+  /** Enables the Protobuf feature in this VM, starts a cache server, and returns its port. */
+  private Integer startCacheWithCacheServer() throws IOException {
+    System.setProperty(PROTOBUF_FEATURE_PROPERTY, "true");
+
+    InternalCache cache = getCache();
+    CacheServer cacheServer = cache.addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
index 87bfd52..2185b15 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
@@ -26,6 +26,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
 import org.apache.geode.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
@@ -37,6 +38,7 @@ public class ProtobufStreamProcessorTest {
 
     ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
     InternalCache mockInternalCache = mock(InternalCache.class);
-    protobufStreamProcessor.receiveMessage(inputStream, outputStream, mockInternalCache);
+    protobufStreamProcessor.receiveMessage(inputStream, outputStream,
+        new ExecutionContext(mockInternalCache));
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
index f2e3199..f4d098c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.protocol.protobuf.operations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,13 +27,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
+import org.apache.geode.internal.cache.tier.sockets.InvalidExecutionContextException;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
 import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
-import org.apache.geode.serialization.SerializationService;
 import org.apache.geode.serialization.codec.StringCodec;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
@@ -80,9 +80,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   @Test
   public void processReturnsExpectedValuesForValidKeys()
       throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException,
-      CodecNotRegisteredForTypeException {
-    Result<RegionAPI.GetAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(true), cacheStub);
+      CodecNotRegisteredForTypeException, InvalidExecutionContextException {
+    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(true), new ExecutionContext(cacheStub));
 
     Assert.assertTrue(result instanceof Success);
 
@@ -99,10 +99,10 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   }
 
   @Test
-  public void processReturnsNoEntriesForNoKeysRequested()
-      throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException {
-    Result<RegionAPI.GetAllResponse> result =
-        operationHandler.process(serializationServiceStub, generateTestRequest(false), cacheStub);
+  public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException,
+      CodecNotRegisteredForTypeException, InvalidExecutionContextException {
+    Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+        generateTestRequest(false), new ExecutionContext(cacheStub));
 
     Assert.assertTrue(result instanceof Success);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/530f48f3/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
index 77b088d..cff6ddc 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
@@ -14,14 +14,12 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import org.apache.geode.cache.client.internal.locator.GetAllServersResponse;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.LocatorLoadSnapshot;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.distributed.internal.tcpserver.TcpClient;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.distributed.internal.ServerLocator;
+import org.apache.geode.internal.cache.tier.sockets.ExecutionContext;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.ServerAPI;
 import org.apache.geode.protocol.protobuf.ServerAPI.GetAvailableServersResponse;
@@ -32,75 +30,48 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
 
-  private TcpClient mockTCPClient;
+  public static final String HOSTNAME_1 = "hostname1";
+  public static final int PORT_1 = 12345;
+
+  public static final String HOSTNAME_2 = "hostname2";
+  public static final int PORT_2 = 23456;
+
+  private InternalLocator internalLocatorMock;
 
   @Before
   public void setUp() throws Exception {
     super.setUp();
 
-    operationHandler = mock(GetAvailableServersOperationHandler.class);
-    cacheStub = mock(GemFireCacheImpl.class);
-    when(operationHandler.process(any(), any(), any())).thenCallRealMethod();
-    InternalDistributedSystem mockDistributedSystem = mock(InternalDistributedSystem.class);
-    when(cacheStub.getDistributedSystem()).thenReturn(mockDistributedSystem);
-    Properties mockProperties = mock(Properties.class);
-    when(mockDistributedSystem.getProperties()).thenReturn(mockProperties);
-    String locatorString = "testLocator1Host[12345],testLocator2Host[23456]";
-    when(mockProperties.getProperty(ConfigurationProperties.LOCATORS)).thenReturn(locatorString);
-    mockTCPClient = mock(TcpClient.class);
-    when(((GetAvailableServersOperationHandler) operationHandler).getTcpClient())
-        .thenReturn(mockTCPClient);
-  }
-
-  @Test
-  public void testServerReturnedFromHandler() throws Exception {
-    when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean()))
-        .thenReturn(new GetAllServersResponse(new ArrayList<ServerLocation>() {
-          {
-            add(new ServerLocation("hostname1", 12345));
-            add(new ServerLocation("hostname2", 23456));
-          }
-        }));
+    operationHandler = new GetAvailableServersOperationHandler();
+    internalLocatorMock = mock(InternalLocator.class);
+    ServerLocator serverLocatorAdviseeMock = mock(ServerLocator.class);
+    LocatorLoadSnapshot locatorLoadSnapshot = mock(LocatorLoadSnapshot.class);
+    ArrayList<Object> serverList = new ArrayList<>();
+    serverList.add(new ServerLocation(HOSTNAME_1, PORT_1));
+    serverList.add(new ServerLocation(HOSTNAME_2, PORT_2));
 
-    ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
-        ProtobufRequestUtilities.createGetAvailableServersRequest();
-    Result operationHandlerResult =
-        operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub);
-    assertTrue(operationHandlerResult instanceof Success);
-    ValidateGetAvailableServersResponse(
-        (GetAvailableServersResponse) operationHandlerResult.getMessage());
+    when(internalLocatorMock.getServerLocatorAdvisee()).thenReturn(serverLocatorAdviseeMock);
+    when(serverLocatorAdviseeMock.getLoadSnapshot()).thenReturn(locatorLoadSnapshot);
+    when(locatorLoadSnapshot.getServers(null)).thenReturn(serverList);
   }
 
   @Test
-  public void testServerReturnedFromSecondLocatorIfFirstDown() throws Exception {
-    when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean()))
-        .thenThrow(new IOException("BOOM!!!"))
-        .thenReturn(new GetAllServersResponse(new ArrayList<ServerLocation>() {
-          {
-            add(new ServerLocation("hostname1", 12345));
-            add(new ServerLocation("hostname2", 23456));
-          }
-        }));
-
+  public void testServerReturnedFromHandler() throws Exception {
     ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
         ProtobufRequestUtilities.createGetAvailableServersRequest();
-    Result operationHandlerResult =
-        operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub);
+    Result operationHandlerResult = operationHandler.process(serializationServiceStub,
+        getAvailableServersRequest, new ExecutionContext(internalLocatorMock));
     assertTrue(operationHandlerResult instanceof Success);
     ValidateGetAvailableServersResponse(
         (GetAvailableServersResponse) operationHandlerResult.getMessage());
@@ -110,22 +81,10 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl
       GetAvailableServersResponse getAvailableServersResponse) {
     assertEquals(2, getAvailableServersResponse.getServersCount());
     BasicTypes.Server server = getAvailableServersResponse.getServers(0);
-    assertEquals("hostname1", server.getHostname());
-    assertEquals(12345, server.getPort());
+    assertEquals(HOSTNAME_1, server.getHostname());
+    assertEquals(PORT_1, server.getPort());
     server = getAvailableServersResponse.getServers(1);
-    assertEquals("hostname2", server.getHostname());
-    assertEquals(23456, server.getPort());
-  }
-
-  @Test
-  public void testProcessFailsIfNoLocatorsAvailable() throws Exception {
-    when(mockTCPClient.requestToServer(any(), any(), anyInt(), anyBoolean()))
-        .thenThrow(new IOException("BOOM!!!"));
-
-    ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
-        ProtobufRequestUtilities.createGetAvailableServersRequest();
-    Result operationHandlerResult =
-        operationHandler.process(serializationServiceStub, getAvailableServersRequest, cacheStub);
-    assertTrue(operationHandlerResult instanceof Failure);
+    assertEquals(HOSTNAME_2, server.getHostname());
+    assertEquals(PORT_2, server.getPort());
   }
 }