You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/08/01 21:03:22 UTC

[28/50] [abbrv] geode git commit: GEODE-3192, GEODE-3229: Change API and implementation of protobuf PutAll. This closes #643

GEODE-3192,GEODE-3229: Change API and implementation of protobuf PutAll. This closes #643

* We will now dispatch incoming protobuf PutAlls as a series of put operations
* The PutAllResponse will contain a set of failed keys and the error they failed with

Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
Signed-off-by: Brian Rowe <br...@pivotal.io>


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0debd20a
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0debd20a
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0debd20a

Branch: refs/heads/feature/GEODE-3299
Commit: 0debd20acc290281cd55bf373cd1fec7d1e3e4b0
Parents: c62764b
Author: Brian Rowe <br...@pivotal.io>
Authored: Tue Jul 18 14:52:50 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Thu Jul 27 08:34:19 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/protocol/protobuf/Failure.java |  10 +-
 .../protocol/protobuf/OperationContext.java     |  10 +-
 .../apache/geode/protocol/protobuf/Result.java  |   4 +-
 .../apache/geode/protocol/protobuf/Success.java |   4 +-
 .../GetAllRequestOperationHandler.java          |  17 +--
 .../GetRegionRequestOperationHandler.java       |   3 +-
 .../operations/GetRequestOperationHandler.java  |   9 +-
 .../PutAllRequestOperationHandler.java          |  63 ++++----
 .../operations/PutRequestOperationHandler.java  |  11 +-
 .../RemoveRequestOperationHandler.java          |   9 +-
 .../utilities/ProtobufResponseUtilities.java    |  17 +--
 .../protobuf/utilities/ProtobufUtilities.java   |   4 +-
 .../geode/serialization/codec/BooleanCodec.java |   4 +-
 geode-protobuf/src/main/proto/basicTypes.proto  |  12 +-
 .../src/main/proto/clientProtocol.proto         |   5 -
 geode-protobuf/src/main/proto/region_API.proto  |   2 +-
 .../RoundTripCacheConnectionJUnitTest.java      | 148 ++++++++++---------
 .../PutAllRequestOperationHandlerJUnitTest.java |  77 ++++------
 .../PutRequestOperationHandlerJUnitTest.java    |  27 ++--
 19 files changed, 211 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Failure.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Failure.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Failure.java
