You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "walterddr (via GitHub)" <gi...@apache.org> on 2023/07/25 21:06:39 UTC

[GitHub] [pinot] walterddr commented on a diff in pull request #11173: [multistage] fix error propagate physical planner

walterddr commented on code in PR #11173:
URL: https://github.com/apache/pinot/pull/11173#discussion_r1274120511


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -97,23 +100,45 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
     Map<String, String> requestMetadataMap;
     requestMetadataMap = request.getMetadataMap();
     long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+    long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS));
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
     try {
       distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request);
     } catch (Exception e) {
       LOGGER.error("Caught exception while deserializing the request: {}", requestId, e);
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
       return;
     }
-    // TODO: allow thrown exception to return back to broker in asynchronous manner.
+    BlockingQueue<String> stageSubmissionCallbacks = new LinkedBlockingQueue<>();
     distributedStagePlans.forEach(distributedStagePlan -> _querySubmissionExecutorService.submit(() -> {
           try {
             _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+            stageSubmissionCallbacks.offer(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK);
           } catch (Throwable t) {
             LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId,
                 distributedStagePlan.getStageId(), t);
+            stageSubmissionCallbacks.offer(t.getMessage());
           }
         })
     );
+
+    int successfulSubmissionCount = 0;
+    while (System.currentTimeMillis() < deadlineMs && successfulSubmissionCount < distributedStagePlans.size()) {

Review Comment:
   @xiangfu0 @Jackie-Jiang please share your feedback on this one. This technically shouldn't matter much. But would like the see if there's any better handle



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org