You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/08 23:26:52 UTC

[2/3] incubator-beam git commit: Schedule all pending work before firing timers

Schedule all pending work before firing timers

Pull all available work off of the ExecutorUpdate queue during each
execution of the MonitorRunnable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad58e26d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad58e26d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad58e26d

Branch: refs/heads/master
Commit: ad58e26db132268c676ca9289b73e1ddab72627a
Parents: 529bcdf
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 10:27:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 8 14:20:20 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/ExecutorServiceParallelExecutor.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad58e26d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 9af6f97..4d45e8f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -325,13 +325,15 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       Thread.currentThread().setName(runnableName);
       try {
         ExecutorUpdate update = allUpdates.poll();
-        if (update != null) {
+        // pull all of the pending work off of the queue
+        while (update != null) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
             scheduleConsumers(update.getBundle().get());
           } else if (update.getException().isPresent()) {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }
+          update = allUpdates.poll();
         }
         boolean timersFired = fireTimers();
         addWorkIfNecessary(timersFired);