Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/08 20:52:23 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #11275: [BEAM-9648]: DirectRunner should return null on timeout

URL: https://github.com/apache/beam/pull/11275#discussion_r405807176
 
 

 ##########
 File path: runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 ##########
 @@ -260,6 +260,11 @@ public State waitUntilFinish(Duration duration) throws Exception {
         }
       }
     }
+
+    if (Instant.now().isAfter(completionTime)) {
+      return null;
+    }
 
 Review comment:
   This introduces a race condition: the while loop above can exit for a reason other than a timeout, but if the deadline passes between the loop exit and this check, the result is reported as a timeout (null) anyway.
   
   We could clean up the loop above so that it exits only on timeout, with every non-timeout return happening inside the loop, with something like:
   ```
       while (Instant.now().isBefore(completionTime)) {
         // Get an update; don't block forever if another thread has handled it. The call to poll will
         // wait the entire timeout; this call primarily exists to relinquish any core.
         VisibleExecutorUpdate update = visibleUpdates.tryNext(Duration.millis(25L));
         if (pipelineState.get().isTerminal() || (update != null && isTerminalStateUpdate(update))) {
           // there are no updates to process and no updates will ever be published because the
           // executor is shutdown OR there has been an update and the update is terminal
           return pipelineState.get();
         } else if (update != null && update.thrown.isPresent()) {
           Throwable thrown = update.thrown.get();
           if (thrown instanceof Exception) {
             throw (Exception) thrown;
           } else if (thrown instanceof Error) {
             throw (Error) thrown;
           } else {
             throw new Exception("Unknown Type of Throwable", thrown);
           }
         }
       }
       return null;
   ```
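
The shape of that suggestion can be sketched in isolation. This is a minimal, hypothetical example, not the DirectRunner code: it assumes `java.time` in place of the `Duration`/`Instant` types used in the file, and it uses a `BlockingQueue` and a made-up `State` enum as stand-ins for `visibleUpdates` and the pipeline state. The point it illustrates is that terminal results are returned from inside the loop, so `null` can only mean the deadline passed.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class WaitUntilFinishSketch {
  /** Hypothetical stand-in for the pipeline state; not the Beam type. */
  public enum State { RUNNING, DONE }

  /**
   * Polls for updates until the deadline. A terminal update is returned from
   * inside the loop; falling out of the loop therefore always means timeout.
   */
  public static State waitUntilFinish(BlockingQueue<State> updates, Duration timeout)
      throws InterruptedException {
    Instant completionTime = Instant.now().plus(timeout);
    while (Instant.now().isBefore(completionTime)) {
      // Short poll so the deadline is re-checked regularly.
      State update = updates.poll(25, TimeUnit.MILLISECONDS);
      if (update == State.DONE) {
        return update; // non-timeout exit happens here, never after the loop
      }
    }
    return null; // reached only when the deadline has passed
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<State> updates = new LinkedBlockingQueue<>();
    updates.add(State.RUNNING);
    updates.add(State.DONE);
    // A terminal update arrives before the deadline.
    System.out.println(waitUntilFinish(updates, Duration.ofSeconds(1))); // prints "DONE"
    // No update ever arrives, so the call times out.
    System.out.println(waitUntilFinish(new LinkedBlockingQueue<>(), Duration.ofMillis(100))); // prints "null"
  }
}
```

Because there is no check of the clock after the loop, a terminal update that arrives just before the deadline cannot be reclassified as a timeout.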
