You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/10 17:28:22 UTC

[2/3] incubator-beam git commit: Limit the number of work schedules per MonitorRunnable run

Limit the number of work schedules per MonitorRunnable run

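This ensures that work re-added to the queue will not cause the monitor runnable
to run forever before delivering timers. The limit is enforced as a bound on the
time (currently 5 ms) that a single run may spend draining the update queue.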


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3b96bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3b96bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3b96bc3

Branch: refs/heads/master
Commit: d3b96bc33c7e9846f756457d1214a011da1cf84b
Parents: 272493e
Author: Thomas Groh <tg...@google.com>
Authored: Thu Apr 28 10:12:09 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 10 10:15:14 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java  | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d3b96bc3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index de409e3..367c190 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -54,6 +54,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
@@ -344,11 +345,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   }
 
   private class MonitorRunnable implements Runnable {
-    private final String runnableName =
-        String.format(
-            "%s$%s-monitor",
-            evaluationContext.getPipelineOptions().getAppName(),
-            ExecutorServiceParallelExecutor.class.getSimpleName());
+    // arbitrary termination condition to ensure progress in the presence of pushback
+    private final long maxTimeProcessingUpdatesNanos = TimeUnit.MILLISECONDS.toNanos(5L);
+    private final String runnableName = String.format("%s$%s-monitor",
+        evaluationContext.getPipelineOptions().getAppName(),
+        ExecutorServiceParallelExecutor.class.getSimpleName());
 
     @Override
     public void run() {
@@ -356,7 +357,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       Thread.currentThread().setName(runnableName);
       try {
         ExecutorUpdate update = allUpdates.poll();
+        int numUpdates = 0;
         // pull all of the pending work off of the queue
+        long updatesStart = System.nanoTime();
         while (update != null) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
@@ -364,7 +367,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
           } else if (update.getException().isPresent()) {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }
-          update = allUpdates.poll();
+          if (System.nanoTime() - updatesStart > maxTimeProcessingUpdatesNanos) {
+            break;
+          } else {
+            update = allUpdates.poll();
+          }
         }
         boolean timersFired = fireTimers();
         addWorkIfNecessary(timersFired);