index fb75c26..fcbbb50 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Failure.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Failure.java
@@ -17,19 +17,19 @@ package org.apache.geode.protocol.protobuf;
 import java.util.function.Function;
 
 public class Failure<SuccessType> implements Result<SuccessType> {
-  private final ClientProtocol.ErrorResponse errorResponse;
+  private final BasicTypes.ErrorResponse errorResponse;
 
-  public Failure(ClientProtocol.ErrorResponse errorResponse) {
+  public Failure(BasicTypes.ErrorResponse errorResponse) {
     this.errorResponse = errorResponse;
   }
 
-  public static <T> Failure<T> of(ClientProtocol.ErrorResponse errorResponse) {
+  public static <T> Failure<T> of(BasicTypes.ErrorResponse errorResponse) {
     return new Failure<>(errorResponse);
   }
 
   @Override
   public <T> T map(Function<SuccessType, T> successFunction,
-      Function<ClientProtocol.ErrorResponse, T> errorFunction) {
+      Function<BasicTypes.ErrorResponse, T> errorFunction) {
     return errorFunction.apply(errorResponse);
   }
 
@@ -39,7 +39,7 @@ public class Failure<SuccessType> implements Result<SuccessType> {
   }
 
   @Override
-  public ClientProtocol.ErrorResponse getErrorMessage() {
+  public BasicTypes.ErrorResponse getErrorMessage() {
     return errorResponse;
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/OperationContext.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/OperationContext.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/OperationContext.java
index 9b234f2..72e4d75 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/OperationContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/OperationContext.java
@@ -15,15 +15,15 @@
 
 package org.apache.geode.protocol.protobuf;
 
-import java.util.function.Function;
-
 import org.apache.geode.protocol.operations.OperationHandler;
 
+import java.util.function.Function;
+
 public class OperationContext<OperationRequest, OperationResponse> {
   private final OperationHandler<OperationRequest, OperationResponse> operationHandler;
   private final Function<ClientProtocol.Request, OperationRequest> fromRequest;
   private final Function<OperationResponse, ClientProtocol.Response.Builder> toResponse;
-  private final Function<ClientProtocol.ErrorResponse, ClientProtocol.Response.Builder> toErrorResponse;
+  private final Function<BasicTypes.ErrorResponse, ClientProtocol.Response.Builder> toErrorResponse;
 
   public OperationContext(Function<ClientProtocol.Request, OperationRequest> fromRequest,
       OperationHandler<OperationRequest, OperationResponse> operationHandler,
@@ -35,7 +35,7 @@ public class OperationContext<OperationRequest, OperationResponse> {
   }
 
   public static ClientProtocol.Response.Builder makeErrorBuilder(
-      ClientProtocol.ErrorResponse errorResponse) {
+      BasicTypes.ErrorResponse errorResponse) {
     return ClientProtocol.Response.newBuilder().setErrorResponse(errorResponse);
   }
 
@@ -51,7 +51,7 @@ public class OperationContext<OperationRequest, OperationResponse> {
     return toResponse;
   }
 
-  public Function<ClientProtocol.ErrorResponse, ClientProtocol.Response.Builder> getToErrorResponse() {
+  public Function<BasicTypes.ErrorResponse, ClientProtocol.Response.Builder> getToErrorResponse() {
     return toErrorResponse;
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Result.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Result.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Result.java
index f22d845..14168bc 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Result.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Result.java
@@ -18,9 +18,9 @@ import java.util.function.Function;
 
 public interface Result<SuccessType> {
   <T> T map(Function<SuccessType, T> successFunction,
-      Function<ClientProtocol.ErrorResponse, T> errorFunction);
+      Function<BasicTypes.ErrorResponse, T> errorFunction);
 
   SuccessType getMessage();
 
-  ClientProtocol.ErrorResponse getErrorMessage();
+  BasicTypes.ErrorResponse getErrorMessage();
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Success.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Success.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Success.java
index 4bb07b8..2c409dd 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Success.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/Success.java
@@ -29,7 +29,7 @@ public class Success<SuccessType> implements Result<SuccessType> {
 
   @Override
   public <T> T map(Function<SuccessType, T> successFunction,
-      Function<ClientProtocol.ErrorResponse, T> errorFunction) {
+      Function<BasicTypes.ErrorResponse, T> errorFunction) {
     return successFunction.apply(successResponse);
   }
 
@@ -39,7 +39,7 @@ public class Success<SuccessType> implements Result<SuccessType> {
   }
 
   @Override
-  public ClientProtocol.ErrorResponse getErrorMessage() {
+  public BasicTypes.ErrorResponse getErrorMessage() {
     throw new RuntimeException("This is a not Failure result");
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
index e3401c8..a6ffd9d 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
@@ -14,15 +14,10 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -32,6 +27,10 @@ import org.apache.geode.serialization.SerializationService;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 public class GetAllRequestOperationHandler
     implements OperationHandler<RegionAPI.GetAllRequest, RegionAPI.GetAllResponse> {
 
@@ -42,7 +41,7 @@ public class GetAllRequestOperationHandler
     Region region = cache.getRegion(regionName);
     if (region == null) {
       return Failure
-          .of(ClientProtocol.ErrorResponse.newBuilder().setMessage("Region not found").build());
+          .of(BasicTypes.ErrorResponse.newBuilder().setMessage("Region not found").build());
     }
 
     try {
@@ -58,10 +57,10 @@ public class GetAllRequestOperationHandler
       }
       return Success.of(RegionAPI.GetAllResponse.newBuilder().addAllEntries(entries).build());
     } catch (UnsupportedEncodingTypeException ex) {
-      return Failure.of(
-          ClientProtocol.ErrorResponse.newBuilder().setMessage("Encoding not supported.").build());
+      return Failure
+          .of(BasicTypes.ErrorResponse.newBuilder().setMessage("Encoding not supported.").build());
     } catch (CodecNotRegisteredForTypeException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder()
           .setMessage("Codec error in protobuf deserialization.").build());
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
index f2e1b37..e8dfdda 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandler.java
@@ -18,7 +18,6 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -37,7 +36,7 @@ public class GetRegionRequestOperationHandler
 
     Region region = cache.getRegion(regionName);
     if (region == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder()
           .setMessage("No region exists for name: " + regionName).build());
     }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
index 79c59f8..21aa15f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java
@@ -18,7 +18,6 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -38,7 +37,7 @@ public class GetRequestOperationHandler
     Region region = cache.getRegion(regionName);
     if (region == null) {
       return Failure
-          .of(ClientProtocol.ErrorResponse.newBuilder().setMessage("Region not found").build());
+          .of(BasicTypes.ErrorResponse.newBuilder().setMessage("Region not found").build());
     }
 
     try {
@@ -53,10 +52,10 @@ public class GetRequestOperationHandler
           ProtobufUtilities.createEncodedValue(serializationService, resultValue);
       return Success.of(RegionAPI.GetResponse.newBuilder().setResult(encodedValue).build());
     } catch (UnsupportedEncodingTypeException ex) {
-      return Failure.of(
-          ClientProtocol.ErrorResponse.newBuilder().setMessage("Encoding not supported.").build());
+      return Failure
+          .of(BasicTypes.ErrorResponse.newBuilder().setMessage("Encoding not supported.").build());
     } catch (CodecNotRegisteredForTypeException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder()
           .setMessage("Codec error in protobuf deserialization.").build());
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
index 55d2f3f..6e26e75 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
@@ -14,12 +14,6 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
@@ -33,6 +27,11 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.serialization.SerializationService;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class PutAllRequestOperationHandler
     implements OperationHandler<RegionAPI.PutAllRequest, RegionAPI.PutAllResponse> {
@@ -40,44 +39,44 @@ public class PutAllRequestOperationHandler
 
   @Override
   public Result<RegionAPI.PutAllResponse> process(SerializationService serializationService,
-      RegionAPI.PutAllRequest request, Cache cache) {
-    String regionName = request.getRegionName();
-    Region region = cache.getRegion(regionName);
+      RegionAPI.PutAllRequest putAllRequest, Cache cache) {
+    Region region = cache.getRegion(putAllRequest.getRegionName());
 
     if (region == null) {
       return Failure.of(ProtobufResponseUtilities.createAndLogErrorResponse(
-          "Region passed by client did not exist: " + regionName, logger, null));
+          "Region passed by client did not exist: " + putAllRequest.getRegionName(), logger, null));
     }
 
-    Map entries = extractPutAllEntries(serializationService, request);
-    try {
-      region.putAll(entries);
-    } catch (Exception ex) {
-      return Failure
-          .of(ProtobufResponseUtilities.createAndLogErrorResponse(ex.getMessage(), logger, ex));
-    }
-
-    return Success.of(RegionAPI.PutAllResponse.newBuilder().build());
+    RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder()
+        .addAllFailedKeys(putAllRequest.getEntryList().stream()
+            .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull)
+            .collect(Collectors.toList()));
+    return Success.of(builder.build());
   }
 
-  // Read all of the entries out of the protobuf and return an error (without performing any puts)
-  // if any of the entries can't be decoded
-  private Map extractPutAllEntries(SerializationService serializationService,
-      RegionAPI.PutAllRequest putAllRequest) {
-    Map entries = new HashMap();
+  private BasicTypes.KeyedErrorResponse singlePut(SerializationService serializationService,
+      Region region, BasicTypes.Entry entry) {
     try {
-      for (BasicTypes.Entry entry : putAllRequest.getEntryList()) {
-        Object decodedValue = ProtobufUtilities.decodeValue(serializationService, entry.getValue());
-        Object decodedKey = ProtobufUtilities.decodeValue(serializationService, entry.getKey());
+      Object decodedValue = ProtobufUtilities.decodeValue(serializationService, entry.getValue());
+      Object decodedKey = ProtobufUtilities.decodeValue(serializationService, entry.getKey());
 
-        entries.put(decodedKey, decodedValue);
-      }
+      region.put(decodedKey, decodedValue);
     } catch (UnsupportedEncodingTypeException ex) {
-      throw new RuntimeException("This exception still needs to be handled in an ErrorMessage");
+      return buildAndLogKeyedError(entry, "Encoding not supported", ex);
     } catch (CodecNotRegisteredForTypeException ex) {
-      throw new RuntimeException("This exception still needs to be handled in an ErrorMessage");
+      return buildAndLogKeyedError(entry, "Codec error in protobuf deserialization", ex);
+    } catch (ClassCastException ex) {
+      return buildAndLogKeyedError(entry, "Invalid key or value type for region", ex);
     }
+    return null;
+  }
 
-    return entries;
+  private BasicTypes.KeyedErrorResponse buildAndLogKeyedError(BasicTypes.Entry entry,
+      String message, Exception ex) {
+    logger.error(message, ex);
+    BasicTypes.ErrorResponse errorResponse =
+        BasicTypes.ErrorResponse.newBuilder().setMessage(message).build();
+    return BasicTypes.KeyedErrorResponse.newBuilder().setKey(entry.getKey()).setError(errorResponse)
+        .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
index be2308e..ccfd0c1 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java
@@ -18,7 +18,6 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -37,7 +36,7 @@ public class PutRequestOperationHandler
     String regionName = request.getRegionName();
     Region region = cache.getRegion(regionName);
     if (region == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder()
           .setMessage("Region passed by client did not exist: " + regionName).build());
     }
 
@@ -50,18 +49,16 @@ public class PutRequestOperationHandler
         region.put(decodedKey, decodedValue);
         return Success.of(RegionAPI.PutResponse.newBuilder().build());
       } catch (ClassCastException ex) {
-        return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+        return Failure.of(BasicTypes.ErrorResponse.newBuilder()
             .setMessage("invalid key or value type for region " + regionName + ",passed key: "
                 + entry.getKey().getEncodingType() + " value: "
                 + entry.getValue().getEncodingType())
             .build());
       }
     } catch (UnsupportedEncodingTypeException ex) {
-      return Failure
-          .of(ClientProtocol.ErrorResponse.newBuilder().setMessage(ex.getMessage()).build());
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder().setMessage(ex.getMessage()).build());
     } catch (CodecNotRegisteredForTypeException ex) {
-      return Failure
-          .of(ClientProtocol.ErrorResponse.newBuilder().setMessage(ex.getMessage()).build());
+      return Failure.of(BasicTypes.ErrorResponse.newBuilder().setMessage(ex.getMessage()).build());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
index 1058ca3..0bf162e 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java
@@ -14,13 +14,10 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.operations.OperationHandler;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
@@ -30,6 +27,8 @@ import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
 import org.apache.geode.serialization.SerializationService;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class RemoveRequestOperationHandler
     implements OperationHandler<RegionAPI.RemoveRequest, RegionAPI.RemoveResponse> {
@@ -43,7 +42,7 @@ public class RemoveRequestOperationHandler
     Region region = cache.getRegion(regionName);
     if (region == null) {
       return Failure
-          .of(ClientProtocol.ErrorResponse.newBuilder().setMessage("Region not found").build());
+          .of(BasicTypes.ErrorResponse.newBuilder().setMessage("Region not found").build());
     }
 
     try {

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java
index 6da4730..06ae401 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java
@@ -14,13 +14,12 @@
  */
 package org.apache.geode.protocol.protobuf.utilities;
 
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Region;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.RegionAPI;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Set;
 
 /**
  * This class contains helper functions for generating ClientProtocol.Response objects.
@@ -31,22 +30,22 @@ import org.apache.geode.protocol.protobuf.RegionAPI;
 public abstract class ProtobufResponseUtilities {
 
   /**
-   * This creates response object containing a ClientProtocol.ErrorResponse, and also logs the
-   * passed error message and exception (if present) to the provided logger.
+   * This creates response object containing a BasicTypes.ErrorResponse, and also logs the passed
+   * error message and exception (if present) to the provided logger.
    *
    * @param errorMessage - description of the error
    * @param logger - logger to write the error message to
    * @param ex - exception which should be logged
    * @return An error response containing the first three parameters.
    */
-  public static ClientProtocol.ErrorResponse createAndLogErrorResponse(String errorMessage,
+  public static BasicTypes.ErrorResponse createAndLogErrorResponse(String errorMessage,
       Logger logger, Exception ex) {
     if (ex != null) {
       logger.error(errorMessage, ex);
     } else {
       logger.error(errorMessage);
     }
-    return ClientProtocol.ErrorResponse.newBuilder().setMessage(errorMessage).build();
+    return BasicTypes.ErrorResponse.newBuilder().setMessage(errorMessage).build();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java
index 242f5e3..c7bf6aa 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java
@@ -15,7 +15,6 @@
 package org.apache.geode.protocol.protobuf.utilities;
 
 import com.google.protobuf.ByteString;
-
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.protocol.protobuf.BasicTypes;
@@ -31,7 +30,7 @@ import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTy
  * This class contains helper functions for assistance in creating protobuf objects. This class is
  * mainly focused on helper functions which can be used in building BasicTypes for use in other
  * messages or those used to create the top level Message objects.
- *
+ * <p>
  * Helper functions specific to creating ClientProtocol.Responses can be found at
  * {@link ProtobufResponseUtilities} Helper functions specific to creating ClientProtocol.Requests
  * can be found at {@link ProtobufRequestUtilities}
@@ -170,7 +169,6 @@ public abstract class ProtobufUtilities {
   }
 
   /**
-   *
    * @param region
    * @return a Protobuf BasicTypes.Region message that represents the {@link Region}
    */

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/java/org/apache/geode/serialization/codec/BooleanCodec.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/serialization/codec/BooleanCodec.java b/geode-protobuf/src/main/java/org/apache/geode/serialization/codec/BooleanCodec.java
index d9f2d07..e3e234d 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/serialization/codec/BooleanCodec.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/serialization/codec/BooleanCodec.java
@@ -14,11 +14,11 @@
  */
 package org.apache.geode.serialization.codec;
 
-import java.nio.ByteBuffer;
-
 import org.apache.geode.serialization.SerializationType;
 import org.apache.geode.serialization.TypeCodec;
 
+import java.nio.ByteBuffer;
+
 public class BooleanCodec implements TypeCodec<Boolean> {
   @Override
   public Boolean decode(byte[] incoming) {

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/proto/basicTypes.proto
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/proto/basicTypes.proto b/geode-protobuf/src/main/proto/basicTypes.proto
index d45d61b..ad254cd 100644
--- a/geode-protobuf/src/main/proto/basicTypes.proto
+++ b/geode-protobuf/src/main/proto/basicTypes.proto
@@ -62,4 +62,14 @@ message Region {
 
 message Server {
     string url = 1;
-}
\ No newline at end of file
+}
+
+message ErrorResponse {
+    int32 errorCode = 1;
+    string message = 2;
+}
+
+message KeyedErrorResponse {
+    EncodedValue key = 1;
+    ErrorResponse error = 2;
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/proto/clientProtocol.proto
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/proto/clientProtocol.proto b/geode-protobuf/src/main/proto/clientProtocol.proto
index 2c3bed0..6b037ca 100644
--- a/geode-protobuf/src/main/proto/clientProtocol.proto
+++ b/geode-protobuf/src/main/proto/clientProtocol.proto
@@ -78,11 +78,6 @@ message Response {
     }
 }
 
-message ErrorResponse {
-    int32 errorCode = 1;
-    string message = 2;
-}
-
 message MetaData {
     int32 numberOfMetadata = 1;
     map<int32, google.protobuf.Any> metaDataEntries = 2;

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/main/proto/region_API.proto
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/proto/region_API.proto b/geode-protobuf/src/main/proto/region_API.proto
index 5d411ba..bf2c15e 100644
--- a/geode-protobuf/src/main/proto/region_API.proto
+++ b/geode-protobuf/src/main/proto/region_API.proto
@@ -42,7 +42,7 @@ message PutAllRequest {
 }
 
 message PutAllResponse {
-    // message presence indicates success.
+    repeated KeyedErrorResponse failedKeys = 1;
 }
 
 message GetAllRequest {

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index 78f7ee0..5005314 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -15,34 +15,6 @@
 
 package org.apache.geode.protocol;
 
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
-import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.contrib.java.lang.system.RestoreSystemProperties;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.DataPolicy;
@@ -69,6 +41,36 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.apache.geode.util.test.TestUtil;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
+import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test that switching on the header byte makes instances of
@@ -185,8 +187,7 @@ public class RoundTripCacheConnectionJUnitTest {
         ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID),
         ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries));
     protobufProtocolSerializer.serialize(putAllMessage, outputStream);
-    validatePutAllResponse(socket, protobufProtocolSerializer,
-        ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE);
+    validatePutAllResponse(socket, protobufProtocolSerializer, new HashSet<>());
 
     Set<BasicTypes.EncodedValue> getEntries = new HashSet<>();
     getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1));
@@ -218,8 +219,9 @@ public class RoundTripCacheConnectionJUnitTest {
 
     ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
     Set<BasicTypes.Entry> putEntries = new HashSet<>();
-    putEntries.add(
-        ProtobufUtilities.createEntry(serializationService, new Float(2.2), TEST_MULTIOP_VALUE1));
+    Float validKey = new Float(2.2);
+    putEntries
+        .add(ProtobufUtilities.createEntry(serializationService, validKey, TEST_MULTIOP_VALUE1));
     putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2,
         TEST_MULTIOP_VALUE2));
     putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3,
@@ -227,12 +229,17 @@ public class RoundTripCacheConnectionJUnitTest {
     ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufMessage(
         ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID),
         ProtobufRequestUtilities.createPutAllRequest(regionName, putEntries));
+
     protobufProtocolSerializer.serialize(putAllMessage, outputStream);
-    validatePutAllResponse(socket, protobufProtocolSerializer,
-        ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE);
+    HashSet<BasicTypes.EncodedValue> expectedFailedKeys = new HashSet<BasicTypes.EncodedValue>();
+    expectedFailedKeys
+        .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2));
+    expectedFailedKeys
+        .add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3));
+    validatePutAllResponse(socket, protobufProtocolSerializer, expectedFailedKeys);
 
     ClientProtocol.Message getMessage = MessageUtil.makeGetRequestMessage(serializationService,
-        new Float(2.2), regionName, ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID));
+        validKey, regionName, ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID));
     protobufProtocolSerializer.serialize(getMessage, outputStream);
     validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1);
 
@@ -252,11 +259,8 @@ public class RoundTripCacheConnectionJUnitTest {
         TEST_KEY, TEST_REGION, ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID));
     protobufProtocolSerializer.serialize(getMessage, outputStream);
 
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_GET_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_GET_CORRELATION_ID);
     assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE,
         response.getResponseAPICase());
     RegionAPI.GetResponse getResponse = response.getGetResponse();
