Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/03/02 13:42:16 UTC

[GitHub] [pinot] richardstartin commented on a change in pull request #8272: Fix bug in GrpcBrokerRequestHandler: exceptions are suppressed

richardstartin commented on a change in pull request #8272:
URL: https://github.com/apache/pinot/pull/8272#discussion_r817701130



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
##########
@@ -106,15 +104,18 @@ public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
     return brokerResponseNative;
   }
 
-  private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
+  @VisibleForTesting
+  static void processIterativeServerResponse(StreamingReducer reducer,
       Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
-      ExecutionStatsAggregator aggregator) throws Exception {
+      ExecutionStatsAggregator aggregator)
+      throws Exception {
     int cnt = 0;
-    Future[] futures = new Future[serverResponseMap.size()];
-    CountDownLatch countDownLatch = new CountDownLatch(serverResponseMap.size());
-
-    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry: serverResponseMap.entrySet()) {
-      futures[cnt++] = executorService.submit(() -> {
+    CompletableFuture<Void>[] futures = new CompletableFuture[serverResponseMap.size()];
+    // based on ideas from https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future
+    // and https://stackoverflow.com/questions/23301598/transform-java-future-into-a-completablefuture
+    // Future created via ExecutorService.submit() can be created by CompletableFuture.supplyAsync()
+    for (Map.Entry<ServerRoutingInstance, Iterator<Server.ServerResponse>> entry : serverResponseMap.entrySet()) {
+      futures[cnt++] = CompletableFuture.supplyAsync(() -> {

Review comment:
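       With no executor argument, `CompletableFuture.supplyAsync` runs the task in the global `ForkJoinPool.commonPool()`, which isn't the right behaviour here. Pass the `ExecutorService` as the second argument to `supplyAsync` so the task executes in the allotted thread pool.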
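
       A minimal sketch of the difference, not the PR's actual code: the class name, the helper methods and the "reduce-worker" thread name are all hypothetical and chosen only for illustration. It compares the thread that runs a task submitted with and without an explicit executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; none of these names come from Pinot.
public class SupplyAsyncExecutorSketch {

  // Two-argument overload: the supplier runs on a thread owned by the given executor.
  static String runOn(ExecutorService executorService) {
    return CompletableFuture
        .supplyAsync(() -> Thread.currentThread().getName(), executorService)
        .join();
  }

  // One-argument overload: the supplier runs outside any caller-owned pool
  // (normally on ForkJoinPool.commonPool()).
  static String runOnDefault() {
    return CompletableFuture
        .supplyAsync(() -> Thread.currentThread().getName())
        .join();
  }

  public static void main(String[] args) {
    // Assumed stand-in for the allotted pool; every worker is named "reduce-worker".
    ExecutorService executorService =
        Executors.newFixedThreadPool(2, r -> new Thread(r, "reduce-worker"));
    try {
      System.out.println("default:  " + runOnDefault());
      System.out.println("supplied: " + runOn(executorService));
    } finally {
      executorService.shutdown();
    }
  }
}
```

       Applied to the hunk above, this would probably mean keeping the `ExecutorService` parameter on `processIterativeServerResponse` and passing it as the second argument to `supplyAsync`, so the per-server work stays bounded by the pool sized for it.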




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


