You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/03/02 01:14:26 UTC

[GitHub] [pinot] dongxiaoman commented on a change in pull request #8272: Fix bug in GrpcBrokerRequestHandler: exceptions are suppressed

dongxiaoman commented on a change in pull request #8272:
URL: https://github.com/apache/pinot/pull/8272#discussion_r817269798



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
##########
@@ -106,15 +108,17 @@ public BrokerResponseNative reduceOnStreamResponse(BrokerRequest brokerRequest,
     return brokerResponseNative;
   }
 
-  private static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
+  @VisibleForTesting
+  static void processIterativeServerResponse(StreamingReducer reducer, ExecutorService executorService,
       Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> serverResponseMap, long reduceTimeOutMs,
-      ExecutionStatsAggregator aggregator) throws Exception {
+      ExecutionStatsAggregator aggregator) throws ExecutionException {
     int cnt = 0;
-    Future[] futures = new Future[serverResponseMap.size()];
-    CountDownLatch countDownLatch = new CountDownLatch(serverResponseMap.size());
-
+    Future<Void>[] futures = new Future[serverResponseMap.size()];
+    // based on ideas from on https://stackoverflow.com/questions/19348248/waiting-on-a-list-of-future
+    // mostly because we need to handle and transfer the exception in threads into caller thread.
+    ExecutorCompletionService<Void> countDownHelper = new ExecutorCompletionService<>(executorService);

Review comment:
       Another way to fix is:
   in the end of this method, call
   ```
   for (Future future : futures) {
       future.get(reduceTimeoutMs, TimeUnit.MILLISECONS);
   }
   ```
   Somehow this has the potential to multiply the time wait by `futures.length()` times, and it breaks the purpose of parallel execution.
   
   I remember in Scala I have written or found some utility class to help with this kind of `WaitForMultipleObjects` thing, not sure where to find it now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org