@@ -321,11 +325,8 @@ public class RoundTripCacheConnectionJUnitTest {
 
   private void validatePutResponse(Socket socket,
       ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception {
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_PUT_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_PUT_CORRELATION_ID);
     assertEquals(ClientProtocol.Response.ResponseAPICase.PUTRESPONSE,
         response.getResponseAPICase());
   }
@@ -334,11 +335,9 @@ public class RoundTripCacheConnectionJUnitTest {
       ProtobufProtocolSerializer protobufProtocolSerializer, Object expectedValue)
       throws InvalidProtocolMessageException, IOException, UnsupportedEncodingTypeException,
       CodecNotRegisteredForTypeException, CodecAlreadyRegisteredForTypeException {
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_GET_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_GET_CORRELATION_ID);
+
     assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE,
         response.getResponseAPICase());
     RegionAPI.GetResponse getResponse = response.getGetResponse();
@@ -348,14 +347,22 @@ public class RoundTripCacheConnectionJUnitTest {
         result.getValue().toByteArray()));
   }
 
-  private void validateGetRegionNamesResponse(Socket socket, int correlationId,
-      ProtobufProtocolSerializer protobufProtocolSerializer)
+  private ClientProtocol.Response deserializeResponse(Socket socket,
+      ProtobufProtocolSerializer protobufProtocolSerializer, int expectedCorrelationId)
       throws InvalidProtocolMessageException, IOException {
     ClientProtocol.Message message =
         protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(correlationId, message.getMessageHeader().getCorrelationId());
+    assertEquals(expectedCorrelationId, message.getMessageHeader().getCorrelationId());
     assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    return message.getResponse();
+  }
+
+  private void validateGetRegionNamesResponse(Socket socket, int correlationId,
+      ProtobufProtocolSerializer protobufProtocolSerializer)
+      throws InvalidProtocolMessageException, IOException {
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, correlationId);
+
     assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE,
         response.getResponseAPICase());
     RegionAPI.GetRegionNamesResponse getRegionsResponse = response.getGetRegionNamesResponse();
