You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/07/29 18:54:59 UTC

[1/2] incubator-beam git commit: Apply ExecutorUpdates in two Phases

Repository: incubator-beam
Updated Branches:
  refs/heads/master 36720a62d -> b1ecf2461


Apply ExecutorUpdates in two Phases

This removes the need for an explicit break by ensuring that work added
by the monitor will not complete and add more work for the monitor to
complete.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b932d93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b932d93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b932d93

Branch: refs/heads/master
Commit: 4b932d93352b04f60392b6083ce5573c86535a65
Parents: 3c6e147
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jul 22 13:47:19 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jul 27 16:52:58 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b932d93/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 78f3fe4..43195e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -43,6 +43,7 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -358,11 +359,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       String oldName = Thread.currentThread().getName();
       Thread.currentThread().setName(runnableName);
       try {
-        ExecutorUpdate update = allUpdates.poll();
-        int numUpdates = 0;
-        // pull all of the pending work off of the queue
-        long updatesStart = System.nanoTime();
-        while (update != null) {
+        Collection<ExecutorUpdate> updates = new ArrayList<>();
+        // Pull all available updates off of the queue before adding additional work. This ensures
+        // both loops terminate.
+        ExecutorUpdate pendingUpdate = allUpdates.poll();
+        while (pendingUpdate != null) {
+          updates.add(pendingUpdate);
+          pendingUpdate = allUpdates.poll();
+        }
+        for (ExecutorUpdate update : updates) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
             scheduleConsumers(update);
@@ -370,11 +375,6 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
             exceptionThrown = true;
           }
-          if (System.nanoTime() - updatesStart > maxTimeProcessingUpdatesNanos) {
-            break;
-          } else {
-            update = allUpdates.poll();
-          }
         }
         boolean timersFired = fireTimers();
         addWorkIfNecessary(timersFired);


[2/2] incubator-beam git commit: This closes #745

Posted by ke...@apache.org.
This closes #745


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1ecf246
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1ecf246
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1ecf246

Branch: refs/heads/master
Commit: b1ecf2461bc805f12b3a3303c0f5579c239374c7
Parents: 36720a6 4b932d9
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 29 11:54:43 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jul 29 11:54:43 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------