You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/09/14 19:10:24 UTC
[geode] 04/08: Change stream processing to avoid intermediary
collections and process in-line
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3604
in repository https://gitbox.apache.org/repos/asf/geode.git
commit f75db762f187de0044da3b8ca326ec0fa3bbc8f4
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Mon Sep 11 10:00:20 2017 -0700
Change stream processing to avoid intermediary collections and process in-line
---
.../operations/GetAllRequestOperationHandler.java | 19 ++++++++---------
.../GetAvailableServersOperationHandler.java | 24 +++++++++++-----------
.../operations/PutAllRequestOperationHandler.java | 7 +++----
3 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
index 446dbcb..41ad65f 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java
@@ -53,18 +53,17 @@ public class GetAllRequestOperationHandler
.makeErrorResponse(ProtocolErrorCode.REGION_NOT_FOUND.codeValue, "Region not found"));
}
- Map<Boolean, List<Object>> resultsCollection = request.getKeyList().stream()
- .map((key) -> processOneMessage(serializationService, region, key))
- .collect(Collectors.partitioningBy(x -> x instanceof BasicTypes.Entry));
RegionAPI.GetAllResponse.Builder responseBuilder = RegionAPI.GetAllResponse.newBuilder();
- for (Object entry : resultsCollection.get(true)) {
- responseBuilder.addEntries((BasicTypes.Entry) entry);
- }
-
- for (Object entry : resultsCollection.get(false)) {
- responseBuilder.addFailures((BasicTypes.KeyedError) entry);
- }
+ request.getKeyList().stream()
+ .map((key) -> processOneMessage(serializationService, region, key))
+ .forEach(entry -> {
+ if (entry instanceof BasicTypes.Entry) {
+ responseBuilder.addEntries((BasicTypes.Entry) entry);
+ }else if (entry instanceof BasicTypes.KeyedError) {
+ responseBuilder.addFailures((BasicTypes.KeyedError) entry);
+ }
+ });
return Success.of(responseBuilder.build());
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
index a242492..6d63b95 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAvailableServersOperationHandler.java
@@ -14,19 +14,18 @@
*/
package org.apache.geode.protocol.protobuf.operations;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.stream.Collectors;
+import java.util.Collections;
+import java.util.List;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.protocol.operations.OperationHandler;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
-import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.internal.protocol.protobuf.ServerAPI;
+import org.apache.geode.protocol.operations.OperationHandler;
+import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
import org.apache.geode.serialization.SerializationService;
@@ -40,18 +39,19 @@ public class GetAvailableServersOperationHandler implements
MessageExecutionContext executionContext) throws InvalidExecutionContextException {
InternalLocator internalLocator = (InternalLocator) executionContext.getLocator();
- ArrayList serversFromSnapshot =
+ List serversFromSnapshot =
internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
if (serversFromSnapshot == null) {
- serversFromSnapshot = new ArrayList();
+ serversFromSnapshot = Collections.EMPTY_LIST;
}
- Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot
+ ServerAPI.GetAvailableServersResponse.Builder serverResponseBuilder =
+ ServerAPI.GetAvailableServersResponse.newBuilder();
+
+ serversFromSnapshot
.stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
- .collect(Collectors.toList());
- ServerAPI.GetAvailableServersResponse.Builder builder =
- ServerAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers);
- return Success.of(builder.build());
+ .forEach( serverMessage -> serverResponseBuilder.addServers((BasicTypes.Server) serverMessage));
+ return Success.of(serverResponseBuilder.build());
}
private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) {
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
index 8f2d9ef..b3ca344 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java
@@ -54,10 +54,9 @@ public class PutAllRequestOperationHandler
"Region passed by client did not exist: " + putAllRequest.getRegionName(), logger, null));
}
- RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder()
- .addAllFailedKeys(putAllRequest.getEntryList().stream()
- .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull)
- .collect(Collectors.toList()));
+ RegionAPI.PutAllResponse.Builder builder = RegionAPI.PutAllResponse.newBuilder();
+ putAllRequest.getEntryList().stream()
+ .map((entry) -> singlePut(serializationService, region, entry)).filter(Objects::nonNull).forEach(failedKey -> builder.addFailedKeys(failedKey));
return Success.of(builder.build());
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.