@@ -365,24 +372,26 @@ public class RoundTripCacheConnectionJUnitTest {
 
   private void validatePutAllResponse(Socket socket,
       ProtobufProtocolSerializer protobufProtocolSerializer,
-      ClientProtocol.Response.ResponseAPICase responseCase) throws Exception {
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_PUT_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
-    assertEquals(responseCase, response.getResponseAPICase());
+      Collection<BasicTypes.EncodedValue> expectedFailedKeys) throws Exception {
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_PUT_CORRELATION_ID);
+
+    assertEquals(ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE,
+        response.getResponseAPICase());
+    assertEquals(expectedFailedKeys.size(), response.getPutAllResponse().getFailedKeysCount());
+
+    Stream<BasicTypes.EncodedValue> failedKeyStream = response.getPutAllResponse()
+        .getFailedKeysList().stream().map(BasicTypes.KeyedErrorResponse::getKey);
+    assertTrue(failedKeyStream.allMatch(expectedFailedKeys::contains));
+
   }
 
   private void validateGetAllResponse(Socket socket,
       ProtobufProtocolSerializer protobufProtocolSerializer)
       throws InvalidProtocolMessageException, IOException, UnsupportedEncodingTypeException,
       CodecNotRegisteredForTypeException, CodecAlreadyRegisteredForTypeException {
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_GET_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_GET_CORRELATION_ID);
     assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE,
         response.getResponseAPICase());
     RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse();
