You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2018/03/29 00:49:10 UTC

[geode] branch develop updated: GEODE-4819: Separating authorization out from protobuf handlers (#1674)

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f9b16e9  GEODE-4819: Separating authorization out from protobuf handlers (#1674)
f9b16e9 is described below

commit f9b16e937c94a93f1b1b2899ca43f5b1ebe76cc8
Author: Dan Smith <ds...@pivotal.io>
AuthorDate: Wed Mar 28 17:49:05 2018 -0700

    GEODE-4819: Separating authorization out from protobuf handlers (#1674)
    
    Separating out authorization from the logic of parsing protobuf
    messages. All region operations should be done through the new
    AuthorizingCache API, which does authorization and the operation.
    
    In a future change we should move AuthorizingCache to the core and
    refactor gfsh, the REST API, and the old client to also go through this
    layer.
---
 .../tier/sockets/ProtobufServerConnection.java     |   8 +-
 .../security/IntegratedSecurityService.java        |  18 ++
 .../internal/security/LegacySecurityService.java   |  87 ++++++
 .../geode/internal/security/SecurityService.java   |  69 ++---
 .../v1/LocatorMessageExecutionContext.java         |  32 +--
 .../protobuf/v1/MessageExecutionContext.java       |  12 +-
 .../protobuf/v1/ProtobufOperationContext.java      |  34 +--
 .../protocol/protobuf/v1/ProtobufOpsProcessor.java |   3 +
 .../protobuf/v1/ProtobufProtocolService.java       |   9 +-
 .../protobuf/v1/ProtobufSerializationService.java  |   7 +
 .../protobuf/v1/ServerMessageExecutionContext.java |  32 +--
 .../protobuf/v1/authentication/Authorizer.java     |  30 ++
 .../v1/authentication/AuthorizingCache.java        |  60 ++++
 .../v1/authentication/AuthorizingCacheImpl.java    | 194 +++++++++++++
 .../authentication/AuthorizingFunctionService.java |  32 +++
 .../AuthorizingFunctionServiceImpl.java            | 123 ++++++++
 .../v1/authentication/AuthorizingLocator.java      |  26 ++
 .../v1/authentication/AuthorizingLocatorImpl.java  |  59 ++++
 .../v1/authentication/NoSecurityAuthorizer.java    |  30 ++
 .../v1/authentication/NotLoggedInAuthorizer.java   |  29 ++
 .../v1/authentication/ShiroAuthorizer.java         |  35 +++
 .../AbstractFunctionRequestOperationHandler.java   | 146 ----------
 .../operations/ClearRequestOperationHandler.java   |  15 +-
 ...cuteFunctionOnGroupRequestOperationHandler.java |  82 ++----
 ...uteFunctionOnMemberRequestOperationHandler.java |  92 ++----
 ...uteFunctionOnRegionRequestOperationHandler.java |  86 +++---
 .../operations/GetAllRequestOperationHandler.java  | 114 ++------
 .../GetRegionNamesRequestOperationHandler.java     |   8 +-
 .../v1/operations/GetRequestOperationHandler.java  |  34 +--
 .../v1/operations/GetServerOperationHandler.java   |  15 +-
 .../operations/GetSizeRequestOperationHandler.java |   9 +-
 .../v1/operations/KeySetOperationHandler.java      |  19 +-
 .../OqlQueryRequestOperationHandler.java           |  31 +-
 .../operations/PutAllRequestOperationHandler.java  | 104 ++-----
 .../PutIfAbsentRequestOperationHandler.java        |  25 +-
 .../v1/operations/PutRequestOperationHandler.java  |  22 +-
 .../operations/RemoveRequestOperationHandler.java  |  16 +-
 .../AuthenticationRequestOperationHandler.java     |   3 +-
 .../registry/ProtobufOperationContextRegistry.java |  94 ++-----
 ...obufConnectionAuthenticatingStateProcessor.java |   7 +-
 ...rotobufConnectionAuthorizingStateProcessor.java |  31 +-
 .../ProtobufConnectionHandshakeStateProcessor.java |   7 +-
 .../OutputCapturingServerConnectionTest.java       |   7 +-
 .../tier/sockets/ProtobufServerConnectionTest.java |   7 +-
 .../internal/protocol/TestExecutionContext.java    |   5 +-
 .../v1/acceptance/CacheOperationsJUnitTest.java    |   2 +-
 .../authentication/AuthorizingCacheImplTest.java   | 312 +++++++++++++++++++++
 .../AuthorizingFunctionServiceImplTest.java        | 109 +++++++
 .../ClearRequestOperationHandlerJUnitTest.java     |  11 +-
 ...ionOnGroupRequestOperationHandlerJUnitTest.java |  59 ++--
 ...onOnMemberRequestOperationHandlerJUnitTest.java |  47 ++--
 ...onOnRegionRequestOperationHandlerJUnitTest.java |  43 ++-
 .../GetAllRequestOperationHandlerJUnitTest.java    |  23 +-
 .../GetAndPutJsonDocumentsDUnitTest.java           |  52 ++--
 ...egionNamesRequestOperationHandlerJUnitTest.java |   6 +-
 .../GetRequestOperationHandlerJUnitTest.java       |  11 +-
 .../GetSizeRequestOperationHandlerJUnitTest.java   |  12 +-
 ...ueryRequestOperationHandlerIntegrationTest.java |   5 +-
 .../OqlQueryRequestOperationHandlerJUnitTest.java  |   7 +-
 .../PutAllRequestOperationHandlerJUnitTest.java    |  17 +-
 ...utIfAbsentRequestOperationHandlerJUnitTest.java |  35 +--
 .../PutRequestOperationHandlerJUnitTest.java       |  14 +-
 .../RemoveRequestOperationHandlerJUnitTest.java    |  12 +-
 63 files changed, 1586 insertions(+), 1099 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
index 8a37467..1e50531 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
@@ -67,7 +67,13 @@ public class ProtobufServerConnection extends ServerConnection {
       InputStream inputStream = socket.getInputStream();
       OutputStream outputStream = socket.getOutputStream();
 
-      protocolProcessor.processMessage(inputStream, outputStream);
+      InternalCache cache = getCache();
+      cache.setReadSerializedForCurrentThread(true);
+      try {
+        protocolProcessor.processMessage(inputStream, outputStream);
+      } finally {
+        cache.setReadSerializedForCurrentThread(false);
+      }
 
       if (protocolProcessor.socketProcessingIsFinished()) {
         this.setFlagProcessMessagesAsFalse();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/security/IntegratedSecurityService.java b/geode-core/src/main/java/org/apache/geode/internal/security/IntegratedSecurityService.java
index eaeceec..9bfe993 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/security/IntegratedSecurityService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/security/IntegratedSecurityService.java
@@ -239,6 +239,24 @@ public class IntegratedSecurityService implements SecurityService {
   }
 
   @Override
+  public void authorize(ResourcePermission context, Subject currentUser) {
+    if (context == null) {
+      return;
+    }
+    if (context.getResource() == Resource.NULL && context.getOperation() == Operation.NULL) {
+      return;
+    }
+
+    try {
+      currentUser.checkPermission(context);
+    } catch (ShiroException e) {
+      String msg = currentUser.getPrincipal() + " not authorized for " + context;
+      logger.info("NotAuthorizedException: {}", msg);
+      throw new NotAuthorizedException(msg, e);
+    }
+  }
+
+  @Override
   public void close() {
     if (this.securityManager != null) {
       this.securityManager.close();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/security/LegacySecurityService.java b/geode-core/src/main/java/org/apache/geode/internal/security/LegacySecurityService.java
index 66b6876..05f7d46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/security/LegacySecurityService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/security/LegacySecurityService.java
@@ -14,7 +14,16 @@
  */
 package org.apache.geode.internal.security;
 
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
 import org.apache.commons.lang.StringUtils;
+import org.apache.shiro.subject.Subject;
+import org.apache.shiro.util.ThreadState;
+
+import org.apache.geode.security.PostProcessor;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
 
 /**
  * implementing SecurityService when only legacy authenticators are specified
@@ -49,4 +58,82 @@ public class LegacySecurityService implements SecurityService {
     return this.hasPeerAuthenticator;
   }
 
+  @Override
+  public ThreadState bindSubject(Subject subject) {
+    return null;
+  }
+
+  @Override
+  public Subject getSubject() {
+    return null;
+  }
+
+  @Override
+  public Subject login(Properties credentials) {
+    return null;
+  }
+
+  @Override
+  public void logout() {}
+
+  @Override
+  public Callable associateWith(Callable callable) {
+    return callable;
+  }
+
+  @Override
+  public void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation) {}
+
+  @Override
+  public void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, ResourcePermission.Target target) {}
+
+  @Override
+  public void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, String target) {}
+
+  @Override
+  public void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, String target, String key) {}
+
+  @Override
+  public void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, ResourcePermission.Target target, String key) {}
+
+  @Override
+  public void authorize(ResourcePermission context) {}
+
+  @Override
+  public void authorize(ResourcePermission context, Subject currentUser) {}
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean needPostProcess() {
+    return false;
+  }
+
+  @Override
+  public Object postProcess(String regionPath, Object key, Object value,
+      boolean valueIsSerialized) {
+    return value;
+  }
+
+  @Override
+  public Object postProcess(Object principal, String regionPath, Object key, Object value,
+      boolean valueIsSerialized) {
+    return value;
+  }
+
+  @Override
+  public SecurityManager getSecurityManager() {
+    return null;
+  }
+
+  @Override
+  public PostProcessor getPostProcessor() {
+    return null;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/security/SecurityService.java b/geode-core/src/main/java/org/apache/geode/internal/security/SecurityService.java
index e86050e..5142bd8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/security/SecurityService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/security/SecurityService.java
@@ -29,69 +29,46 @@ import org.apache.geode.security.SecurityManager;
 
 public interface SecurityService {
 
-  default ThreadState bindSubject(Subject subject) {
-    return null;
-  }
+  ThreadState bindSubject(Subject subject);
 
-  default Subject getSubject() {
-    return null;
-  }
+  Subject getSubject();
 
-  default Subject login(Properties credentials) {
-    return null;
-  }
+  Subject login(Properties credentials);
 
-  default void logout() {}
+  void logout();
 
-  default Callable associateWith(Callable callable) {
-    return callable;
-  }
+  Callable associateWith(Callable callable);
 
-  default void authorize(Resource resource, Operation operation) {}
+  void authorize(Resource resource, Operation operation);
 
-  default void authorize(Resource resource, Operation operation, Target target) {}
+  void authorize(Resource resource, Operation operation, Target target);
 
-  default void authorize(Resource resource, Operation operation, String target) {}
+  void authorize(Resource resource, Operation operation, String target);
 
-  default void authorize(Resource resource, Operation operation, String target, String key) {}
+  void authorize(Resource resource, Operation operation, String target, String key);
 
-  default void authorize(Resource resource, Operation operation, Target target, String key) {}
+  void authorize(Resource resource, Operation operation, Target target, String key);
 
-  default void authorize(ResourcePermission context) {}
+  void authorize(ResourcePermission context);
 
-  default void close() {}
+  void authorize(ResourcePermission context, Subject currentUser);
 
-  default boolean needPostProcess() {
-    return false;
-  }
+  void close();
 
-  default Object postProcess(String regionPath, Object key, Object value,
-      boolean valueIsSerialized) {
-    return value;
-  }
+  boolean needPostProcess();
 
-  default Object postProcess(Object principal, String regionPath, Object key, Object value,
-      boolean valueIsSerialized) {
-    return value;
-  }
+  Object postProcess(String regionPath, Object key, Object value, boolean valueIsSerialized);
 
-  default boolean isClientSecurityRequired() {
-    return false;
-  }
+  Object postProcess(Object principal, String regionPath, Object key, Object value,
+      boolean valueIsSerialized);
 
-  default boolean isIntegratedSecurity() {
-    return false;
-  }
+  boolean isClientSecurityRequired();
 
-  default boolean isPeerSecurityRequired() {
-    return false;
-  }
+  boolean isIntegratedSecurity();
 
-  default SecurityManager getSecurityManager() {
-    return null;
-  }
+  boolean isPeerSecurityRequired();
 
-  default PostProcessor getPostProcessor() {
-    return null;
-  }
+  SecurityManager getSecurityManager();
+
+  PostProcessor getPostProcessor();
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/LocatorMessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/LocatorMessageExecutionContext.java
index b615326..4280c3c 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/LocatorMessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/LocatorMessageExecutionContext.java
@@ -20,40 +20,40 @@ import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingLocator;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingLocatorImpl;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionTerminatingStateProcessor;
 
 @Experimental
 public class LocatorMessageExecutionContext extends MessageExecutionContext {
   private final Locator locator;
+  private AuthorizingLocator authorizingLocator;
 
   public LocatorMessageExecutionContext(Locator locator, ClientStatistics statistics,
-      ProtobufConnectionStateProcessor initialProtobufConnectionStateProcessor) {
+      ProtobufConnectionStateProcessor initialProtobufConnectionStateProcessor,
+      Authorizer authorizer) {
     super(statistics, initialProtobufConnectionStateProcessor);
     this.locator = locator;
+    this.authorizingLocator = new AuthorizingLocatorImpl(locator, authorizer);
   }
 
-  /**
-   * Returns the cache associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no cache available
-   */
   @Override
-  public InternalCache getCache() throws InvalidExecutionContextException {
+  public AuthorizingCache getAuthorizingCache() throws InvalidExecutionContextException {
     setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor());
     throw new InvalidExecutionContextException(
         "Operations on the locator should not to try to operate on a server");
   }
 
-  /**
-   * Returns the locator associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no locator available
-   */
   @Override
-  public Locator getLocator() throws InvalidExecutionContextException {
-    return locator;
+  public AuthorizingLocator getAuthorizingLocator() throws InvalidExecutionContextException {
+    return authorizingLocator;
+  }
+
+  @Override
+  public void setAuthorizer(Authorizer authorizer) {
+    this.authorizingLocator = new AuthorizingLocatorImpl(locator, authorizer);
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
index 80d560b..ea24ec5 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/MessageExecutionContext.java
@@ -15,10 +15,11 @@
 package org.apache.geode.internal.protocol.protobuf.v1;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingLocator;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;
 
 @Experimental
@@ -36,9 +37,10 @@ public abstract class MessageExecutionContext {
     return protobufConnectionStateProcessor;
   }
 
-  public abstract InternalCache getCache() throws InvalidExecutionContextException;
+  public abstract AuthorizingCache getAuthorizingCache() throws InvalidExecutionContextException;
 
-  public abstract Locator getLocator() throws InvalidExecutionContextException;
+  public abstract AuthorizingLocator getAuthorizingLocator()
+      throws InvalidExecutionContextException;
 
   /**
    * Returns the statistics for recording operation stats. In a unit test environment this may not
@@ -52,4 +54,6 @@ public abstract class MessageExecutionContext {
       ProtobufConnectionStateProcessor protobufConnectionStateProcessor) {
     this.protobufConnectionStateProcessor = protobufConnectionStateProcessor;
   }
+
+  public abstract void setAuthorizer(Authorizer authorizer);
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
index 8e46786..453c62e 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
@@ -31,42 +31,15 @@ public class ProtobufOperationContext<OperationRequest, OperationResponse> {
   private final Function<ClientProtocol.Message, OperationRequest> fromRequest;
   private final Function<OperationResponse, ClientProtocol.Message.Builder> toResponse;
   private final Function<ClientProtocol.ErrorResponse, ClientProtocol.Message.Builder> toErrorResponse;
-  private final PermissionFunction<OperationRequest> accessPermissionRequired;
 
-  private class StaticResourcePermissionProvider implements PermissionFunction<OperationRequest> {
-    private final ResourcePermission permission;
-
-    StaticResourcePermissionProvider(ResourcePermission requiredPermission) {
-      permission = requiredPermission;
-    }
-
-    @Override
-    public ResourcePermission apply(OperationRequest request,
-        ProtobufSerializationService serializer) {
-      return permission;
-    }
-  }
 
   public ProtobufOperationContext(Function<ClientProtocol.Message, OperationRequest> fromRequest,
       ProtobufOperationHandler<OperationRequest, OperationResponse> operationHandler,
-      Function<OperationResponse, ClientProtocol.Message.Builder> toResponse,
-      ResourcePermission permissionRequired) {
+      Function<OperationResponse, ClientProtocol.Message.Builder> toResponse) {
     this.operationHandler = operationHandler;
     this.fromRequest = fromRequest;
     this.toResponse = toResponse;
     this.toErrorResponse = this::makeErrorBuilder;
-    accessPermissionRequired = new StaticResourcePermissionProvider(permissionRequired);
-  }
-
-  public ProtobufOperationContext(Function<ClientProtocol.Message, OperationRequest> fromRequest,
-      ProtobufOperationHandler<OperationRequest, OperationResponse> operationHandler,
-      Function<OperationResponse, ClientProtocol.Message.Builder> toResponse,
-      PermissionFunction<OperationRequest> permissionRequired) {
-    this.operationHandler = operationHandler;
-    this.fromRequest = fromRequest;
-    this.toResponse = toResponse;
-    this.toErrorResponse = this::makeErrorBuilder;
-    accessPermissionRequired = permissionRequired;
   }
 
 
@@ -90,9 +63,4 @@ public class ProtobufOperationContext<OperationRequest, OperationResponse> {
   public Function<ClientProtocol.ErrorResponse, ClientProtocol.Message.Builder> getToErrorResponse() {
     return toErrorResponse;
   }
-
-  public ResourcePermission getAccessPermissionRequired(OperationRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return accessPermissionRequired.apply(request, serializer);
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
index cfc71b3..c137c42 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.De
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionTerminatingStateProcessor;
 import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
+import org.apache.geode.security.NotAuthorizedException;
 
 /**
  * This handles protobuf requests by determining the operation type of the request and dispatching
@@ -90,6 +91,8 @@ public class ProtobufOpsProcessor {
       logger.error(exception);
       return Failure.of(BasicTypes.ErrorCode.UNSUPPORTED_OPERATION,
           "Unsupported operation:" + exception.getMessage());
+    } catch (NotAuthorizedException e) {
+      return Failure.of(BasicTypes.ErrorCode.AUTHORIZATION_FAILED, "Not authorized: " + e);
     } finally {
       context.getStatistics().endOperation(startTime);
     }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
index bfc2048..82e1ceb 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufProtocolService.java
@@ -23,6 +23,7 @@ import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
 import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
 import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NotLoggedInAuthorizer;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionHandshakeStateProcessor;
 import org.apache.geode.internal.security.SecurityService;
 
@@ -44,8 +45,8 @@ public class ProtobufProtocolService implements ClientProtocolService {
 
     ProtobufConnectionHandshakeStateProcessor connectionStateProcessor =
         new ProtobufConnectionHandshakeStateProcessor(securityService);
-    return new ProtobufCachePipeline(protobufStreamProcessor,
-        new ServerMessageExecutionContext(cache, statistics, connectionStateProcessor));
+    return new ProtobufCachePipeline(protobufStreamProcessor, new ServerMessageExecutionContext(
+        cache, statistics, connectionStateProcessor, new NotLoggedInAuthorizer()));
   }
 
   /**
@@ -64,8 +65,8 @@ public class ProtobufProtocolService implements ClientProtocolService {
       SecurityService securityService) {
     ProtobufConnectionHandshakeStateProcessor connectionStateProcessor =
         new ProtobufConnectionHandshakeStateProcessor(securityService);
-    return new ProtobufCachePipeline(protobufStreamProcessor,
-        new LocatorMessageExecutionContext(locator, statistics, connectionStateProcessor));
+    return new ProtobufCachePipeline(protobufStreamProcessor, new LocatorMessageExecutionContext(
+        locator, statistics, connectionStateProcessor, new NotLoggedInAuthorizer()));
   }
 
   @Override
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufSerializationService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufSerializationService.java
index d20b05e..932f168 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufSerializationService.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufSerializationService.java
@@ -14,6 +14,9 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1;
 
+import java.util.Collection;
+import java.util.stream.Collectors;
+
 import com.google.protobuf.ByteString;
 import com.google.protobuf.NullValue;
 
@@ -133,6 +136,10 @@ public class ProtobufSerializationService implements SerializationService<BasicT
     }
   }
 
+  public Collection<Object> decodeList(Collection<BasicTypes.EncodedValue> encodedValues) {
+    return encodedValues.stream().map(this::decode).collect(Collectors.toList());
+  }
+
   /**
    * Maps classes to encoding for protobuf.
    *
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ServerMessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ServerMessageExecutionContext.java
index efb5430..e4db0f9 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ServerMessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ServerMessageExecutionContext.java
@@ -16,42 +16,40 @@
 package org.apache.geode.internal.protocol.protobuf.v1;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.distributed.Locator;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCacheImpl;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingLocator;
 import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;
 
 @Experimental
 public class ServerMessageExecutionContext extends MessageExecutionContext {
   private final InternalCache cache;
+  private AuthorizingCache authorizingCache;
 
   public ServerMessageExecutionContext(InternalCache cache, ClientStatistics statistics,
-      ProtobufConnectionStateProcessor initialConnectionStateProcessor) {
+      ProtobufConnectionStateProcessor initialConnectionStateProcessor, Authorizer authorizer) {
     super(statistics, initialConnectionStateProcessor);
     this.cache = cache;
+    this.authorizingCache = new AuthorizingCacheImpl(cache, authorizer);
   }
 
-  /**
-   * Returns the cache associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no cache available
-   */
   @Override
-  public InternalCache getCache() throws InvalidExecutionContextException {
-    return cache;
+  public AuthorizingCache getAuthorizingCache() {
+    return this.authorizingCache;
   }
 
-  /**
-   * Returns the locator associated with this execution
-   * <p>
-   *
-   * @throws InvalidExecutionContextException if there is no locator available
-   */
   @Override
-  public Locator getLocator() throws InvalidExecutionContextException {
+  public AuthorizingLocator getAuthorizingLocator() throws InvalidExecutionContextException {
     throw new InvalidExecutionContextException(
         "Operations on the server should not to try to operate on a locator");
   }
+
+  @Override
+  public void setAuthorizer(Authorizer authorizer) {
+    this.authorizingCache = new AuthorizingCacheImpl(cache, authorizer);
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/Authorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/Authorizer.java
new file mode 100644
index 0000000..d19205a
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/Authorizer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * Authorization checker. When security is enabled, this wraps a subject and checks that the
+ * subject is allowed to perform the requested operation
+ */
+public interface Authorizer {
+  default void authorize(ResourcePermission.Resource data, ResourcePermission.Operation read,
+      String regionName, Object key) {
+    authorize(new ResourcePermission(data, read, regionName, key.toString()));
+  }
+
+  void authorize(ResourcePermission resourcePermission);
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCache.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCache.java
new file mode 100644
index 0000000..aee50c4
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCache.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.geode.cache.query.FunctionDomainException;
+import org.apache.geode.cache.query.NameResolutionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryInvocationTargetException;
+import org.apache.geode.cache.query.TypeMismatchException;
+
+/**
+ * Access layer for cache operations that performs the appropriate authorization checks before
+ * performing the operation.
+ */
+public interface AuthorizingCache {
+  /** Reads the value stored under {@code key} in the named region. */
+  <K, V> V get(String regionName, K key);
+
+  /**
+   * Reads several keys from the named region, reporting each key's outcome through a callback.
+   *
+   * @param successConsumer receives a key together with the value read for it
+   * @param failureConsumer receives a key together with the exception raised while handling it
+   */
+  <K, V> void getAll(String regionName, Iterable<K> keys, BiConsumer<K, V> successConsumer,
+      BiConsumer<K, Exception> failureConsumer);
+
+  /** Stores {@code value} under {@code key} in the named region. */
+  <K, V> void put(String regionName, K key, V value);
+
+  /**
+   * Stores every entry of {@code entries} in the named region.
+   *
+   * @param failureConsumer receives a key together with the exception raised while handling it
+   */
+  <K, V> void putAll(String regionName, Map<K, V> entries,
+      BiConsumer<K, Exception> failureConsumer);
+
+  /** Performs a {@code putIfAbsent} of the value under the key in the named region. */
+  <K, V> V putIfAbsent(String regionName, K decodedKey, V decodedValue);
+
+  /** Removes {@code key} from the named region. */
+  <K, V> V remove(String regionName, K key);
+
+  /** Clears the named region. */
+  void clear(String regionName);
+
+  /** Lists the names of the regions in the cache. */
+  Collection<String> getRegionNames();
+
+  /** Returns the size of the named region. */
+  int getSize(String regionName);
+
+  /** Returns the keys of the named region. */
+  <K> Set<K> keySet(String regionName);
+
+  /** Returns the function-execution counterpart of this access layer. */
+  AuthorizingFunctionService getFunctionService();
+
+  /**
+   * Executes the given query string with the supplied bind parameters.
+   *
+   * @return the query result
+   */
+  Object query(String query, Object[] bindParameters) throws NameResolutionException,
+      TypeMismatchException, QueryInvocationTargetException, FunctionDomainException;
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImpl.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImpl.java
new file mode 100644
index 0000000..50a96f2
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImpl.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import static org.apache.geode.security.ResourcePermission.ALL;
+import static org.apache.geode.security.ResourcePermission.Operation.READ;
+import static org.apache.geode.security.ResourcePermission.Operation.WRITE;
+import static org.apache.geode.security.ResourcePermission.Resource.DATA;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.query.FunctionDomainException;
+import org.apache.geode.cache.query.NameResolutionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryInvocationTargetException;
+import org.apache.geode.cache.query.TypeMismatchException;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.InternalQueryService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * {@link AuthorizingCache} implementation backed by an {@link InternalCache}. Region operations
+ * are checked against the supplied {@link Authorizer} before they are carried out.
+ */
+public class AuthorizingCacheImpl implements AuthorizingCache {
+  protected final InternalCache cache;
+  protected final Authorizer authorizer;
+  private final AuthorizingFunctionService functionService;
+
+  /**
+   * @param cache the cache the operations are performed on
+   * @param authorizer the checker consulted before each operation
+   */
+  public AuthorizingCacheImpl(InternalCache cache, Authorizer authorizer) {
+    this.cache = cache;
+    this.authorizer = authorizer;
+    this.functionService = new AuthorizingFunctionServiceImpl(cache, authorizer);
+  }
+
+  /**
+   * Reads each key individually. If the caller is not authorized for the whole region, every key
+   * is authorized on its own; any exception for a key is handed to {@code failureConsumer}
+   * instead of aborting the remaining keys.
+   */
+  @Override
+  public <K, V> void getAll(String regionName, Iterable<K> keys, BiConsumer<K, V> successConsumer,
+      BiConsumer<K, Exception> failureConsumer) {
+    // Consult the authorizer before touching the region, as the other operations do, so the
+    // region lookup never runs ahead of the first authorization check.
+    boolean authorizedForAllKeys = tryAuthorizeAllKeys(DATA, READ, regionName);
+
+    Region<K, V> region = getRegion(regionName);
+    keys.forEach(key -> {
+      try {
+        if (!authorizedForAllKeys) {
+          authorizer.authorize(DATA, READ, regionName, key);
+        }
+        successConsumer.accept(key, region.get(key));
+      } catch (Exception e) {
+        failureConsumer.accept(key, e);
+      }
+    });
+  }
+
+  @Override
+  public <K, V> V get(String regionName, K key) {
+    authorizer.authorize(DATA, READ, regionName, key);
+    Region<K, V> region = getRegion(regionName);
+    return region.get(key);
+  }
+
+  @Override
+  public <K, V> void put(String regionName, K key, V value) {
+    authorizer.authorize(DATA, WRITE, regionName, key);
+    Region<K, V> region = getRegion(regionName);
+    region.put(key, value);
+  }
+
+  /**
+   * Writes each entry with its own {@code put}. If the caller is not authorized for the whole
+   * region, every key is authorized on its own; any exception for a key is handed to
+   * {@code failureConsumer} instead of aborting the remaining entries.
+   */
+  @Override
+  public <K, V> void putAll(String regionName, Map<K, V> entries,
+      BiConsumer<K, Exception> failureConsumer) {
+    // TODO - this is doing a very inefficient put for each key
+
+    boolean authorizedForAllKeys = tryAuthorizeAllKeys(DATA, WRITE, regionName);
+
+    Region<K, V> region = getRegion(regionName);
+    entries.forEach((key, value) -> {
+      try {
+        if (!authorizedForAllKeys) {
+          authorizer.authorize(DATA, WRITE, regionName, key);
+        }
+        region.put(key, value);
+      } catch (Exception e) {
+        failureConsumer.accept(key, e);
+      }
+    });
+  }
+
+  @Override
+  public <K, V> V remove(String regionName, K key) {
+    authorizer.authorize(DATA, WRITE, regionName, key);
+    Region<K, V> region = getRegion(regionName);
+    return region.remove(key);
+  }
+
+  /**
+   * Collects the full path of every root region and of all of its subregions.
+   */
+  @Override
+  public Collection<String> getRegionNames() {
+    authorizer.authorize(DATA, READ, ALL, ALL);
+    Set<String> regionNames = new HashSet<>();
+
+    cache.rootRegions().forEach(region -> {
+      regionNames.add(region.getFullPath());
+      region.subregions(true).stream().map(Region::getFullPath).forEach(regionNames::add);
+    });
+
+    return regionNames;
+  }
+
+  @Override
+  public int getSize(String regionName) {
+    authorizer.authorize(DATA, READ, regionName, ALL);
+    return getRegion(regionName).size();
+  }
+
+  @Override
+  public <K> Set<K> keySet(String regionName) {
+    authorizer.authorize(DATA, READ, regionName, ALL);
+    // Let inference pick the key type so no unchecked cast is needed.
+    Region<K, Object> region = getRegion(regionName);
+    return region.keySet();
+  }
+
+  @Override
+  public AuthorizingFunctionService getFunctionService() {
+    return functionService;
+  }
+
+  @Override
+  public void clear(String regionName) {
+    authorizer.authorize(DATA, WRITE, regionName, ALL);
+    Region<?, ?> region = getRegion(regionName);
+    region.clear();
+  }
+
+  @Override
+  public <K, V> V putIfAbsent(String regionName, K key, V value) {
+    authorizer.authorize(DATA, WRITE, regionName, key);
+    Region<K, V> region = getRegion(regionName);
+    return region.putIfAbsent(key, value);
+  }
+
+  /**
+   * Executes a query after authorizing a read of every region the query refers to.
+   */
+  @Override
+  public Object query(String queryString, Object[] bindParameters) throws NameResolutionException,
+      TypeMismatchException, QueryInvocationTargetException, FunctionDomainException {
+
+    InternalQueryService queryService = cache.getQueryService();
+
+    Query query = queryService.newQuery(queryString);
+
+    for (String regionName : ((DefaultQuery) query).getRegionsInQuery(bindParameters)) {
+      authorizer.authorize(DATA, READ, regionName, ALL);
+    }
+
+    return query.execute(bindParameters);
+  }
+
+  /**
+   * Looks up a region by name.
+   *
+   * @throws RegionDestroyedException if the cache has no region with that name
+   */
+  private <K, V> Region<K, V> getRegion(String regionName) {
+    Region<K, V> region = cache.getRegion(regionName);
+    if (region == null) {
+      throw new RegionDestroyedException("Region not found " + regionName, regionName);
+    }
+    return region;
+  }
+
+  /**
+   * Try to authorize the user for all keys.
+   *
+   * @return true if the user is authorized for all keys
+   */
+  private boolean tryAuthorizeAllKeys(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, String regionName) {
+    try {
+      authorizer.authorize(resource, operation, regionName, ALL);
+      return true;
+    } catch (NotAuthorizedException e) {
+      return false;
+    }
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionService.java
new file mode 100644
index 0000000..acd815f
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Access layer for the function service that performs authorization checks before a function is
+ * executed.
+ */
+public interface AuthorizingFunctionService {
+  /**
+   * Executes a function against a region.
+   *
+   * @param functionID the ID the function is looked up by
+   * @param regionName the name of the region to execute on
+   * @param arguments the arguments handed to the function
+   * @param keyFilter the keys the execution is filtered by
+   * @return the results of the execution
+   */
+  List<Object> executeFunctionOnRegion(String functionID, String regionName, Object arguments,
+      Set<?> keyFilter);
+
+  /**
+   * Executes a function on the named members.
+   *
+   * @param functionID the ID the function is looked up by
+   * @param arguments the arguments handed to the function
+   * @param memberNameList the names of the members to execute on
+   * @return the results of the execution
+   */
+  List<Object> executeFunctionOnMember(String functionID, Object arguments,
+      List<String> memberNameList);
+
+  /**
+   * Executes a function on the named groups.
+   *
+   * @param functionID the ID the function is looked up by
+   * @param arguments the arguments handed to the function
+   * @param groupNameList the names of the groups to execute on
+   * @return the results of the execution
+   */
+  List<Object> executeFunctionOnGroups(String functionID, Object arguments,
+      List<String> groupNameList);
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImpl.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImpl.java
new file mode 100644
index 0000000..f772f2d
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImpl.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.i18n.LocalizedStrings;
+
+/**
+ * {@link AuthorizingFunctionService} that checks a function's required permissions with an
+ * {@link Authorizer} and then runs it through {@link FunctionService}.
+ */
+public class AuthorizingFunctionServiceImpl implements AuthorizingFunctionService {
+
+  private final Authorizer authorizer;
+  private final InternalCache internalCache;
+
+  public AuthorizingFunctionServiceImpl(InternalCache internalCache, Authorizer authorizer) {
+    this.internalCache = internalCache;
+    this.authorizer = authorizer;
+  }
+
+  @Override
+  public List<Object> executeFunctionOnRegion(String functionID, String regionName,
+      Object arguments, Set<?> keyFilter) {
+    Function function = authorizeAndGetFunction(regionName, functionID);
+    Region region = getRegion(regionName);
+
+    Execution execution = FunctionService.onRegion(region);
+    // No filter is applied when the caller passes null.
+    if (keyFilter != null) {
+      execution = execution.withFilter(keyFilter);
+    }
+    return executeFunction(execution, functionID, function, arguments);
+  }
+
+  @Override
+  public List<Object> executeFunctionOnMember(String functionID, Object arguments,
+      List<String> memberNameList) {
+    Function function = authorizeAndGetFunction(null, functionID);
+    Set<DistributedMember> targets = getMemberIDs(functionID, memberNameList);
+    Execution execution = FunctionService.onMembers(targets);
+    return executeFunction(execution, functionID, function, arguments);
+  }
+
+  @Override
+  public List<Object> executeFunctionOnGroups(String functionID, Object arguments,
+      List<String> groupNameList) {
+    Function function = authorizeAndGetFunction(null, functionID);
+    String[] groupNames = groupNameList.toArray(new String[0]);
+    // NOTE(review): this calls onMember(String...), not onMembers(String...) - confirm that is the
+    // intended way to target the groups.
+    Execution execution = FunctionService.onMember(groupNames);
+    return executeFunction(execution, functionID, function, arguments);
+  }
+
+  /**
+   * Looks up a registered function and authorizes every permission it requires.
+   *
+   * @param regionName the region passed to the function when asking for its permissions; callers
+   *        that do not execute on a region pass {@code null}
+   * @throws IllegalArgumentException if no function is registered under {@code functionID}
+   */
+  private Function<?> authorizeAndGetFunction(String regionName, String functionID) {
+    final Function<?> function = FunctionService.getFunction(functionID);
+    if (function != null) {
+      function.getRequiredPermissions(regionName).forEach(authorizer::authorize);
+      return function;
+    }
+
+    throw new IllegalArgumentException(
+        LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
+            .toLocalizedString(functionID));
+  }
+
+  /**
+   * Applies the optional arguments, starts the execution and returns its result, or an empty list
+   * when the function reports that it has no result.
+   */
+  private List<Object> executeFunction(Execution execution, String functionID, Function function,
+      Object arguments) {
+    Execution configured = arguments == null ? execution : execution.setArguments(arguments);
+    ResultCollector<Object, List<Object>> collector = configured.execute(functionID);
+    if (!function.hasResult()) {
+      return Collections.emptyList();
+    }
+    return collector.getResult();
+  }
+
+  /**
+   * Resolves member names to members.
+   *
+   * @throws IllegalArgumentException if a name matches no member, or if no names were given
+   */
+  private Set<DistributedMember> getMemberIDs(String functionID, List<String> memberNameList) {
+    Set<DistributedMember> resolved = new HashSet<>(memberNameList.size());
+    DistributionManager distributionManager = internalCache.getDistributionManager();
+    for (String memberName : memberNameList) {
+      DistributedMember member = distributionManager.getMemberWithName(memberName);
+      if (member == null) {
+        throw new IllegalArgumentException(
+            "Member " + memberName + " not found to execute \"" + functionID + "\"");
+      }
+      resolved.add(member);
+    }
+
+    if (resolved.isEmpty()) {
+      throw new IllegalArgumentException("No members found to execute \"" + functionID + "\"");
+    }
+    return resolved;
+  }
+
+  /**
+   * Looks up a region by name.
+   *
+   * @throws RegionDestroyedException if the cache has no region with that name
+   */
+  private <K, V> Region<K, V> getRegion(String regionName) {
+    Region<K, V> found = internalCache.getRegion(regionName);
+    if (found != null) {
+      return found;
+    }
+    throw new RegionDestroyedException("Region not found " + regionName, regionName);
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocator.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocator.java
new file mode 100644
index 0000000..baa0fd9
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import java.util.Set;
+
+import org.apache.geode.distributed.internal.ServerLocation;
+
+/**
+ * Access layer for the locator that performs authorization checks.
+ */
+public interface AuthorizingLocator {
+  /**
+   * Asks the locator for a server.
+   *
+   * @param excludedServers the excluded servers named in the connection request
+   * @param serverGroup the server group named in the connection request
+   * @return the server that was found, or {@code null} if none was found
+   */
+  ServerLocation findServer(Set<ServerLocation> excludedServers, String serverGroup);
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocatorImpl.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocatorImpl.java
new file mode 100644
index 0000000..005bdef
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingLocatorImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import static org.apache.geode.security.ResourcePermission.ALL;
+import static org.apache.geode.security.ResourcePermission.Operation.READ;
+import static org.apache.geode.security.ResourcePermission.Resource.CLUSTER;
+
+import java.util.Set;
+
+import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest;
+import org.apache.geode.cache.client.internal.locator.ClientConnectionResponse;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.distributed.internal.ServerLocation;
+
+/**
+ * {@link AuthorizingLocator} that requires CLUSTER:READ permission before asking the locator for
+ * a server.
+ */
+public class AuthorizingLocatorImpl implements AuthorizingLocator {
+  private final Locator locator;
+  private final Authorizer authorizer;
+
+  public AuthorizingLocatorImpl(Locator locator, Authorizer authorizer) {
+    this.authorizer = authorizer;
+    this.locator = locator;
+  }
+
+  @Override
+  public ServerLocation findServer(Set<ServerLocation> excludedServers, String serverGroup) {
+    authorizer.authorize(CLUSTER, READ, ALL, ALL);
+
+    // Go through the locator's processRequest() API so that the request is subject to the
+    // locator's own checks. We assume that all servers have Protobuf enabled.
+    InternalLocator internalLocator = (InternalLocator) locator;
+    ClientConnectionRequest request = new ClientConnectionRequest(excludedServers, serverGroup);
+    ClientConnectionResponse response = (ClientConnectionResponse) internalLocator
+        .getServerLocatorAdvisee().processRequest(request);
+
+    // Nothing to return unless the locator produced a response that carries a result.
+    if (response == null || !response.hasResult()) {
+      return null;
+    }
+    return response.getServer();
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NoSecurityAuthorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NoSecurityAuthorizer.java
new file mode 100644
index 0000000..170987b
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NoSecurityAuthorizer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * {@link Authorizer} whose checks do nothing, so every request is allowed.
+ */
+public class NoSecurityAuthorizer implements Authorizer {
+  @Override
+  public void authorize(ResourcePermission resourcePermission) {
+    // Intentionally empty: nothing is checked.
+  }
+
+  @Override
+  public void authorize(ResourcePermission.Resource data, ResourcePermission.Operation read,
+      String regionName, Object key) {
+    // Intentionally empty: overriding the default also skips building a ResourcePermission.
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NotLoggedInAuthorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NotLoggedInAuthorizer.java
new file mode 100644
index 0000000..2538549
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/NotLoggedInAuthorizer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * An {@link Authorizer} that fails every authorization because the user is not logged in.
+ */
+public class NotLoggedInAuthorizer implements Authorizer {
+  @Override
+  public void authorize(ResourcePermission permission) {
+    // Always rejected: there is no logged-in user to check the permission against.
+    throw new IllegalStateException("User has not yet logged in");
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/ShiroAuthorizer.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/ShiroAuthorizer.java
new file mode 100644
index 0000000..2c794b6
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/ShiroAuthorizer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import org.apache.shiro.subject.Subject;
+
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.ResourcePermission;
+
+/**
+ * {@link Authorizer} that asks the {@link SecurityService} whether a given {@link Subject} holds
+ * the requested permission.
+ */
+public class ShiroAuthorizer implements Authorizer {
+  private final SecurityService securityService;
+  private final Subject subject;
+
+  /**
+   * @param securityService the service that performs the permission check
+   * @param subject the subject every check is made for
+   */
+  public ShiroAuthorizer(SecurityService securityService, Subject subject) {
+    this.subject = subject;
+    this.securityService = securityService;
+  }
+
+  @Override
+  public void authorize(ResourcePermission permission) {
+    securityService.authorize(permission, subject);
+  }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
deleted file mode 100644
index afba64f..0000000
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.protocol.protobuf.v1.operations;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.shiro.util.ThreadState;
-
-import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.cache.execute.ResultCollector;
-import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
-import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
-import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
-import org.apache.geode.internal.protocol.protobuf.v1.Result;
-import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
-import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
-import org.apache.geode.internal.protocol.protobuf.v1.state.exception.OperationNotAuthorizedException;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.NotAuthorizedException;
-
-public abstract class AbstractFunctionRequestOperationHandler<Req, Resp>
-    implements ProtobufOperationHandler<Req, Resp> {
-  private static final Logger logger = LogService.getLogger();
-
-  @Override
-  public Result<Resp> process(ProtobufSerializationService serializationService, Req request,
-      MessageExecutionContext messageExecutionContext)
-      throws InvalidExecutionContextException, DecodingException, EncodingException {
-
-    final String functionID = getFunctionID(request);
-
-    final Function<?> function = FunctionService.getFunction(functionID);
-    if (function == null) {
-      return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST,
-          LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
-              .toLocalizedString(functionID));
-    }
-
-    final SecurityService securityService = messageExecutionContext.getCache().getSecurityService();
-    final String regionName = getRegionName(request);
-
-    ThreadState threadState = null;
-    if (messageExecutionContext
-        .getConnectionStateProcessor() instanceof ProtobufConnectionAuthorizingStateProcessor) {
-      threadState = ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-          .getConnectionStateProcessor()).prepareThreadForAuthorization();
-    }
-    try {
-      // check security for function.
-      function.getRequiredPermissions(regionName).forEach(securityService::authorize);
-    } catch (NotAuthorizedException ex) {
-      messageExecutionContext.getStatistics().incAuthorizationViolations();
-      throw new OperationNotAuthorizedException(
-          "The user is not authorized to complete this operation");
-    } finally {
-      if (threadState != null) {
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-      }
-    }
-
-    Object executionTarget = getExecutionTarget(request, regionName, messageExecutionContext);
-    if (executionTarget instanceof Failure) {
-      return (Failure) executionTarget;
-    }
-
-    try {
-      Execution execution = getFunctionExecutionObject(executionTarget);
-
-      Object arguments = getFunctionArguments(request, serializationService);
-
-      if (arguments != null) {
-        execution = execution.setArguments(arguments);
-      }
-
-      Set<?> parseFilter = parseFilter(serializationService, request);
-      if (parseFilter != null) {
-        execution = execution.withFilter(parseFilter);
-      }
-
-      final ResultCollector<Object, List<Object>> resultCollector = execution.execute(functionID);
-
-      if (function.hasResult()) {
-        List<Object> results = resultCollector.getResult();
-
-        return buildResultMessage(serializationService, results);
-      } else {
-        // This is fire and forget.
-        return buildResultMessage(serializationService);
-      }
-    } catch (FunctionException ex) {
-      final String message = "Function execution failed: " + ex.toString();
-      logger.info(message, ex);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR, message);
-    }
-  }
-
-  protected abstract Set<?> parseFilter(ProtobufSerializationService serializationService,
-      Req request) throws EncodingException, DecodingException;
-
-  protected abstract String getFunctionID(Req request);
-
-  /** the result of this may be null, which is used by the security service to mean "no region" */
-  protected abstract String getRegionName(Req request);
-
-  /** region, list of members, etc */
-  protected abstract Object getExecutionTarget(Req request, String regionName,
-      MessageExecutionContext executionContext) throws InvalidExecutionContextException;
-
-  /** arguments for the function */
-  protected abstract Object getFunctionArguments(Req request,
-      ProtobufSerializationService serializationService)
-      throws EncodingException, DecodingException;
-
-  protected abstract Execution getFunctionExecutionObject(Object executionTarget)
-      throws InvalidExecutionContextException;
-
-  protected abstract Result buildResultMessage(ProtobufSerializationService serializationService)
-      throws EncodingException;
-
-  protected abstract Result buildResultMessage(ProtobufSerializationService serializationService,
-      List<Object> results) throws EncodingException;
-}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandler.java
index 7e3b400..b42b40d 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandler.java
@@ -34,7 +34,6 @@ import org.apache.geode.security.ResourcePermission;
 @Experimental
 public class ClearRequestOperationHandler
     implements ProtobufOperationHandler<RegionAPI.ClearRequest, RegionAPI.ClearResponse> {
-  private static final Logger logger = LogManager.getLogger();
 
   @Override
   public Result<RegionAPI.ClearResponse> process(ProtobufSerializationService serializationService,
@@ -42,21 +41,9 @@ public class ClearRequestOperationHandler
       throws InvalidExecutionContextException, DecodingException {
 
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received clear request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
 
-    region.clear();
+    messageExecutionContext.getAuthorizingCache().clear(regionName);
 
     return Success.of(RegionAPI.ClearResponse.newBuilder().build());
   }
-
-  public static ResourcePermission determineRequiredPermission(RegionAPI.ClearRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.WRITE, request.getRegionName());
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
index 3224e81..e6ec3db 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
@@ -23,8 +23,10 @@ import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.Failure;
+import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupRequest;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
@@ -34,56 +36,32 @@ import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 
-public class ExecuteFunctionOnGroupRequestOperationHandler extends
-    AbstractFunctionRequestOperationHandler<ExecuteFunctionOnGroupRequest, ExecuteFunctionOnGroupResponse> {
-
+public class ExecuteFunctionOnGroupRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnGroupRequest, ExecuteFunctionOnGroupResponse> {
 
   @Override
-  protected Set<?> parseFilter(ProtobufSerializationService serializationService,
-      ExecuteFunctionOnGroupRequest request) {
-    // filters are not allowed on functions not associated with regions
-    return null;
-  }
+  public Result<FunctionAPI.ExecuteFunctionOnGroupResponse> process(
+      ProtobufSerializationService serializationService,
+      FunctionAPI.ExecuteFunctionOnGroupRequest request,
+      MessageExecutionContext messageExecutionContext)
+      throws InvalidExecutionContextException, DecodingException, EncodingException {
 
-  @Override
-  protected String getFunctionID(ExecuteFunctionOnGroupRequest request) {
-    return request.getFunctionID();
-  }
+    final String functionID = request.getFunctionID();
+    List<String> memberNameList = request.getGroupNameList();
+    Object arguments = getFunctionArguments(request, serializationService);
 
-  @Override
-  protected String getRegionName(ExecuteFunctionOnGroupRequest request) {
-    // region name is not allowed in onMember invocation
-    return null;
-  }
+    List<Object> results = messageExecutionContext.getAuthorizingCache().getFunctionService()
+        .executeFunctionOnGroups(functionID, arguments, memberNameList);
 
-  @Override
-  protected Object getExecutionTarget(ExecuteFunctionOnGroupRequest request, String regionName,
-      MessageExecutionContext executionContext) throws InvalidExecutionContextException {
+    final FunctionAPI.ExecuteFunctionOnGroupResponse.Builder responseMessage =
+        FunctionAPI.ExecuteFunctionOnGroupResponse.newBuilder();
 
-    ProtocolStringList groupList = request.getGroupNameList();
+    results.stream().map(serializationService::encode).forEach(responseMessage::addResults);
 
-    // unfortunately FunctionServiceManager throws a FunctionException if there are no
-    // servers matching any of the given groups. In order to distinguish between
-    // function execution failure and this condition we have to preprocess the groups
-    // and ensure that there is at least one server that has one of the given groups
-    DistributedSystem distributedSystem =
-        executionContext.getCache().getDistributionManager().getSystem();
-    boolean foundMatch = false;
-    for (String group : groupList) {
-      if (distributedSystem.getGroupMembers(group).size() > 0) {
-        foundMatch = true;
-        break;
-      }
-    }
-    if (!foundMatch) {
-      return Failure.of(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER, "No server  in groups "
-          + groupList + " could be found to execute \"" + request.getFunctionID() + "\"");
-    }
-    return groupList;
+    return Success.of(responseMessage.build());
   }
 
-  @Override
-  protected Object getFunctionArguments(ExecuteFunctionOnGroupRequest request,
+  private Object getFunctionArguments(ExecuteFunctionOnGroupRequest request,
       ProtobufSerializationService serializationService) throws DecodingException {
     if (request.hasArguments()) {
       return serializationService.decode(request.getArguments());
@@ -92,26 +70,4 @@ public class ExecuteFunctionOnGroupRequestOperationHandler extends
     }
   }
 
-  @Override
-  protected Execution getFunctionExecutionObject(Object executionTarget) {
-    ProtocolStringList groupList = (ProtocolStringList) executionTarget;
-    return FunctionService.onMember(groupList.toArray(new String[0]));
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService,
-      List<Object> results) throws EncodingException {
-    final ExecuteFunctionOnGroupResponse.Builder responseMessage =
-        ExecuteFunctionOnGroupResponse.newBuilder();
-    for (Object result : results) {
-      responseMessage.addResults(serializationService.encode(result));
-    }
-    return Success.of(responseMessage.build());
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService) {
-    return Success.of(ExecuteFunctionOnGroupResponse.newBuilder().build());
-  }
-
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
index 63ca258..3a2bd82 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
@@ -14,19 +14,10 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
-import com.google.protobuf.ProtocolStringList;
-
-import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.distributed.DistributedMember;
-import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberRequest;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
@@ -36,53 +27,31 @@ import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 
-public class ExecuteFunctionOnMemberRequestOperationHandler extends
-    AbstractFunctionRequestOperationHandler<ExecuteFunctionOnMemberRequest, ExecuteFunctionOnMemberResponse> {
-
+public class ExecuteFunctionOnMemberRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnMemberRequest, ExecuteFunctionOnMemberResponse> {
 
   @Override
-  protected Set<?> parseFilter(ProtobufSerializationService serializationService,
-      ExecuteFunctionOnMemberRequest request) throws EncodingException {
-    // filters are not allowed on functions not associated with regions
-    return null;
-  }
+  public Result<ExecuteFunctionOnMemberResponse> process(
+      ProtobufSerializationService serializationService, ExecuteFunctionOnMemberRequest request,
+      MessageExecutionContext messageExecutionContext)
+      throws InvalidExecutionContextException, DecodingException, EncodingException {
 
-  @Override
-  protected String getFunctionID(ExecuteFunctionOnMemberRequest request) {
-    return request.getFunctionID();
-  }
+    final String functionID = request.getFunctionID();
+    List<String> memberNameList = request.getMemberNameList();
+    Object arguments = getFunctionArguments(request, serializationService);
 
-  @Override
-  protected String getRegionName(ExecuteFunctionOnMemberRequest request) {
-    // region name is not allowed in onMember invocation
-    return null;
-  }
+    List<Object> results = messageExecutionContext.getAuthorizingCache().getFunctionService()
+        .executeFunctionOnMember(functionID, arguments, memberNameList);
 
-  @Override
-  protected Object getExecutionTarget(ExecuteFunctionOnMemberRequest request, String regionName,
-      MessageExecutionContext executionContext) throws InvalidExecutionContextException {
+    final ExecuteFunctionOnMemberResponse.Builder responseMessage =
+        ExecuteFunctionOnMemberResponse.newBuilder();
 
-    ProtocolStringList memberNameList = request.getMemberNameList();
+    results.stream().map(serializationService::encode).forEach(responseMessage::addResults);
 
-    Set<DistributedMember> memberIds = new HashSet<>(memberNameList.size());
-    DistributionManager distributionManager = executionContext.getCache().getDistributionManager();
-    for (String name : memberNameList) {
-      DistributedMember member = distributionManager.getMemberWithName(name);
-      if (member == null) {
-        return Failure.of(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER,
-            "Member " + name + " not found to execute \"" + request.getFunctionID() + "\"");
-      }
-      memberIds.add(member);
-    }
-    if (memberIds.isEmpty()) {
-      return Failure.of(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER,
-          "No members found to execute \"" + request.getFunctionID() + "\"");
-    }
-    return memberIds;
+    return Success.of(responseMessage.build());
   }
 
-  @Override
-  protected Object getFunctionArguments(ExecuteFunctionOnMemberRequest request,
+  private Object getFunctionArguments(ExecuteFunctionOnMemberRequest request,
       ProtobufSerializationService serializationService) throws DecodingException {
     if (request.hasArguments()) {
       return serializationService.decode(request.getArguments());
@@ -90,31 +59,4 @@ public class ExecuteFunctionOnMemberRequestOperationHandler extends
       return null;
     }
   }
-
-  @Override
-  protected Execution getFunctionExecutionObject(Object executionTarget) {
-    Set<DistributedMember> memberIds = (Set<DistributedMember>) executionTarget;
-    if (memberIds.size() == 1) {
-      return FunctionService.onMember(memberIds.iterator().next());
-    } else {
-      return FunctionService.onMembers(memberIds);
-    }
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService,
-      List<Object> results) throws EncodingException {
-    final ExecuteFunctionOnMemberResponse.Builder responseMessage =
-        ExecuteFunctionOnMemberResponse.newBuilder();
-    for (Object result : results) {
-      responseMessage.addResults(serializationService.encode(result));
-    }
-    return Success.of(responseMessage.build());
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService) {
-    return Success.of(ExecuteFunctionOnMemberResponse.newBuilder().build());
-  }
-
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
index 4fde3ff..7ab22ce 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
@@ -20,27 +20,51 @@ import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionRequest;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingFunctionService;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
+import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 
-public class ExecuteFunctionOnRegionRequestOperationHandler extends
-    AbstractFunctionRequestOperationHandler<ExecuteFunctionOnRegionRequest, ExecuteFunctionOnRegionResponse> {
+public class ExecuteFunctionOnRegionRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnRegionRequest, ExecuteFunctionOnRegionResponse> {
   private static final Logger logger = LogService.getLogger();
 
-  protected Set<Object> parseFilter(ProtobufSerializationService serializationService,
+  @Override
+  public Result<ExecuteFunctionOnRegionResponse> process(
+      ProtobufSerializationService serializationService, ExecuteFunctionOnRegionRequest request,
+      MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException,
+      ConnectionStateException, EncodingException, DecodingException {
+
+    final String functionID = request.getFunctionID();
+    final String regionName = request.getRegion();
+    Object arguments = getFunctionArguments(request, serializationService);
+    Set<?> filter = parseFilter(serializationService, request);
+
+    AuthorizingFunctionService functionService =
+        messageExecutionContext.getAuthorizingCache().getFunctionService();
+
+    List<Object> results =
+        functionService.executeFunctionOnRegion(functionID, regionName, arguments, filter);
+
+    final ExecuteFunctionOnRegionResponse.Builder responseMessage =
+        ExecuteFunctionOnRegionResponse.newBuilder();
+    for (Object result : results) {
+      responseMessage.addResults(serializationService.encode(result));
+    }
+    return Success.of(responseMessage.build());
+  }
+
+  private Set<Object> parseFilter(ProtobufSerializationService serializationService,
       ExecuteFunctionOnRegionRequest request) throws DecodingException {
     List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList();
     Set<Object> filter = new HashSet<>();
@@ -51,31 +75,7 @@ public class ExecuteFunctionOnRegionRequestOperationHandler extends
     return filter;
   }
 
-  @Override
-  protected String getFunctionID(ExecuteFunctionOnRegionRequest request) {
-    return request.getFunctionID();
-  }
-
-  @Override
-  protected String getRegionName(ExecuteFunctionOnRegionRequest request) {
-    return request.getRegion();
-  }
-
-  @Override
-  protected Object getExecutionTarget(ExecuteFunctionOnRegionRequest request, String regionName,
-      MessageExecutionContext executionContext) throws InvalidExecutionContextException {
-    final Region<Object, Object> region = executionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received execute-function-on-region request for nonexistent region: {}",
-          regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-    return region;
-  }
-
-  @Override
-  protected Object getFunctionArguments(ExecuteFunctionOnRegionRequest request,
+  private Object getFunctionArguments(ExecuteFunctionOnRegionRequest request,
       ProtobufSerializationService serializationService) throws DecodingException {
     if (request.hasArguments()) {
       return serializationService.decode(request.getArguments());
@@ -83,26 +83,4 @@ public class ExecuteFunctionOnRegionRequestOperationHandler extends
       return null;
     }
   }
-
-  @Override
-  protected Execution getFunctionExecutionObject(Object executionTarget) {
-    return FunctionService.onRegion((Region) executionTarget);
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService,
-      List<Object> results) throws EncodingException {
-    final ExecuteFunctionOnRegionResponse.Builder responseMessage =
-        ExecuteFunctionOnRegionResponse.newBuilder();
-    for (Object result : results) {
-      responseMessage.addResults(serializationService.encode(result));
-    }
-    return Success.of(responseMessage.build());
-  }
-
-  @Override
-  protected Result buildResultMessage(ProtobufSerializationService serializationService) {
-    return Success.of(ExecuteFunctionOnRegionResponse.newBuilder().build());
-  }
-
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
index 0ca071c..31602da 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
@@ -14,32 +14,26 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
 import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.SERVER_ERROR;
 
+import java.util.Collection;
+
 import org.apache.logging.log4j.Logger;
-import org.apache.shiro.authz.AuthorizationException;
-import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Region;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
-import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
+import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.NotAuthorizedException;
-import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class GetAllRequestOperationHandler
@@ -49,96 +43,32 @@ public class GetAllRequestOperationHandler
   @Override
   public Result<RegionAPI.GetAllResponse> process(ProtobufSerializationService serializationService,
       RegionAPI.GetAllRequest request, MessageExecutionContext messageExecutionContext)
-      throws InvalidExecutionContextException, DecodingException {
+      throws InvalidExecutionContextException, DecodingException, ConnectionStateException {
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received get-all request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-
-    ThreadState threadState = null;
-    SecurityService securityService = messageExecutionContext.getCache().getSecurityService();
-    boolean perKeyAuthorization = false;
-    if (messageExecutionContext
-        .getConnectionStateProcessor() instanceof ProtobufConnectionAuthorizingStateProcessor) {
-      threadState = ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-          .getConnectionStateProcessor()).prepareThreadForAuthorization();
-      // Check if authorized for entire region
-      try {
-        securityService.authorize(new ResourcePermission(ResourcePermission.Resource.DATA,
-            ResourcePermission.Operation.READ, regionName));
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-        threadState = null;
-      } catch (NotAuthorizedException ex) {
-        // Not authorized for the region, have to check keys individually
-        perKeyAuthorization = true;
-      }
-    }
-    final boolean authorizeKeys = perKeyAuthorization; // Required for use in lambda
 
-    long startTime = messageExecutionContext.getStatistics().startOperation();
     RegionAPI.GetAllResponse.Builder responseBuilder = RegionAPI.GetAllResponse.newBuilder();
-    try {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(true);
-      request.getKeyList().stream().forEach((key) -> processSingleKey(responseBuilder,
-          serializationService, region, key, securityService, authorizeKeys));
-    } finally {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
-      messageExecutionContext.getStatistics().endOperation(startTime);
-      if (threadState != null) {
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-      }
-    }
+    AuthorizingCache cache = messageExecutionContext.getAuthorizingCache();
+    Collection<Object> keys = serializationService.decodeList(request.getKeyList());
+    cache.getAll(regionName, keys,
+        (key, value) -> addEntry(serializationService, responseBuilder, key, value),
+        (key, exception) -> addException(serializationService, responseBuilder, key, exception));
 
     return Success.of(responseBuilder.build());
   }
 
-  private void processSingleKey(RegionAPI.GetAllResponse.Builder responseBuilder,
-      ProtobufSerializationService serializationService, Region region, BasicTypes.EncodedValue key,
-      SecurityService securityService, boolean authorizeKeys) {
-    try {
-
-      Object decodedKey = serializationService.decode(key);
-      if (decodedKey == null) {
-        responseBuilder
-            .addFailures(buildKeyedError(key, "NULL is not a valid key for get.", INVALID_REQUEST));
-        return;
-      }
-      if (authorizeKeys) {
-        securityService.authorize(new ResourcePermission(ResourcePermission.Resource.DATA,
-            ResourcePermission.Operation.READ, region.getName(), decodedKey.toString()));
-      }
-      Object value = region.get(decodedKey);
-      BasicTypes.Entry entry =
-          ProtobufUtilities.createEntry(serializationService, decodedKey, value);
-      responseBuilder.addEntries(entry);
-
-    } catch (NotAuthorizedException ex) {
-      responseBuilder.addFailures(
-          buildKeyedError(key, "Unauthorized access", BasicTypes.ErrorCode.AUTHORIZATION_FAILED));
-    } catch (DecodingException ex) {
-      logger.info("Key encoding not supported: {}", ex);
-      responseBuilder
-          .addFailures(buildKeyedError(key, "Key encoding not supported.", INVALID_REQUEST));
-    } catch (EncodingException ex) {
-      logger.info("Value encoding not supported: {}", ex);
-      responseBuilder
-          .addFailures(buildKeyedError(key, "Value encoding not supported.", INVALID_REQUEST));
-    } catch (Exception ex) {
-      logger.warn("Failure in protobuf getAll operation for key: " + key, ex);
-      responseBuilder.addFailures(buildKeyedError(key, ex.toString(), SERVER_ERROR));
-    }
-  }
-
-  private BasicTypes.KeyedError buildKeyedError(BasicTypes.EncodedValue key, String errorMessage,
-      BasicTypes.ErrorCode errorCode) {
-    return BasicTypes.KeyedError.newBuilder().setKey(key)
-        .setError(BasicTypes.Error.newBuilder().setErrorCode(errorCode).setMessage(errorMessage))
+  private void addException(ProtobufSerializationService serializationService,
+      RegionAPI.GetAllResponse.Builder responseBuilder, Object key, Object exception) {
+    logger.warn("Failure in protobuf getAll operation for key: " + key, exception);
+    BasicTypes.EncodedValue encodedKey = serializationService.encode(key);
+    BasicTypes.KeyedError failure = BasicTypes.KeyedError.newBuilder().setKey(encodedKey).setError(
+        BasicTypes.Error.newBuilder().setErrorCode(SERVER_ERROR).setMessage(exception.toString()))
         .build();
+    responseBuilder.addFailures(failure);
   }
 
+  private void addEntry(ProtobufSerializationService serializationService,
+      RegionAPI.GetAllResponse.Builder responseBuilder, Object key, Object value) {
+    BasicTypes.Entry entry = ProtobufUtilities.createEntry(serializationService, key, value);
+    responseBuilder.addEntries(entry);
+  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandler.java
index e6b9fe0..dc4cddd 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandler.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.geode.annotations.Experimental;
@@ -34,13 +35,12 @@ public class GetRegionNamesRequestOperationHandler implements
   public Result<RegionAPI.GetRegionNamesResponse> process(
       ProtobufSerializationService serializationService, RegionAPI.GetRegionNamesRequest request,
       MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {
-    Set<Region<?, ?>> regions = messageExecutionContext.getCache().rootRegions();
+
+    Collection<String> regions = messageExecutionContext.getAuthorizingCache().getRegionNames();
 
     RegionAPI.GetRegionNamesResponse.Builder builder =
         RegionAPI.GetRegionNamesResponse.newBuilder();
-    for (Region region : regions) {
-      builder.addRegions(region.getName());
-    }
+    regions.forEach(builder::addRegions);
 
     return Success.of(builder.build());
   }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
index 089274b..4366c7f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
@@ -28,6 +28,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationServi
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 import org.apache.geode.security.ResourcePermission;
@@ -35,40 +36,23 @@ import org.apache.geode.security.ResourcePermission;
 @Experimental
 public class GetRequestOperationHandler
     implements ProtobufOperationHandler<RegionAPI.GetRequest, RegionAPI.GetResponse> {
-  private static final Logger logger = LogService.getLogger();
 
   @Override
   public Result<RegionAPI.GetResponse> process(ProtobufSerializationService serializationService,
       RegionAPI.GetRequest request, MessageExecutionContext messageExecutionContext)
       throws InvalidExecutionContextException, EncodingException, DecodingException {
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received get request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-
-    try {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(true);
 
-      Object decodedKey = serializationService.decode(request.getKey());
-      if (decodedKey == null) {
-        return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST, "Performing a get on a NULL key.");
-      }
-      Object resultValue = region.get(decodedKey);
 
-      BasicTypes.EncodedValue encodedValue = serializationService.encode(resultValue);
-      return Success.of(RegionAPI.GetResponse.newBuilder().setResult(encodedValue).build());
-    } finally {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
+    Object decodedKey = serializationService.decode(request.getKey());
+    if (decodedKey == null) {
+      return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST, "Performing a get on a NULL key.");
     }
-  }
 
-  public static ResourcePermission determineRequiredPermission(RegionAPI.GetRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.READ, request.getRegionName(),
-        serializer.decode(request.getKey()).toString());
+    AuthorizingCache authorizingCache = messageExecutionContext.getAuthorizingCache();
+    Object resultValue = authorizingCache.get(regionName, decodedKey);
+
+    BasicTypes.EncodedValue encodedValue = serializationService.encode(resultValue);
+    return Success.of(RegionAPI.GetResponse.newBuilder().setResult(encodedValue).build());
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetServerOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetServerOperationHandler.java
index 528bc2e..5752bb1 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetServerOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetServerOperationHandler.java
@@ -59,20 +59,9 @@ public class GetServerOperationHandler
 
     messageExecutionContext
         .setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor());
-    InternalLocator internalLocator = (InternalLocator) messageExecutionContext.getLocator();
 
-    // In order to ensure that proper checks are performed on the request we will use
-    // the locator's processRequest() API. We assume that all servers have Protobuf
-    // enabled.
-    ClientConnectionRequest clientConnectionRequest =
-        new ClientConnectionRequest(excludedServers, serverGroup);
-    ClientConnectionResponse connectionResponse = (ClientConnectionResponse) internalLocator
-        .getServerLocatorAdvisee().processRequest(clientConnectionRequest);
-
-    ServerLocation serverLocation = null;
-    if (connectionResponse != null && connectionResponse.hasResult()) {
-      serverLocation = connectionResponse.getServer();
-    }
+    ServerLocation serverLocation =
+        messageExecutionContext.getAuthorizingLocator().findServer(excludedServers, serverGroup);
 
     if (serverLocation == null) {
       StringBuilder builder = new StringBuilder("Unable to find a server");
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandler.java
index eed38c1..ed0e993 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandler.java
@@ -40,13 +40,8 @@ public class GetSizeRequestOperationHandler
       MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {
     String regionName = request.getRegionName();
 
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received GetRegion request for non-existing region {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "No region exists for name: " + regionName);
-    }
+    int size = messageExecutionContext.getAuthorizingCache().getSize(regionName);
 
-    return Success.of(RegionAPI.GetSizeResponse.newBuilder().setSize(region.size()).build());
+    return Success.of(RegionAPI.GetSizeResponse.newBuilder().setSize(size).build());
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/KeySetOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/KeySetOperationHandler.java
index ecab692..298f624 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/KeySetOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/KeySetOperationHandler.java
@@ -38,31 +38,18 @@ import org.apache.geode.security.ResourcePermission;
 @Experimental
 public class KeySetOperationHandler
     implements ProtobufOperationHandler<RegionAPI.KeySetRequest, RegionAPI.KeySetResponse> {
-  private static final Logger logger = LogService.getLogger();
 
   @Override
   public Result<RegionAPI.KeySetResponse> process(ProtobufSerializationService serializationService,
       RegionAPI.KeySetRequest request, MessageExecutionContext messageExecutionContext)
       throws InvalidExecutionContextException, EncodingException, DecodingException {
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
 
-    Set keySet = region.keySet();
+    Set<Object> keySet = messageExecutionContext.getAuthorizingCache().keySet(regionName);
+
     RegionAPI.KeySetResponse.Builder builder = RegionAPI.KeySetResponse.newBuilder();
-    keySet.stream().map(serializationService::encode)
-        .forEach(value -> builder.addKeys((BasicTypes.EncodedValue) value));
+    keySet.stream().map(serializationService::encode).forEach(builder::addKeys);
 
     return Success.of(builder.build());
   }
-
-  public static ResourcePermission determineRequiredPermission(RegionAPI.KeySetRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.READ, request.getRegionName());
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandler.java
index faabb91..1b5d1a6 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandler.java
@@ -61,39 +61,14 @@ public class OqlQueryRequestOperationHandler
       DecodingException {
     String queryString = request.getQuery();
     List<EncodedValue> encodedParameters = request.getBindParameterList();
-
-    InternalQueryService queryService = messageExecutionContext.getCache().getQueryService();
-
-    Query query = queryService.newQuery(queryString);
     Object[] bindParameters = decodeBindParameters(serializationService, encodedParameters);
 
-    if (messageExecutionContext
-        .getConnectionStateProcessor() instanceof ProtobufConnectionAuthorizingStateProcessor) {
-      final SecurityService securityService =
-          messageExecutionContext.getCache().getSecurityService();
-      ThreadState threadState =
-          ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-              .getConnectionStateProcessor()).prepareThreadForAuthorization();
-      try {
-        for (String regionName : ((DefaultQuery) query).getRegionsInQuery(bindParameters)) {
-          securityService.authorize(ResourcePermission.Resource.DATA,
-              ResourcePermission.Operation.READ, regionName);
-        }
-      } catch (NotAuthorizedException ex) {
-        messageExecutionContext.getStatistics().incAuthorizationViolations();
-        throw new OperationNotAuthorizedException(
-            "The user is not authorized to complete this operation");
-      } finally {
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-      }
-    }
-
     try {
-      Object results = query.execute(bindParameters);
+      Object results =
+          messageExecutionContext.getAuthorizingCache().query(queryString, bindParameters);
       return Success.of(encodeResults(serializationService, results));
     } catch (QueryException e) {
-      logger.info("Query failed: " + query, e);
+      logger.info("Query failed: " + queryString, e);
       return Failure.of(e);
     }
 
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
index c882cbe..55146d4 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
@@ -14,30 +14,28 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
+import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED;
 import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
 import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.SERVER_ERROR;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.annotations.Experimental;
-import org.apache.geode.cache.Region;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
-import org.apache.geode.internal.protocol.protobuf.v1.serialization.SerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
-import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
-import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
-import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class PutAllRequestOperationHandler
@@ -49,88 +47,34 @@ public class PutAllRequestOperationHandler
       RegionAPI.PutAllRequest putAllRequest, MessageExecutionContext messageExecutionContext)
       throws InvalidExecutionContextException, DecodingException {
     String regionName = putAllRequest.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-
-    if (region == null) {
-      logger.error("Received put-all request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-
-    ThreadState threadState = null;
-    SecurityService securityService = messageExecutionContext.getCache().getSecurityService();
-    boolean perKeyAuthorization = false;
-    if (messageExecutionContext
-        .getConnectionStateProcessor() instanceof ProtobufConnectionAuthorizingStateProcessor) {
-      threadState = ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-          .getConnectionStateProcessor()).prepareThreadForAuthorization();
-      // Check if authorized for entire region
-      try {
-        securityService.authorize(new ResourcePermission(ResourcePermission.Resource.DATA,
-            ResourcePermission.Operation.WRITE, regionName));
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-        threadState = null;
-      } catch (NotAuthorizedException ex) {
-        // Not authorized for the region, have to check keys individually
-        perKeyAuthorization = true;
-      }
-    }
-    final boolean authorizeKeys = perKeyAuthorization; // Required for use in lambda
 
-    long startTime = messageExecutionContext.getStatistics().startOperation();
     RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder();
-    try {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(true);
+    AuthorizingCache cache = messageExecutionContext.getAuthorizingCache();
+    Map<Object, Object> entries = new HashMap<>(putAllRequest.getEntryList().size());
+
+    putAllRequest.getEntryList()
+        .forEach(entry -> entries.put(serializationService.decode(entry.getKey()),
+            serializationService.decode(entry.getValue())));
+    cache.putAll(regionName, entries,
+        (key, exception) -> addError(builder, serializationService.encode(key), exception));
 
-      putAllRequest.getEntryList().stream().forEach((entry) -> processSinglePut(builder,
-          serializationService, region, entry, securityService, authorizeKeys));
 
-    } finally {
-      messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
-      if (threadState != null) {
-        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
-            .getConnectionStateProcessor()).restoreThreadState(threadState);
-      }
-    }
     return Success.of(builder.build());
   }
 
-  private void processSinglePut(RegionAPI.PutAllResponse.Builder builder,
-      SerializationService serializationService, Region region, BasicTypes.Entry entry,
-      SecurityService securityService, boolean authorizeKeys) {
-    try {
+  private void addError(RegionAPI.PutAllResponse.Builder builder, BasicTypes.EncodedValue key,
+      Exception exception) {
 
-      Object decodedKey = serializationService.decode(entry.getKey());
-      Object decodedValue = serializationService.decode(entry.getValue());
-      if (decodedKey == null || decodedValue == null) {
-        builder.addFailedKeys(
-            buildKeyedError(entry, INVALID_REQUEST, "Key and value must both be non-NULL"));
-      }
-      if (authorizeKeys) {
-        securityService.authorize(new ResourcePermission(ResourcePermission.Resource.DATA,
-            ResourcePermission.Operation.WRITE, region.getName(), decodedKey.toString()));
-      }
-      region.put(decodedKey, decodedValue);
-
-    } catch (NotAuthorizedException ex) {
-      builder.addFailedKeys(
-          buildKeyedError(entry, BasicTypes.ErrorCode.AUTHORIZATION_FAILED, "Unauthorized access"));
-    } catch (DecodingException ex) {
-      logger.info("Encoding not supported: " + ex);
-      builder.addFailedKeys(this.buildKeyedError(entry, INVALID_REQUEST, "Encoding not supported"));
-    } catch (ClassCastException ex) {
-      builder.addFailedKeys(buildKeyedError(entry, SERVER_ERROR, ex.toString()));
-    } catch (Exception ex) {
-      logger.warn("Error processing putAll entry", ex);
-      builder.addFailedKeys(buildKeyedError(entry, SERVER_ERROR, ex.toString()));
+    BasicTypes.ErrorCode errorCode;
+    if (exception instanceof NotAuthorizedException) {
+      errorCode = AUTHORIZATION_FAILED;
+    } else if (exception instanceof DecodingException) {
+      errorCode = INVALID_REQUEST;
+    } else {
+      errorCode = SERVER_ERROR;
     }
-  }
 
-  private BasicTypes.KeyedError buildKeyedError(BasicTypes.Entry entry,
-      BasicTypes.ErrorCode errorCode, String message) {
-    return BasicTypes.KeyedError.newBuilder().setKey(entry.getKey())
-        .setError(BasicTypes.Error.newBuilder().setErrorCode(errorCode).setMessage(message))
-        .build();
+    builder.addFailedKeys(BasicTypes.KeyedError.newBuilder().setKey(key).setError(
+        BasicTypes.Error.newBuilder().setErrorCode(errorCode).setMessage(exception.toString())));
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandler.java
index 9713183..f061ea0 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandler.java
@@ -34,7 +34,6 @@ import org.apache.geode.security.ResourcePermission;
 
 public class PutIfAbsentRequestOperationHandler implements
     ProtobufOperationHandler<RegionAPI.PutIfAbsentRequest, RegionAPI.PutIfAbsentResponse> {
-  private static final Logger logger = LogManager.getLogger();
 
   @Override
   public Result<RegionAPI.PutIfAbsentResponse> process(
@@ -44,35 +43,15 @@ public class PutIfAbsentRequestOperationHandler implements
 
     final String regionName = request.getRegionName();
 
-    Region<Object, Object> region;
-    try {
-      region = messageExecutionContext.getCache().getRegion(regionName);
-    } catch (IllegalArgumentException ex) {
-      return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST,
-          "Invalid region name: \"" + regionName + "\"");
-    }
-
-    if (region == null) {
-      logger.error("Received PutIfAbsentRequest for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-
     final BasicTypes.Entry entry = request.getEntry();
 
     Object decodedValue = serializationService.decode(entry.getValue());
     Object decodedKey = serializationService.decode(entry.getKey());
 
-    final Object oldValue = region.putIfAbsent(decodedKey, decodedValue);
+    final Object oldValue = messageExecutionContext.getAuthorizingCache().putIfAbsent(regionName,
+        decodedKey, decodedValue);
 
     return Success.of(RegionAPI.PutIfAbsentResponse.newBuilder()
         .setOldValue(serializationService.encode(oldValue)).build());
   }
-
-  public static ResourcePermission determineRequiredPermission(RegionAPI.PutIfAbsentRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.WRITE, request.getRegionName(),
-        serializer.decode(request.getEntry().getKey()).toString());
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
index 23448ed..dad6ac5 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
@@ -28,6 +28,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationServi
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCache;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.security.ResourcePermission;
 
@@ -41,30 +42,13 @@ public class PutRequestOperationHandler
       RegionAPI.PutRequest request, MessageExecutionContext messageExecutionContext)
       throws InvalidExecutionContextException, DecodingException {
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received put request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
-
     BasicTypes.Entry entry = request.getEntry();
 
     Object decodedValue = serializationService.decode(entry.getValue());
     Object decodedKey = serializationService.decode(entry.getKey());
-    if (decodedKey == null || decodedValue == null) {
-      return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST,
-          "Key and value must both be non-NULL");
-    }
 
-    region.put(decodedKey, decodedValue);
+    AuthorizingCache cache = messageExecutionContext.getAuthorizingCache();
+    cache.put(regionName, decodedKey, decodedValue);
     return Success.of(RegionAPI.PutResponse.newBuilder().build());
   }
-
-  public static ResourcePermission determineRequiredPermission(RegionAPI.PutRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.WRITE, request.getRegionName(),
-        serializer.decode(request.getEntry().getKey()).toString());
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
index b2ba6f7..7731b9b 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
@@ -42,27 +42,15 @@ public class RemoveRequestOperationHandler
       throws InvalidExecutionContextException, DecodingException {
 
     String regionName = request.getRegionName();
-    Region region = messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      logger.error("Received remove request for nonexistent region: {}", regionName);
-      return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR,
-          "Region \"" + regionName + "\" not found");
-    }
 
     Object decodedKey = serializationService.decode(request.getKey());
     if (decodedKey == null) {
       return Failure.of(BasicTypes.ErrorCode.INVALID_REQUEST,
           "NULL is not a valid key for removal.");
     }
-    region.remove(decodedKey);
 
-    return Success.of(RegionAPI.RemoveResponse.newBuilder().build());
-  }
+    messageExecutionContext.getAuthorizingCache().remove(regionName, decodedKey);
 
-  public static ResourcePermission determineRequiredPermission(RegionAPI.RemoveRequest request,
-      ProtobufSerializationService serializer) throws DecodingException {
-    return new ResourcePermission(ResourcePermission.Resource.DATA,
-        ResourcePermission.Operation.WRITE, request.getRegionName(),
-        serializer.decode(request.getKey()).toString());
+    return Success.of(RegionAPI.RemoveResponse.newBuilder().build());
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/security/AuthenticationRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/security/AuthenticationRequestOperationHandler.java
index d323593..21ad411 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/security/AuthenticationRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/security/AuthenticationRequestOperationHandler.java
@@ -51,7 +51,8 @@ public class AuthenticationRequestOperationHandler implements
     properties.putAll(request.getCredentialsMap());
 
     try {
-      ProtobufConnectionStateProcessor nextState = stateProcessor.authenticate(properties);
+      ProtobufConnectionStateProcessor nextState =
+          stateProcessor.authenticate(messageExecutionContext, properties);
       messageExecutionContext.setConnectionStateProcessor(nextState);
       return Success
           .of(ConnectionAPI.AuthenticationResponse.newBuilder().setAuthenticated(true).build());
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 64a5cba..4013b3a 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -15,6 +15,8 @@
 
 package org.apache.geode.internal.protocol.protobuf.v1.registry;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -22,7 +24,6 @@ import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
-import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ClearRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.DisconnectClientRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler;
@@ -40,145 +41,106 @@ import org.apache.geode.internal.protocol.protobuf.v1.operations.PutIfAbsentRequ
 import org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.security.AuthenticationRequestOperationHandler;
-import org.apache.geode.management.internal.security.ResourcePermissions;
-import org.apache.geode.security.ResourcePermission;
-import org.apache.geode.security.ResourcePermission.Operation;
-import org.apache.geode.security.ResourcePermission.Resource;
 
 @Experimental
 public class ProtobufOperationContextRegistry {
-  private final Map<MessageTypeCase, ProtobufOperationContext> operationContexts =
-      new ConcurrentHashMap<>();
+  private final Map<MessageTypeCase, ProtobufOperationContext> operationContexts;
 
   public ProtobufOperationContextRegistry() {
-    addContexts();
+    operationContexts = Collections.unmodifiableMap(generateContexts());
   }
 
   public ProtobufOperationContext getOperationContext(MessageTypeCase apiCase) {
     return operationContexts.get(apiCase);
   }
 
-  private final ResourcePermission noneRequired =
-      new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL);
+  private Map<MessageTypeCase, ProtobufOperationContext> generateContexts() {
+    final Map<MessageTypeCase, ProtobufOperationContext> operationContexts = new HashMap<>();
 
-  private ResourcePermission skipAuthorizationCheck(Object unused,
-      ProtobufSerializationService unused2) {
-    return noneRequired;
-  }
-
-  private void addContexts() {
     operationContexts.put(MessageTypeCase.AUTHENTICATIONREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getAuthenticationRequest,
             new AuthenticationRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp),
-            this::skipAuthorizationCheck));
+            opsResp -> ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.DISCONNECTCLIENTREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getDisconnectClientRequest,
             new DisconnectClientRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setDisconnectClientResponse(opsResp),
-            this::skipAuthorizationCheck));
+            opsResp -> ClientProtocol.Message.newBuilder().setDisconnectClientResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.GETREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetRequest,
             new GetRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setGetResponse(opsResp),
-            GetRequestOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setGetResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.GETALLREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetAllRequest,
             new GetAllRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setGetAllResponse(opsResp),
-            // May require per-key checks, will be handled by OperationHandler
-            this::skipAuthorizationCheck));
+            opsResp -> ClientProtocol.Message.newBuilder().setGetAllResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.PUTREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getPutRequest,
             new PutRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setPutResponse(opsResp),
-            PutRequestOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setPutResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.PUTALLREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getPutAllRequest,
             new PutAllRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setPutAllResponse(opsResp),
-            // May require per-key checks, will be handled by OperationHandler
-            this::skipAuthorizationCheck));
+            opsResp -> ClientProtocol.Message.newBuilder().setPutAllResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.REMOVEREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getRemoveRequest,
             new RemoveRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setRemoveResponse(opsResp),
-            RemoveRequestOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setRemoveResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.GETREGIONNAMESREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetRegionNamesRequest,
             new GetRegionNamesRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setGetRegionNamesResponse(opsResp),
-            ResourcePermissions.DATA_READ));
+            opsResp -> ClientProtocol.Message.newBuilder().setGetRegionNamesResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.GETSIZEREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetSizeRequest,
             new GetSizeRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setGetSizeResponse(opsResp),
-            ResourcePermissions.DATA_READ));
+            opsResp -> ClientProtocol.Message.newBuilder().setGetSizeResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.GETSERVERREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetServerRequest,
             new GetServerOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setGetServerResponse(opsResp),
-            ResourcePermissions.CLUSTER_READ));
+            opsResp -> ClientProtocol.Message.newBuilder().setGetServerResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.EXECUTEFUNCTIONONREGIONREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnRegionRequest,
-            new ExecuteFunctionOnRegionRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder()
-                .setExecuteFunctionOnRegionResponse(opsResp),
-            // Resource permissions get handled per-function, since they have varying permission
-            // requirements.
-            this::skipAuthorizationCheck));
+            new ExecuteFunctionOnRegionRequestOperationHandler(), opsResp -> ClientProtocol.Message
+                .newBuilder().setExecuteFunctionOnRegionResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.EXECUTEFUNCTIONONMEMBERREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnMemberRequest,
-            new ExecuteFunctionOnMemberRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder()
-                .setExecuteFunctionOnMemberResponse(opsResp),
-            // Resource permissions get handled per-function, since they have varying permission
-            // requirements.
-            this::skipAuthorizationCheck));
+            new ExecuteFunctionOnMemberRequestOperationHandler(), opsResp -> ClientProtocol.Message
+                .newBuilder().setExecuteFunctionOnMemberResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.EXECUTEFUNCTIONONGROUPREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnGroupRequest,
-            new ExecuteFunctionOnGroupRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder()
-                .setExecuteFunctionOnGroupResponse(opsResp),
-            // Resource permissions get handled per-function, since they have varying permission
-            // requirements.
-            this::skipAuthorizationCheck));
+            new ExecuteFunctionOnGroupRequestOperationHandler(), opsResp -> ClientProtocol.Message
+                .newBuilder().setExecuteFunctionOnGroupResponse(opsResp)));
     operationContexts.put(MessageTypeCase.OQLQUERYREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getOqlQueryRequest,
             new OqlQueryRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setOqlQueryResponse(opsResp),
-            // Perform authorization inside handler to avoid having to compile the OQL multiple
-            // times
-            this::skipAuthorizationCheck));
+            opsResp -> ClientProtocol.Message.newBuilder().setOqlQueryResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.KEYSETREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getKeySetRequest,
             new KeySetOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setKeySetResponse(opsResp),
-            KeySetOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setKeySetResponse(opsResp)));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.CLEARREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getClearRequest,
             new ClearRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setClearResponse(opsResp),
-            ClearRequestOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setClearResponse(opsResp)));
 
     operationContexts.put(MessageTypeCase.PUTIFABSENTREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getPutIfAbsentRequest,
             new PutIfAbsentRequestOperationHandler(),
-            opsResp -> ClientProtocol.Message.newBuilder().setPutIfAbsentResponse(opsResp),
-            PutIfAbsentRequestOperationHandler::determineRequiredPermission));
+            opsResp -> ClientProtocol.Message.newBuilder().setPutIfAbsentResponse(opsResp)));
+
+    return operationContexts;
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
index f2dabaf..fca9336 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
@@ -22,6 +22,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.ShiroAuthorizer;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.security.AuthenticationRequestOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
@@ -52,9 +53,11 @@ public class ProtobufConnectionAuthenticatingStateProcessor
     return this;
   }
 
-  public ProtobufConnectionStateProcessor authenticate(Properties properties)
+  public ProtobufConnectionStateProcessor authenticate(
+      MessageExecutionContext messageExecutionContext, Properties properties)
       throws AuthenticationFailedException {
     Subject subject = securityService.login(properties);
-    return new ProtobufConnectionAuthorizingStateProcessor(securityService, subject);
+    messageExecutionContext.setAuthorizer(new ShiroAuthorizer(securityService, subject));
+    return new ProtobufConnectionAuthorizingStateProcessor();
   }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
index b78172f..8a19122 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
@@ -30,32 +30,13 @@ import org.apache.geode.security.NotAuthorizedException;
 
 public class ProtobufConnectionAuthorizingStateProcessor
     implements ProtobufConnectionStateProcessor {
-  private final SecurityService securityService;
-  private final Subject subject;
 
-  public ProtobufConnectionAuthorizingStateProcessor(SecurityService securityService,
-      Subject subject) {
-    this.securityService = securityService;
-    this.subject = subject;
-  }
+  public ProtobufConnectionAuthorizingStateProcessor() {}
 
   @Override
   public void validateOperation(Object message, ProtobufSerializationService serializer,
       MessageExecutionContext messageContext, ProtobufOperationContext operationContext)
-      throws ConnectionStateException, DecodingException {
-    ThreadState threadState = securityService.bindSubject(subject);
-    try {
-      securityService.authorize(operationContext.getAccessPermissionRequired(
-          operationContext.getFromRequest().apply(message), serializer));
-    } catch (NotAuthorizedException e) {
-      messageContext.getStatistics().incAuthorizationViolations();
-      throw new OperationNotAuthorizedException(
-          "The user is not authorized to complete this operation: "
-              + ((ClientProtocol.Message) message).getMessageTypeCase());
-    } finally {
-      threadState.restore();
-    }
-  }
+      throws ConnectionStateException, DecodingException {}
 
   @Override
   public ProtobufConnectionAuthenticatingStateProcessor allowAuthentication()
@@ -63,12 +44,4 @@ public class ProtobufConnectionAuthorizingStateProcessor
     throw new ConnectionStateException(BasicTypes.ErrorCode.ALREADY_AUTHENTICATED,
         "The user has already been authenticated for this connection. Re-authentication is not supported at this time.");
   }
-
-  public ThreadState prepareThreadForAuthorization() {
-    return securityService.bindSubject(subject);
-  }
-
-  public void restoreThreadState(ThreadState state) {
-    state.restore();
-  }
 }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
index 3886680..d8f38d7 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
@@ -24,6 +24,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.protocol.protobuf.v1.operations.ProtocolVersionHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
@@ -43,13 +44,15 @@ public class ProtobufConnectionHandshakeStateProcessor implements ProtobufConnec
         "Connection processing should never be asked to validate an operation");
   }
 
-  private ProtobufConnectionStateProcessor nextConnectionState() {
+  private ProtobufConnectionStateProcessor nextConnectionState(
+      MessageExecutionContext executionContext) {
     if (securityService.isIntegratedSecurity()) {
       return new ProtobufConnectionAuthenticatingStateProcessor(securityService);
     } else if (securityService.isPeerSecurityRequired()
         || securityService.isClientSecurityRequired()) {
       return new LegacySecurityProtobufConnectionStateProcessor();
     } else {
+      executionContext.setAuthorizer(new NoSecurityAuthorizer());
       // Noop authenticator...no security
       return new NoSecurityProtobufConnectionStateProcessor();
     }
@@ -65,7 +68,7 @@ public class ProtobufConnectionHandshakeStateProcessor implements ProtobufConnec
 
     if (ProtocolVersionHandler.handleVersionMessage(messageStream, outputStream,
         executionContext.getStatistics())) {
-      executionContext.setConnectionStateProcessor(nextConnectionState());
+      executionContext.setConnectionStateProcessor(nextConnectionState(executionContext));
     }
     return true;
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
index 1541499..3fc9970 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
@@ -85,8 +85,11 @@ public class OutputCapturingServerConnectionTest {
     when(socketMock.getRemoteSocketAddress()).thenReturn(inetSocketAddressStub);
     when(socketMock.getInetAddress()).thenReturn(inetAddressStub);
 
-    return new ProtobufServerConnection(socketMock, mock(InternalCache.class),
-        mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+    InternalCache cache = mock(InternalCache.class);
+    CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
+        mock(CacheServerStats.class), 0, 0, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptorStub,
         clientProtocolProcessorMock, mock(SecurityService.class));
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
index bb2e264..ce829f5 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
@@ -126,8 +126,11 @@ public class ProtobufServerConnectionTest {
     when(socketMock.getRemoteSocketAddress()).thenReturn(inetSocketAddressStub);
     when(socketMock.getInetAddress()).thenReturn(inetAddressStub);
 
-    return new ProtobufServerConnection(socketMock, mock(InternalCache.class),
-        mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+    InternalCache cache = mock(InternalCache.class);
+    CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
+        mock(CacheServerStats.class), 0, 0, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptorStub,
         clientProtocolProcessorMock, mock(SecurityService.class));
   }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
index 1a4c3f5..ad2e918 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/TestExecutionContext.java
@@ -20,16 +20,17 @@ import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
 import org.apache.geode.internal.protocol.protobuf.v1.LocatorMessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.protocol.protobuf.v1.state.NoSecurityProtobufConnectionStateProcessor;
 
 public class TestExecutionContext {
   public static MessageExecutionContext getNoAuthCacheExecutionContext(InternalCache cache) {
     return new ServerMessageExecutionContext(cache, new NoOpStatistics(),
-        new NoSecurityProtobufConnectionStateProcessor());
+        new NoSecurityProtobufConnectionStateProcessor(), new NoSecurityAuthorizer());
   }
 
   public static MessageExecutionContext getLocatorExecutionContext(InternalLocator locator) {
     return new LocatorMessageExecutionContext(locator, new NoOpStatistics(),
-        new NoSecurityProtobufConnectionStateProcessor());
+        new NoSecurityProtobufConnectionStateProcessor(), new NoSecurityAuthorizer());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
index c312532..0d4717b 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/acceptance/CacheOperationsJUnitTest.java
@@ -286,7 +286,7 @@ public class CacheOperationsJUnitTest {
         response.getMessageTypeCase());
     RegionAPI.GetRegionNamesResponse getRegionsResponse = response.getGetRegionNamesResponse();
     assertEquals(1, getRegionsResponse.getRegionsCount());
-    assertEquals(TEST_REGION, getRegionsResponse.getRegions(0));
+    assertEquals("/" + TEST_REGION, getRegionsResponse.getRegions(0));
   }
 
   private void validatePutAllResponse(Socket socket,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImplTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImplTest.java
new file mode 100644
index 0000000..cf5bd4b
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingCacheImplTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import static org.apache.geode.security.ResourcePermission.ALL;
+import static org.apache.geode.security.ResourcePermission.Operation.READ;
+import static org.apache.geode.security.ResourcePermission.Operation.WRITE;
+import static org.apache.geode.security.ResourcePermission.Resource.DATA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.data.MapEntry.entry;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.InternalQueryService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class AuthorizingCacheImplTest {
+
+  public static final String REGION = "TestRegion";
+  private AuthorizingCacheImpl authorizingCache;
+  private InternalCache cache;
+  private Authorizer authorizer;
+  private Region region;
+
+  @Before
+  public void setUp() {
+    cache = mock(InternalCache.class);
+    region = mock(Region.class);
+    when(cache.getRegion(REGION)).thenReturn(region);
+    authorizer = mock(Authorizer.class);
+    doThrow(NotAuthorizedException.class).when(authorizer).authorize(any());
+    doThrow(NotAuthorizedException.class).when(authorizer).authorize(any(), any(), any(), any());
+    authorizingCache = new AuthorizingCacheImpl(cache, authorizer);
+  }
+
+  @Test
+  public void getAllSuccesses() throws Exception {
+    authorize(DATA, READ, REGION, "a");
+    authorize(DATA, READ, REGION, "b");
+    Map<Object, Object> okValues = new HashMap<>();
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    when(region.get("b")).thenReturn("existing value");
+
+    authorizingCache.getAll(REGION, Arrays.asList("a", "b"), okValues::put, exceptionValues::put);
+
+    verify(region).get("a");
+    verify(region).get("b");
+    assertThat(okValues).containsOnly(entry("a", null), entry("b", "existing value"));
+    assertThat(exceptionValues).isEmpty();
+  }
+
+  @Test
+  public void getAllWithRegionLevelAuthorizationSucceeds() throws Exception {
+    authorize(DATA, READ, REGION, ALL);
+    Map<Object, Object> okValues = new HashMap<>();
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    when(region.get("b")).thenReturn("existing value");
+
+    authorizingCache.getAll(REGION, Arrays.asList("a", "b"), okValues::put, exceptionValues::put);
+
+    verify(region).get("a");
+    verify(region).get("b");
+    assertThat(okValues).containsOnly(entry("a", null), entry("b", "existing value"));
+    assertThat(exceptionValues).isEmpty();
+  }
+
+  @Test
+  public void getAllWithFailure() throws Exception {
+    authorize(DATA, READ, REGION, "b");
+    Map<Object, Object> okValues = new HashMap<>();
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    when(region.get("b")).thenReturn("existing value");
+
+    authorizingCache.getAll(REGION, Arrays.asList("a", "b"), okValues::put, exceptionValues::put);
+
+    verify(region).get("b");
+    verifyNoMoreInteractions(region);
+    assertThat(okValues).containsOnly(entry("b", "existing value"));
+    assertThat(exceptionValues).containsOnlyKeys("a");
+    assertThat(exceptionValues.values().iterator().next())
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  private void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, String region, String key) {
+    doNothing().when(authorizer).authorize(resource, operation, region, key);
+  }
+
+  @Test
+  public void get() throws Exception {
+    authorize(DATA, READ, REGION, "a");
+    when(region.get("a")).thenReturn("value");
+    assertEquals("value", authorizingCache.get(REGION, "a"));
+  }
+
+  @Test
+  public void getWithFailure() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.get(REGION, "a"))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void put() throws Exception {
+    authorize(DATA, WRITE, REGION, "a");
+    authorizingCache.put(REGION, "a", "value");
+    verify(region).put("a", "value");
+  }
+
+  @Test
+  public void putWithFailure() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.put(REGION, "a", "value"))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void putAll() throws Exception {
+    authorize(DATA, WRITE, REGION, "a");
+    authorize(DATA, WRITE, REGION, "c");
+    Map<Object, Object> entries = new HashMap<>();
+    entries.put("a", "b");
+    entries.put("c", "d");
+
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    authorizingCache.putAll(REGION, entries, exceptionValues::put);
+
+    verify(region).put("a", "b");
+    verify(region).put("c", "d");
+    assertThat(exceptionValues).isEmpty();
+  }
+
+  @Test
+  public void putAllWithRegionLevelAuthorizationSucceeds() throws Exception {
+    authorize(DATA, WRITE, REGION, ALL);
+    Map<Object, Object> entries = new HashMap<>();
+    entries.put("a", "b");
+    entries.put("c", "d");
+
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    authorizingCache.putAll(REGION, entries, exceptionValues::put);
+
+    verify(region).put("a", "b");
+    verify(region).put("c", "d");
+    assertThat(exceptionValues).isEmpty();
+  }
+
+  @Test
+  public void putAllWithFailure() throws Exception {
+    authorize(DATA, WRITE, REGION, "a");
+    Map<Object, Object> entries = new HashMap<>();
+    entries.put("a", "b");
+    entries.put("c", "d");
+
+    Map<Object, Exception> exceptionValues = new HashMap<>();
+
+    authorizingCache.putAll(REGION, entries, exceptionValues::put);
+
+    verify(authorizer).authorize(DATA, WRITE, REGION, "a");
+    verify(authorizer).authorize(DATA, WRITE, REGION, "c");
+    verify(region).put("a", "b");
+    verifyNoMoreInteractions(region);
+    assertThat(exceptionValues).containsOnlyKeys("c");
+  }
+
+  @Test
+  public void remove() throws Exception {
+    authorize(DATA, WRITE, REGION, "a");
+    authorizingCache.remove(REGION, "a");
+    verify(region).remove("a");
+  }
+
+  @Test
+  public void removeWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.remove(REGION, "a"))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void getRegionNames() throws Exception {
+    authorize(DATA, READ, ALL, ALL);
+    Set<Region<?, ?>> regions = new HashSet<>();
+    regions.add(region);
+    when(cache.rootRegions()).thenReturn(regions);
+
+    Set<Region<?, ?>> subregions = new HashSet<>();
+    Region<?, ?> region2 = mock(Region.class);
+    subregions.add(region2);
+    Region<?, ?> region3 = mock(Region.class);
+    subregions.add(region3);
+    when(region.getFullPath()).thenReturn("region1");
+    when(region2.getFullPath()).thenReturn("region2");
+    when(region3.getFullPath()).thenReturn("region3");
+    when(region.subregions(true)).thenReturn(subregions);
+    Collection<String> regionNames = authorizingCache.getRegionNames();
+    // Source regions sit in HashSets (unspecified iteration order), so do not assert ordering.
+    assertThat(regionNames).containsExactlyInAnyOrder("region1", "region2", "region3");
+    verify(cache).rootRegions();
+  }
+
+  @Test
+  public void getRegionNamesWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.getRegionNames())
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void getSize() throws Exception {
+    authorize(DATA, READ, REGION, ALL);
+    authorizingCache.getSize(REGION);
+    verify(region).size();
+  }
+
+  @Test
+  public void getSizeWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.getSize(REGION))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void keySet() throws Exception {
+    authorize(DATA, READ, REGION, ALL);
+    authorizingCache.keySet(REGION);
+    verify(region).keySet();
+  }
+
+  @Test
+  public void keySetWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.keySet(REGION))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void clear() throws Exception {
+    authorize(DATA, WRITE, REGION, ALL);
+    authorizingCache.clear(REGION);
+    verify(region).clear();
+  }
+
+  @Test
+  public void clearWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.clear(REGION))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void putIfAbsent() throws Exception {
+    authorize(DATA, WRITE, REGION, "a");
+    authorizingCache.putIfAbsent(REGION, "a", "b");
+    verify(region).putIfAbsent("a", "b");
+  }
+
+  @Test
+  public void putIfAbsentWithoutAuthorization() throws Exception {
+    assertThatThrownBy(() -> authorizingCache.putIfAbsent(REGION, "a", "b"))
+        .isInstanceOf(NotAuthorizedException.class);
+  }
+
+  @Test
+  public void query() throws Exception {
+    authorize(DATA, READ, REGION, ALL);
+    InternalQueryService queryService = mock(InternalQueryService.class);
+    when(cache.getQueryService()).thenReturn(queryService);
+    DefaultQuery query = mock(DefaultQuery.class);
+    when(queryService.newQuery(any())).thenReturn(query);
+    when(query.getRegionsInQuery(any())).thenReturn(Collections.singleton(REGION));
+    Object[] bindParameters = {"a"};
+    authorizingCache.query("select * from /" + REGION, bindParameters);
+    verify(authorizer).authorize(DATA, READ, REGION, ALL);
+  }
+
+}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImplTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImplTest.java
new file mode 100644
index 0000000..04feab8
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/authentication/AuthorizingFunctionServiceImplTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.authentication;
+
+import static org.apache.geode.security.ResourcePermission.ALL;
+import static org.apache.geode.security.ResourcePermission.Operation.WRITE;
+import static org.apache.geode.security.ResourcePermission.Resource.CLUSTER;
+import static org.apache.geode.security.ResourcePermission.Resource.DATA;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import com.sun.org.apache.regexp.internal.RE;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class AuthorizingFunctionServiceImplTest { // covers only the not-authorized paths
+  public static final String REGION = "TestRegion";
+  public static final String FUNCTION_ID = "id";
+  private AuthorizingFunctionServiceImpl functionService;
+  private InternalCache cache;
+  private Authorizer authorizer;
+  private Region region;
+  private Function function;
+
+  @Before
+  public void setUp() {
+    cache = mock(InternalCache.class);
+    region = mock(Region.class);
+    when(cache.getRegion(REGION)).thenReturn(region);
+    authorizer = mock(Authorizer.class);
+    doThrow(NotAuthorizedException.class).when(authorizer).authorize(any()); // deny by default
+    doThrow(NotAuthorizedException.class).when(authorizer).authorize(any(), any(), any(), any());
+    functionService = new AuthorizingFunctionServiceImpl(cache, authorizer);
+    function = mock(Function.class);
+    when(function.getId()).thenReturn(FUNCTION_ID); // same id that tearDown() unregisters
+    FunctionService.registerFunction(function); // static call; undone in tearDown()
+  }
+
+  @After
+  public void tearDown() {
+    FunctionService.unregisterFunction(FUNCTION_ID); // keep the registration from leaking
+  }
+
+  @Test
+  public void executeFunctionOnRegionWithoutAuthorization() throws Exception {
+    when(function.getRequiredPermissions(REGION))
+        .thenReturn(Collections.singleton(new ResourcePermission(CLUSTER, WRITE, REGION, ALL)));
+    assertThatThrownBy(
+        () -> functionService.executeFunctionOnRegion(FUNCTION_ID, REGION, null, null))
+            .isInstanceOf(NotAuthorizedException.class); // setUp() stubs authorize to throw
+  }
+
+  @Test
+  public void executeFunctionOnMemberWithoutAuthorization() throws Exception {
+    when(function.getRequiredPermissions(null))
+        .thenReturn(Collections.singleton(new ResourcePermission(CLUSTER, WRITE, REGION, ALL)));
+    assertThatThrownBy(
+        () -> functionService.executeFunctionOnMember(FUNCTION_ID, null, Arrays.asList("member")))
+            .isInstanceOf(NotAuthorizedException.class); // setUp() stubs authorize to throw
+  }
+
+  @Test
+  public void executeFunctionOnGroupsWithoutAuthorization() throws Exception {
+    when(function.getRequiredPermissions(null))
+        .thenReturn(Collections.singleton(new ResourcePermission(CLUSTER, WRITE, REGION, ALL)));
+    assertThatThrownBy(
+        () -> functionService.executeFunctionOnGroups(FUNCTION_ID, null, Arrays.asList("group")))
+            .isInstanceOf(NotAuthorizedException.class); // setUp() stubs authorize to throw
+  }
+
+  private void authorize(ResourcePermission.Resource resource,
+      ResourcePermission.Operation operation, String region, String key) { // NOTE(review): unused
+    doNothing().when(authorizer).authorize(resource, operation, region, key); // 4-arg form
+    doNothing().when(authorizer)
+        .authorize(new ResourcePermission(resource, operation, region, key)); // 1-arg form
+  }
+
+}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandlerJUnitTest.java
index 167ce46..36244a5 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ClearRequestOperationHandlerJUnitTest.java
@@ -22,10 +22,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.internal.protocol.TestExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -45,6 +48,9 @@ public class ClearRequestOperationHandlerJUnitTest extends OperationHandlerJUnit
   private final String MISSING_REGION = "missing region";
   private Region regionStub;
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionStub = mock(Region.class);
@@ -68,11 +74,8 @@ public class ClearRequestOperationHandlerJUnitTest extends OperationHandlerJUnit
   public void processReturnsFailureForInvalidRegion() throws Exception {
     RegionAPI.ClearRequest removeRequest =
         ProtobufRequestUtilities.createClearRequest(MISSING_REGION).getClearRequest();
+    expectedException.expect(RegionDestroyedException.class);
     Result result = operationHandler.process(serializationService, removeRequest,
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertTrue(result instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, errorMessage.getError().getErrorCode());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
index 385d9b5..1ec6793 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
@@ -14,9 +14,6 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED;
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.NO_AVAILABLE_SERVER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -31,8 +28,10 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
@@ -44,13 +43,12 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.v1.state.exception.OperationNotAuthorizedException;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.management.internal.security.ResourcePermissions;
 import org.apache.geode.security.NotAuthorizedException;
@@ -69,6 +67,9 @@ public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest {
   private TestFunction function;
   private InternalDistributedSystem distributedSystem;
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   private static class TestFunction implements Function {
     // non-null iff function has been executed.
     private AtomicReference<FunctionContext> context = new AtomicReference<>();
@@ -117,32 +118,6 @@ public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest {
     FunctionService.unregisterFunction(TEST_FUNCTION_ID);
   }
 
-  @Test
-  public void failsOnUnknownGroup() throws Exception {
-    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
-        FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
-            .addGroupName(NOT_A_GROUP).build();
-
-    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse> result =
-        operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    assertTrue(result instanceof Failure);
-    assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode());
-  }
-
-  @Test
-  public void failsIfNoGroupSpecified() throws Exception {
-    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
-        FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
-            .build();
-
-    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse> result =
-        operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    assertTrue(result instanceof Failure);
-    assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode());
-  }
-
   @Test(expected = DistributedSystemDisconnectedException.class)
   public void succeedsWithValidMembers() throws Exception {
     when(distributionManager.getMemberWithName(any(String.class))).thenReturn(
@@ -174,12 +149,13 @@ public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .addGroupName(TEST_GROUP1).build();
 
-    try {
-      operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-      fail("Should not have been authorized.");
-    } catch (OperationNotAuthorizedException ex) {
-      // Expected failure
-    }
+    Authorizer authorizer = mock(Authorizer.class);
+    doThrow(new NotAuthorizedException("we should catch this")).when(authorizer)
+        .authorize(ResourcePermissions.DATA_WRITE);
+    ServerMessageExecutionContext context = new ServerMessageExecutionContext(cacheStub,
+        mock(ProtobufClientStatistics.class), null, authorizer);
+    expectedException.expect(NotAuthorizedException.class);
+    operationHandler.process(serializationService, request, context);
   }
 
   @Test
@@ -188,15 +164,14 @@ public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder()
             .setFunctionID("I am not a function, I am a human").addGroupName(TEST_GROUP1).build();
 
+    expectedException.expect(IllegalArgumentException.class);
     final Result<FunctionAPI.ExecuteFunctionOnGroupResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
 
-    final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-
-    assertEquals(INVALID_REQUEST, errorMessage.getError().getErrorCode());
   }
 
   private ServerMessageExecutionContext mockedMessageExecutionContext() {
-    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null);
+    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null,
+        new NoSecurityAuthorizer());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
index e296303..de60a4d 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
@@ -14,9 +14,6 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED;
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
-import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.NO_AVAILABLE_SERVER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -29,8 +26,10 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
@@ -40,13 +39,12 @@ import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.v1.state.exception.OperationNotAuthorizedException;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.management.internal.security.ResourcePermissions;
 import org.apache.geode.security.NotAuthorizedException;
@@ -64,6 +62,9 @@ public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
   private ProtobufSerializationService serializationService;
   private TestFunction function;
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   private static class TestFunction implements Function {
     // non-null iff function has been executed.
     private AtomicReference<FunctionContext> context = new AtomicReference<>();
@@ -111,11 +112,9 @@ public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .addMemberName(NOT_A_MEMBER).build();
 
+    expectedException.expect(IllegalArgumentException.class);
     final Result<FunctionAPI.ExecuteFunctionOnMemberResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    assertTrue(result instanceof Failure);
-    assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode());
   }
 
   @Test
@@ -124,11 +123,9 @@ public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .build();
 
+    expectedException.expect(IllegalArgumentException.class);
     final Result<FunctionAPI.ExecuteFunctionOnMemberResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    assertTrue(result instanceof Failure);
-    assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode());
   }
 
   @Test(expected = DistributedSystemDisconnectedException.class)
@@ -153,20 +150,16 @@ public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
 
   @Test
   public void requiresPermissions() throws Exception {
-    final SecurityService securityService = mock(SecurityService.class);
-    doThrow(new NotAuthorizedException("we should catch this")).when(securityService)
-        .authorize(ResourcePermissions.DATA_WRITE);
-    when(cacheStub.getSecurityService()).thenReturn(securityService);
-
     final FunctionAPI.ExecuteFunctionOnMemberRequest request =
         FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .addMemberName(TEST_MEMBER1).build();
-    try {
-      operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-      fail("Should not have been authorized.");
-    } catch (OperationNotAuthorizedException ex) {
-      // Expected failure
-    }
+    Authorizer authorizer = mock(Authorizer.class);
+    doThrow(new NotAuthorizedException("we should catch this")).when(authorizer)
+        .authorize(ResourcePermissions.DATA_WRITE);
+    ServerMessageExecutionContext context = new ServerMessageExecutionContext(cacheStub,
+        mock(ProtobufClientStatistics.class), null, authorizer);
+    expectedException.expect(NotAuthorizedException.class);
+    operationHandler.process(serializationService, request, context);
   }
 
   @Test
@@ -175,15 +168,13 @@ public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder()
             .setFunctionID("I am not a function, I am a human").addMemberName(TEST_MEMBER1).build();
 
+    expectedException.expect(IllegalArgumentException.class);
     final Result<FunctionAPI.ExecuteFunctionOnMemberResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-
-    assertEquals(INVALID_REQUEST, errorMessage.getError().getErrorCode());
   }
 
   private ServerMessageExecutionContext mockedMessageExecutionContext() {
-    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null);
+    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null,
+        new NoSecurityAuthorizer());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest.java
index 2fa98d2..9bc756c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.protocol.protobuf.v1.operations;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -26,24 +27,25 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
-import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
-import org.apache.geode.internal.protocol.protobuf.v1.state.exception.OperationNotAuthorizedException;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.Authorizer;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.management.internal.security.ResourcePermissions;
 import org.apache.geode.security.NotAuthorizedException;
@@ -64,6 +66,9 @@ public class ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest {
   private ProtobufSerializationService serializationService;
   private TestFunction function;
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   private static class TestFunction implements Function {
     // non-null iff function has been executed.
     private AtomicReference<FunctionContext> context = new AtomicReference<>();
@@ -112,30 +117,23 @@ public class ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest {
         FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .setRegion(NOT_A_REGION).build();
 
+    expectedException.expect(RegionDestroyedException.class);
     final Result<FunctionAPI.ExecuteFunctionOnRegionResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-
-    assertTrue(result instanceof Failure);
-
-    verify(cacheStub).getRegion(NOT_A_REGION);
   }
 
   @Test
   public void requiresPermissions() throws Exception {
-    final SecurityService securityService = mock(SecurityService.class);
-    doThrow(new NotAuthorizedException("we should catch this")).when(securityService)
-        .authorize(ResourcePermissions.DATA_WRITE);
-    when(cacheStub.getSecurityService()).thenReturn(securityService);
-
     final FunctionAPI.ExecuteFunctionOnRegionRequest request =
         FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
             .setRegion(TEST_REGION).build();
-    try {
-      operationHandler.process(serializationService, request, mockedMessageExecutionContext());
-      fail("Should not have been authorized.");
-    } catch (OperationNotAuthorizedException ex) {
-      // Expected failure
-    }
+    Authorizer authorizer = mock(Authorizer.class);
+    doThrow(new NotAuthorizedException("we should catch this")).when(authorizer)
+        .authorize(ResourcePermissions.DATA_WRITE);
+    ServerMessageExecutionContext context = new ServerMessageExecutionContext(cacheStub,
+        mock(ProtobufClientStatistics.class), null, authorizer);
+    expectedException.expect(NotAuthorizedException.class);
+    operationHandler.process(serializationService, request, context);
   }
 
   @Test
@@ -146,15 +144,14 @@ public class ExecuteFunctionOnRegionRequestOperationHandlerJUnitTest {
 
     FunctionService.unregisterFunction(TEST_FUNCTION_ID);
 
+    expectedException.expect(IllegalArgumentException.class);
     final Result<FunctionAPI.ExecuteFunctionOnRegionResponse> result =
         operationHandler.process(serializationService, request, mockedMessageExecutionContext());
 
-    final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-
-    assertEquals(BasicTypes.ErrorCode.INVALID_REQUEST, errorMessage.getError().getErrorCode());
   }
 
   private ServerMessageExecutionContext mockedMessageExecutionContext() {
-    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null);
+    return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null,
+        new NoSecurityAuthorizer());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandlerJUnitTest.java
index e21f051..b55e415 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandlerJUnitTest.java
@@ -32,8 +32,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.CacheLoaderException;
 import org.apache.geode.cache.Region;
@@ -61,6 +63,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private static final String NO_VALUE_PRESENT_FOR_THIS_KEY = "no value present for this key";
   private Region regionStub;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionStub = mock(Region.class);
@@ -80,9 +85,7 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     Exception exception = new DecodingException("error finding codec for type");
     ProtobufSerializationService serializationServiceStub =
         mock(ProtobufSerializationService.class);
-    when(serializationServiceStub.decode(any())).thenReturn(TEST_KEY1).thenThrow(exception);
-    when(serializationServiceStub.encode(any()))
-        .thenReturn(BasicTypes.EncodedValue.newBuilder().setStringResult("some value").build());
+    when(serializationServiceStub.decodeList(any())).thenThrow(exception);
 
     BasicTypes.EncodedValue encodedKey1 =
         BasicTypes.EncodedValue.newBuilder().setStringResult(TEST_KEY1).build();
@@ -96,19 +99,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     RegionAPI.GetAllRequest getRequest =
         ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, keys);
 
-    Result response = operationHandler.process(serializationServiceStub, getRequest,
+    expectedException.expect(DecodingException.class);
+    operationHandler.process(serializationServiceStub, getRequest,
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertTrue("response was " + response, response instanceof Success);
-
-    RegionAPI.GetAllResponse message = (RegionAPI.GetAllResponse) response.getMessage();
-    assertEquals(1, message.getFailuresCount());
-
-    BasicTypes.KeyedError error = message.getFailures(0);
-    assertEquals(BasicTypes.ErrorCode.INVALID_REQUEST, error.getError().getErrorCode());
-    assertTrue(error.getError().getMessage().contains("encoding not supported"));
-
-    assertEquals(1, message.getEntriesCount());
   }
 
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAndPutJsonDocumentsDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAndPutJsonDocumentsDUnitTest.java
index 2ddfbcb..4bf9051 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAndPutJsonDocumentsDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAndPutJsonDocumentsDUnitTest.java
@@ -109,33 +109,43 @@ public class GetAndPutJsonDocumentsDUnitTest extends JUnit4CacheTestCase {
 
     RegionAPI.GetRequest getRequest = generateGetRequest(key);
     GetRequestOperationHandler operationHandler = new GetRequestOperationHandler();
-    Result result = operationHandler.process(serializationService, getRequest,
-        TestExecutionContext.getNoAuthCacheExecutionContext(getCache()));
-
-    Assert.assertTrue(result instanceof Success);
-    RegionAPI.GetResponse response = (RegionAPI.GetResponse) result.getMessage();
-    assertEquals(BasicTypes.EncodedValue.ValueCase.JSONOBJECTRESULT,
-        response.getResult().getValueCase());
-    String actualValue = response.getResult().getJsonObjectResult();
-    assertEquals(jsonDocument, actualValue);
+    getCache().setReadSerializedForCurrentThread(true);
+    try {
+      Result result = operationHandler.process(serializationService, getRequest,
+          TestExecutionContext.getNoAuthCacheExecutionContext(getCache()));
+
+      Assert.assertTrue(result instanceof Success);
+      RegionAPI.GetResponse response = (RegionAPI.GetResponse) result.getMessage();
+      assertEquals(BasicTypes.EncodedValue.ValueCase.JSONOBJECTRESULT,
+          response.getResult().getValueCase());
+      String actualValue = response.getResult().getJsonObjectResult();
+      assertEquals(jsonDocument, actualValue);
+    } finally {
+      getCache().setReadSerializedForCurrentThread(false);
+    }
   }
 
   @Test
   public void testThatGetAllReturnsJSONDocumentForPdxInstance() throws Exception {
     storeTestDocument();
 
-    RegionAPI.GetAllRequest getRequest = generateGetAllRequest(key);
-    GetAllRequestOperationHandler operationHandler = new GetAllRequestOperationHandler();
-    Result result = operationHandler.process(serializationService, getRequest,
-        TestExecutionContext.getNoAuthCacheExecutionContext(getCache()));
-
-    Assert.assertTrue(result instanceof Success);
-    RegionAPI.GetAllResponse response = (RegionAPI.GetAllResponse) result.getMessage();
-    BasicTypes.Entry entry = response.getEntriesList().get(0);
-    BasicTypes.EncodedValue entryValue = entry.getValue();
-    assertEquals(BasicTypes.EncodedValue.ValueCase.JSONOBJECTRESULT, entryValue.getValueCase());
-    String actualValue = entryValue.getJsonObjectResult();
-    assertEquals(jsonDocument, actualValue);
+    getCache().setReadSerializedForCurrentThread(true);
+    try {
+      RegionAPI.GetAllRequest getRequest = generateGetAllRequest(key);
+      GetAllRequestOperationHandler operationHandler = new GetAllRequestOperationHandler();
+      Result result = operationHandler.process(serializationService, getRequest,
+          TestExecutionContext.getNoAuthCacheExecutionContext(getCache()));
+
+      Assert.assertTrue(result instanceof Success);
+      RegionAPI.GetAllResponse response = (RegionAPI.GetAllResponse) result.getMessage();
+      BasicTypes.Entry entry = response.getEntriesList().get(0);
+      BasicTypes.EncodedValue entryValue = entry.getValue();
+      assertEquals(BasicTypes.EncodedValue.ValueCase.JSONOBJECTRESULT, entryValue.getValueCase());
+      String actualValue = entryValue.getJsonObjectResult();
+      assertEquals(jsonDocument, actualValue);
+    } finally {
+      getCache().setReadSerializedForCurrentThread(false);
+    }
   }
 
   @Test
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
index b50eb30..7f0d1a8 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
@@ -46,11 +46,11 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
   @Before
   public void setUp() throws Exception {
     Region<String, String> region1Stub = mock(Region.class);
-    when(region1Stub.getName()).thenReturn(TEST_REGION1);
+    when(region1Stub.getFullPath()).thenReturn(TEST_REGION1);
     Region<String, String> region2Stub = mock(Region.class);
-    when(region2Stub.getName()).thenReturn(TEST_REGION2);
+    when(region2Stub.getFullPath()).thenReturn(TEST_REGION2);
     Region<String, String> region3Stub = mock(Region.class);
-    when(region3Stub.getName()).thenReturn(TEST_REGION3);
+    when(region3Stub.getFullPath()).thenReturn(TEST_REGION3);
 
     when(cacheStub.rootRegions()).thenReturn(Collections
         .unmodifiableSet(new HashSet<>(Arrays.asList(region1Stub, region2Stub, region3Stub))));
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandlerJUnitTest.java
index 4e7af95..80e7264 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandlerJUnitTest.java
@@ -20,10 +20,13 @@ import static org.mockito.Mockito.when;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.internal.protocol.TestExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -48,6 +51,9 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   private final String MISSING_KEY = "missing key";
   private final String NULLED_KEY = "nulled key";
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     Region regionStub = mock(Region.class);
@@ -79,13 +85,10 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   @Test
   public void processReturnsUnsucessfulResponseForInvalidRegion() throws Exception {
     RegionAPI.GetRequest getRequest = generateTestRequest(true, false, false);
+    expectedException.expect(RegionDestroyedException.class);
     Result response = operationHandler.process(serializationService, getRequest,
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
 
-    Assert.assertTrue(response instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage =
-        (ClientProtocol.ErrorResponse) response.getErrorMessage();
-    Assert.assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, errorMessage.getError().getErrorCode());
   }
 
   @Test
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandlerJUnitTest.java
index 6abde82..1296668 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetSizeRequestOperationHandlerJUnitTest.java
@@ -23,12 +23,15 @@ import java.util.HashSet;
 
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.DataPolicy;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
@@ -44,6 +47,9 @@ public class GetSizeRequestOperationHandlerJUnitTest extends OperationHandlerJUn
   private final String TEST_REGION1 = "test region 1";
   private Region region1Stub;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() {
     region1Stub = mock(Region.class);
@@ -78,11 +84,9 @@ public class GetSizeRequestOperationHandlerJUnitTest extends OperationHandlerJUn
     when(emptyCache.rootRegions())
         .thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
     String unknownRegionName = "UNKNOWN_REGION";
-    Result result = operationHandler.process(serializationService,
+    expectedException.expect(RegionDestroyedException.class);
+    operationHandler.process(serializationService,
         MessageUtil.makeGetSizeRequest(unknownRegionName),
         getNoAuthCacheExecutionContext(emptyCache));
-    Assert.assertTrue(result instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-    Assert.assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, errorMessage.getError().getErrorCode());
   }
 }
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerIntegrationTest.java
index 85f04c2..9bc7bd8 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerIntegrationTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerIntegrationTest.java
@@ -48,6 +48,8 @@ import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationServi
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryRequest;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI.OQLQueryResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.AuthorizingCacheImpl;
+import org.apache.geode.internal.protocol.protobuf.v1.authentication.NoSecurityAuthorizer;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
@@ -153,7 +155,8 @@ public class OqlQueryRequestOperationHandlerIntegrationTest {
       ProtobufSerializationService serializer) throws InvalidExecutionContextException,
       ConnectionStateException, EncodingException, DecodingException {
     final MessageExecutionContext context = mock(MessageExecutionContext.class);
-    when(context.getCache()).thenReturn((InternalCache) cache);
+    when(context.getAuthorizingCache())
+        .thenReturn(new AuthorizingCacheImpl((InternalCache) cache, new NoSecurityAuthorizer()));
     final OQLQueryRequest request = OQLQueryRequest.newBuilder().setQuery(query)
         .addAllBindParameter(Arrays.asList(bindParameters)).build();
     Result<OQLQueryResponse> result =
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerJUnitTest.java
index d0b4aaf..7dcd0bd 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/OqlQueryRequestOperationHandlerJUnitTest.java
@@ -33,6 +33,7 @@ import org.apache.geode.cache.query.Query;
 import org.apache.geode.cache.query.QueryInvocationTargetException;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.TypeMismatchException;
+import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.InternalQueryService;
 import org.apache.geode.cache.query.internal.LinkedStructSet;
 import org.apache.geode.cache.query.internal.ResultsBag;
@@ -70,7 +71,7 @@ public class OqlQueryRequestOperationHandlerJUnitTest extends OperationHandlerJU
   public void queryForSingleObject() throws ConnectionStateException, DecodingException,
       InvalidExecutionContextException, EncodingException, NameResolutionException,
       TypeMismatchException, QueryInvocationTargetException, FunctionDomainException {
-    Query query = mock(Query.class);
+    Query query = mock(DefaultQuery.class);
     when(queryService.newQuery(eq(SELECT_STAR_QUERY))).thenReturn(query);
     when(query.execute((Object[]) any())).thenReturn(STRING_RESULT_1);
     final OQLQueryRequest request =
@@ -86,7 +87,7 @@ public class OqlQueryRequestOperationHandlerJUnitTest extends OperationHandlerJU
   public void queryForMultipleObjects() throws ConnectionStateException, DecodingException,
       InvalidExecutionContextException, EncodingException, NameResolutionException,
       TypeMismatchException, QueryInvocationTargetException, FunctionDomainException {
-    Query query = mock(Query.class);
+    Query query = mock(DefaultQuery.class);
     when(queryService.newQuery(eq(SELECT_STAR_QUERY))).thenReturn(query);
     SelectResults results = new ResultsBag();
     results.setElementType(new ObjectTypeImpl());
@@ -108,7 +109,7 @@ public class OqlQueryRequestOperationHandlerJUnitTest extends OperationHandlerJU
   public void queryForMultipleStructs() throws ConnectionStateException, DecodingException,
       InvalidExecutionContextException, EncodingException, NameResolutionException,
       TypeMismatchException, QueryInvocationTargetException, FunctionDomainException {
-    Query query = mock(Query.class);
+    Query query = mock(DefaultQuery.class);
     when(queryService.newQuery(eq(SELECT_STAR_QUERY))).thenReturn(query);
 
 
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandlerJUnitTest.java
index 30e1eac..60c6782 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandlerJUnitTest.java
@@ -27,8 +27,10 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.protocol.TestExecutionContext;
@@ -58,6 +60,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private final String EXCEPTION_TEXT = "Simulating put failure";
   private Region regionMock;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionMock = mock(Region.class);
@@ -91,21 +96,11 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     RegionAPI.PutAllRequest putAllRequest =
         ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, entries).getPutAllRequest();
 
+    expectedException.expect(DecodingException.class);
     Result response = operationHandler.process(serializationServiceStub, putAllRequest,
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertTrue("response was " + response, response instanceof Success);
-
-    RegionAPI.PutAllResponse message = (RegionAPI.PutAllResponse) response.getMessage();
-    assertEquals(1, message.getFailedKeysCount());
-
-    BasicTypes.KeyedError error = message.getFailedKeys(0);
-    assertEquals(BasicTypes.ErrorCode.INVALID_REQUEST, error.getError().getErrorCode());
-    assertTrue(error.getError().getMessage().contains("Encoding not supported"));
   }
 
-
-
   @Test
   public void processInsertsMultipleValidEntriesInCache() throws Exception {
     Result result = operationHandler.process(serializationService, generateTestRequest(false, true),
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandlerJUnitTest.java
index 6531f75..ab242d1 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutIfAbsentRequestOperationHandlerJUnitTest.java
@@ -24,10 +24,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.internal.protocol.TestExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
@@ -47,6 +50,9 @@ public class PutIfAbsentRequestOperationHandlerJUnitTest extends OperationHandle
   private Region regionMock;
   private PutIfAbsentRequestOperationHandler operationHandler;
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionMock = mock(Region.class);
@@ -108,16 +114,6 @@ public class PutIfAbsentRequestOperationHandlerJUnitTest extends OperationHandle
     verify(regionMock).putIfAbsent(null, TEST_VALUE);
   }
 
-  @Test
-  public void failsWithNoAuthCacheExecutionContext() throws Exception {
-    Result<RegionAPI.PutIfAbsentResponse> result1 = operationHandler.process(serializationService,
-        RegionAPI.PutIfAbsentRequest.newBuilder().build(),
-        TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR,
-        result1.getErrorMessage().getError().getErrorCode());
-  }
-
   @Test(expected = DecodingException.class)
   public void unsetEntrythrowsDecodingException() throws Exception {
     Result<RegionAPI.PutIfAbsentResponse> result1 =
@@ -130,23 +126,19 @@ public class PutIfAbsentRequestOperationHandlerJUnitTest extends OperationHandle
 
   @Test
   public void unsetRegionGetsServerError() throws Exception {
+    expectedException.expect(RegionDestroyedException.class);
     Result<RegionAPI.PutIfAbsentResponse> result1 =
         operationHandler.process(serializationService, generateTestRequest(false, true),
             TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR,
-        result1.getErrorMessage().getError().getErrorCode());
   }
 
   @Test
   public void nonexistingRegionReturnsServerError() throws Exception {
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(null);
 
+    expectedException.expect(RegionDestroyedException.class);
     Result<RegionAPI.PutIfAbsentResponse> result1 = operationHandler.process(serializationService,
         generateTestRequest(), TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR,
-        result1.getErrorMessage().getError().getErrorCode());
   }
 
   /**
@@ -162,17 +154,6 @@ public class PutIfAbsentRequestOperationHandlerJUnitTest extends OperationHandle
         result1.getErrorMessage().getError().getErrorCode());
   }
 
-  @Test
-  public void invalidRegionReturnsInvalidRequestError() throws Exception {
-    // doesn't test which regions are invalid; those are documented under Cache.getRegion.
-    when(cacheStub.getRegion(any())).thenThrow(new IllegalArgumentException());
-
-    Result<RegionAPI.PutIfAbsentResponse> result1 = operationHandler.process(serializationService,
-        generateTestRequest(), TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-    assertEquals(BasicTypes.ErrorCode.INVALID_REQUEST,
-        result1.getErrorMessage().getError().getErrorCode());
-  }
-
   private RegionAPI.PutIfAbsentRequest generateTestRequest(boolean includeRegion,
       boolean includeEntry) throws EncodingException {
     RegionAPI.PutIfAbsentRequest.Builder builder = RegionAPI.PutIfAbsentRequest.newBuilder();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandlerJUnitTest.java
index 3da6017..14565e4 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandlerJUnitTest.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
@@ -24,14 +23,15 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.internal.protocol.TestExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
-import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.v1.Failure;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufRequestUtilities;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
@@ -49,6 +49,9 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   private final String TEST_REGION = "test region";
   private Region regionMock;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionMock = mock(Region.class);
@@ -94,13 +97,10 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
   public void test_RegionNotFound() throws Exception {
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(null);
     PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
+    expectedException.expect(RegionDestroyedException.class);
     Result result = operationHandler.process(serializationService, generateTestRequest(),
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
 
-    assertTrue(result instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage =
-        (ClientProtocol.ErrorResponse) result.getErrorMessage();
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, errorMessage.getError().getErrorCode());
   }
 
   private RegionAPI.PutRequest generateTestRequest() throws EncodingException {
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandlerJUnitTest.java
index db3cb09..296576f 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandlerJUnitTest.java
@@ -22,10 +22,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.internal.protocol.TestExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -48,6 +51,9 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private final String MISSING_KEY = "missing key";
   private Region regionStub;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Before
   public void setUp() throws Exception {
     regionStub = mock(Region.class);
@@ -73,13 +79,9 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   @Test
   public void processReturnsUnsucessfulResponseForInvalidRegion() throws Exception {
     RegionAPI.RemoveRequest removeRequest = generateTestRequest(true, false).getRemoveRequest();
+    expectedException.expect(RegionDestroyedException.class);
     Result result = operationHandler.process(serializationService, removeRequest,
         TestExecutionContext.getNoAuthCacheExecutionContext(cacheStub));
-
-    assertTrue(result instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage =
-        (ClientProtocol.ErrorResponse) result.getErrorMessage();
-    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, errorMessage.getError().getErrorCode());
   }
 
   @Test

-- 
To stop receiving notification emails like this one, please contact
upthewaterspout@apache.org.