You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/09/14 19:10:24 UTC

[geode] 04/08: Change stream processing to avoid intermediary collections and process in-line

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3604
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f75db762f187de0044da3b8ca326ec0fa3bbc8f4
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Mon Sep 11 10:00:20 2017 -0700

    Change stream processing to avoid intermediary collections and process in-line
---
 .../operations/GetAllRequestOperationHandler.java  | 19 ++++++++---------
 .../GetAvailableServersOperationHandler.java       | 24 +++++++++++-----------
 .../operations/PutAllRequestOperationHandler.java  |  7 +++----
 3 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
index 446dbcb..41ad65f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
@@ -53,18 +53,17 @@ public class GetAllRequestOperationHandler
           .makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));
     }
 
-    Map<Boolean, List<Object>> resultsCollection = request.getKeyList().stream()
-        .map((key) -> processOneMessage(serializationService, region, key))
-        .collect(Collectors.partitioningBy(x -> x instanceof BasicTypes.Entry));
     RegionAPI.GetAllResponse.Builder responseBuilder = RegionAPI.GetAllResponse.newBuilder();
 
-    for (Object entry : resultsCollection.get(true)) {
-      responseBuilder.addEntries((BasicTypes.Entry) entry);
-    }
-
-    for (Object entry : resultsCollection.get(false)) {
-      responseBuilder.addFailures((BasicTypes.KeyedError) entry);
-    }
+    request.getKeyList().stream()
+        .map((key) -> processOneMessage(serializationService, region, key))
+        .forEach(entry -> {
+          if (entry instanceof BasicTypes.Entry) {
+            responseBuilder.addEntries((BasicTypes.Entry) entry);
+          }else if (entry instanceof BasicTypes.KeyedError) {
+            responseBuilder.addFailures((BasicTypes.KeyedError) entry);
+          }
+        });
 
     return Success.of(responseBuilder.build());
   }
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
index a242492..6d63b95 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
@@ -14,19 +14,18 @@
  */
 package org.apache.geode.protocol.protobuf.operations;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.protocol.operations.OperationHandler;
 import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.internal.protocol.protobuf.ServerAPI;
+import org.apache.geode.protocol.operations.OperationHandler;
+import org.apache.geode.protocol.protobuf.Result;
 import org.apache.geode.protocol.protobuf.Success;
 import org.apache.geode.serialization.SerializationService;
 
@@ -40,18 +39,19 @@ public class GetAvailableServersOperationHandler implements
       MessageExecutionContext executionContext) throws InvalidExecutionContextException {
 
     InternalLocator internalLocator = (InternalLocator) executionContext.getLocator();
-    ArrayList serversFromSnapshot =
+    List serversFromSnapshot =
         internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
     if (serversFromSnapshot == null) {
-      serversFromSnapshot = new ArrayList();
+      serversFromSnapshot = Collections.EMPTY_LIST;
     }
 
-    Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot
+    ServerAPI.GetAvailableServersResponse.Builder serverResponseBuilder =
+        ServerAPI.GetAvailableServersResponse.newBuilder();
+
+    serversFromSnapshot
         .stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
-        .collect(Collectors.toList());
-    ServerAPI.GetAvailableServersResponse.Builder builder =
-        ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers);
-    return Success.of(builder.build());
+        .forEach( serverMessage -> serverResponseBuilder.addServers((BasicTypes.Server) serverMessage));
+    return Success.of(serverResponseBuilder.build());
   }
 
   private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
index 8f2d9ef..b3ca344 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
@@ -54,10 +54,9 @@ public class PutAllRequestOperationHandler
           "Region passed by client did not exist: " + putAllRequest.getRegionName(), logger, null));
     }
 
-    RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder()
-        .addAllFailedKeys(putAllRequest.getEntryList().stream()
-            .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull)
-            .collect(Collectors.toList()));
+    RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder();
+    putAllRequest.getEntryList().stream()
+            .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull).forEach(failedKey -> builder.addFailedKeys(failedKey));
     return Success.of(builder.build());
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.