@@ -409,11 +418,8 @@ public class RoundTripCacheConnectionJUnitTest {
 
   private void validateRemoveResponse(Socket socket,
       ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception {
-    ClientProtocol.Message message =
-        protobufProtocolSerializer.deserialize(socket.getInputStream());
-    assertEquals(TEST_REMOVE_CORRELATION_ID, message.getMessageHeader().getCorrelationId());
-    assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase());
-    ClientProtocol.Response response = message.getResponse();
+    ClientProtocol.Response response =
+        deserializeResponse(socket, protobufProtocolSerializer, TEST_REMOVE_CORRELATION_ID);
     assertEquals(ClientProtocol.Response.ResponseAPICase.REMOVERESPONSE,
         response.getResponseAPICase());
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
index d0736de..33d21e7 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
@@ -14,33 +14,8 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.ArgumentMatcher;
-
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.ClientProtocol;
-import org.apache.geode.protocol.protobuf.Failure;
 import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
@@ -50,7 +25,24 @@ import org.apache.geode.serialization.SerializationService;
 import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
 import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
+import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -66,13 +58,6 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
   private final String EXCEPTION_TEXT = "Simulating put failure";
   private Region regionMock;
 
-  private class MapContainingInvalidKeyMatcher implements ArgumentMatcher<Map> {
-    @Override
-    public boolean matches(Map argument) {
-      return argument.containsKey(TEST_INVALID_KEY);
-    }
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
@@ -91,9 +76,8 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
             .thenReturn(TEST_INVALID_VALUE);
 
     regionMock = mock(Region.class);
-
-    doThrow(new ClassCastException(EXCEPTION_TEXT)).when(regionMock)
-        .putAll(argThat(new MapContainingInvalidKeyMatcher()));
+    when(regionMock.put(TEST_INVALID_KEY, TEST_INVALID_VALUE))
+        .thenThrow(new ClassCastException(EXCEPTION_TEXT));
 
     when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionMock);
   }
@@ -115,12 +99,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
 
     Assert.assertTrue(result instanceof Success);
 
-    HashMap<Object, Object> expectedValues = new HashMap<>();
-    expectedValues.put(TEST_KEY1, TEST_VALUE1);
-    expectedValues.put(TEST_KEY2, TEST_VALUE2);
-    expectedValues.put(TEST_KEY3, TEST_VALUE3);
-
-    verify(regionMock).putAll(expectedValues);
+    verify(regionMock).put(TEST_KEY1, TEST_VALUE1);
+    verify(regionMock).put(TEST_KEY2, TEST_VALUE2);
+    verify(regionMock).put(TEST_KEY3, TEST_VALUE3);
   }
 
   @Test
@@ -130,10 +111,16 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
     Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
         generateTestRequest(true, true), cacheStub);
 
-    assertTrue(result instanceof Failure);
-    ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
-    Assert.assertThat(errorMessage.getMessage(), CoreMatchers.containsString(EXCEPTION_TEXT));
-    // can't verify anything about put keys because we make no guarantees.
+    assertTrue(result instanceof Success);
+    verify(regionMock).put(TEST_KEY1, TEST_VALUE1);
+    verify(regionMock).put(TEST_KEY2, TEST_VALUE2);
+    verify(regionMock).put(TEST_KEY3, TEST_VALUE3);
+
+    RegionAPI.PutAllResponse putAllResponse = result.getMessage();
+    assertEquals(1, putAllResponse.getFailedKeysCount());
+    BasicTypes.KeyedErrorResponse error = putAllResponse.getFailedKeys(0);
+    assertEquals(TEST_INVALID_KEY, serializationServiceStub.decode(error.getKey().getEncodingType(),
+        error.getKey().getValue().toByteArray()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/geode/blob/0debd20a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
index f628d4e..8c1652b 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
@@ -14,20 +14,6 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.nio.charset.Charset;
-
-import org.hamcrest.CoreMatchers;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
 import org.apache.geode.cache.Region;
 import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.Failure;
@@ -41,6 +27,19 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
 import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
 import org.apache.geode.test.dunit.Assert;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.charset.Charset;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @Category(UnitTest.class)
 public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {