You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/11 20:01:36 UTC

[1/2] incubator-beam git commit: More regularly schedule additional roots

Repository: incubator-beam
Updated Branches:
  refs/heads/master a25fd05e0 -> e6cb86a3c


More regularly schedule additional roots

This ensures that even when elements are pushed back into the Pipeline
Runner, roots are scheduled if necessary.

As elements may be rescheduled indefinitely, this is required to ensure
that unbounded roots are scheduled during pipeline execution when
existing elements are blocked on side inputs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f5e3996
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f5e3996
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f5e3996

Branch: refs/heads/master
Commit: 1f5e399630e3ba71e2c025e7ab3f32c6c3ba9518
Parents: acb0406
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 10 17:11:23 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 10 17:52:26 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 46 +-------------------
 .../direct/TransformExecutorServices.java       | 25 +++--------
 .../direct/TransformExecutorServicesTest.java   | 40 ++---------------
 .../runners/direct/TransformExecutorTest.java   | 10 +----
 4 files changed, 12 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f5e3996/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 367c190..70a8035 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -50,9 +50,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -77,7 +75,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   private final InProcessEvaluationContext evaluationContext;
 
   private final LoadingCache<StepAndKey, TransformExecutorService> executorServices;
-  private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors;
 
   private final Queue<ExecutorUpdate> allUpdates;
   private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
@@ -114,7 +111,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     this.transformEnforcements = transformEnforcements;
     this.evaluationContext = context;
 
-    scheduledExecutors = new ConcurrentHashMap<>();
     // Weak Values allows TransformExecutorServices that are no longer in use to be reclaimed.
     // Executing TransformExecutorServices have a strong reference to their TransformExecutorService
     // which stops the TransformExecutorServices from being prematurely garbage collected
@@ -124,8 +120,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     this.allUpdates = new ConcurrentLinkedQueue<>();
     this.visibleUpdates = new ArrayBlockingQueue<>(20);
 
-    parallelExecutorService =
-        TransformExecutorServices.parallel(executorService, scheduledExecutors);
+    parallelExecutorService = TransformExecutorServices.parallel(executorService);
     defaultCompletionCallback = new DefaultCompletionCallback();
   }
 
@@ -134,7 +129,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     return new CacheLoader<StepAndKey, TransformExecutorService>() {
       @Override
       public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
-        return TransformExecutorServices.serial(executorService, scheduledExecutors);
+        return TransformExecutorServices.serial(executorService);
       }
     };
   }
@@ -453,12 +448,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
       if (firedTimers) {
         return;
       }
-      for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
-        if (!isExecutorBlocked(executor)) {
-          // We have at least one executor that can proceed without adding additional work
-          return;
-        }
-      }
       // All current TransformExecutors are blocked; add more work from the roots.
       for (AppliedPTransform<?, ?, ?> root : rootNodes) {
         if (!evaluationContext.isDone(root)) {
@@ -466,36 +455,5 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         }
       }
     }
-
-    /**
-     * Return true if the provided executor might make more progress if no action is taken.
-     *
-     * <p>May return false even if all executor threads are currently blocked or cleaning up, as
-     * these can cause more work to be scheduled. If this does not occur, after these calls
-     * terminate, future calls will return true if all executors are waiting.
-     */
-    private boolean isExecutorBlocked(TransformExecutor<?> executor) {
-      Thread thread = executor.getThread();
-      if (thread == null) {
-        return false;
-      }
-      switch (thread.getState()) {
-        case TERMINATED:
-          throw new IllegalStateException(String.format(
-              "Unexpectedly encountered a Terminated TransformExecutor %s", executor));
-        case WAITING:
-        case TIMED_WAITING:
-          // The thread is waiting for some external input. Adding more work may cause the thread
-          // to stop waiting (e.g. the thread is waiting on an unbounded side input)
-          return true;
-        case BLOCKED:
-          // The executor is blocked on acquisition of a java monitor. This usually means it is
-          // making a call to the EvaluationContext, but not a model-blocking call - and will
-          // eventually complete, at which point we may reevaluate.
-        default:
-          // NEW and RUNNABLE threads can make progress
-          return false;
-      }
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f5e3996/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 087b7c2..ea15f03 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import com.google.common.base.MoreObjects;
 
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -37,18 +36,16 @@ final class TransformExecutorServices {
    * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
    * parallel.
    */
-  public static TransformExecutorService parallel(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new ParallelEvaluationState(executor, scheduled);
+  public static TransformExecutorService parallel(ExecutorService executor) {
+    return new ParallelEvaluationState(executor);
   }
 
   /**
    * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
    * serial.
    */
-  public static TransformExecutorService serial(
-      ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-    return new SerialEvaluationState(executor, scheduled);
+  public static TransformExecutorService serial(ExecutorService executor) {
+    return new SerialEvaluationState(executor);
   }
 
   /**
@@ -60,23 +57,18 @@ final class TransformExecutorServices {
    */
   private static class ParallelEvaluationState implements TransformExecutorService {
     private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
 
-    private ParallelEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
+    private ParallelEvaluationState(ExecutorService executor) {
       this.executor = executor;
-      this.scheduled = scheduled;
     }
 
     @Override
     public void schedule(TransformExecutor<?> work) {
       executor.submit(work);
-      scheduled.put(work, true);
     }
 
     @Override
     public void complete(TransformExecutor<?> completed) {
-      scheduled.remove(completed);
     }
   }
 
@@ -90,14 +82,11 @@ final class TransformExecutorServices {
    */
   private static class SerialEvaluationState implements TransformExecutorService {
     private final ExecutorService executor;
-    private final Map<TransformExecutor<?>, Boolean> scheduled;
 
     private AtomicReference<TransformExecutor<?>> currentlyEvaluating;
     private final Queue<TransformExecutor<?>> workQueue;
 
-    private SerialEvaluationState(
-        ExecutorService executor, Map<TransformExecutor<?>, Boolean> scheduled) {
-      this.scheduled = scheduled;
+    private SerialEvaluationState(ExecutorService executor) {
       this.executor = executor;
       this.currentlyEvaluating = new AtomicReference<>();
       this.workQueue = new ConcurrentLinkedQueue<>();
@@ -122,7 +111,6 @@ final class TransformExecutorServices {
                 + " but could not complete due to unexpected currently executing "
                 + currentlyEvaluating.get());
       }
-      scheduled.remove(completed);
       updateCurrentlyEvaluating();
     }
 
@@ -133,7 +121,6 @@ final class TransformExecutorServices {
           TransformExecutor<?> newWork = workQueue.poll();
           if (newWork != null) {
             if (currentlyEvaluating.compareAndSet(null, newWork)) {
-              scheduled.put(newWork, true);
               executor.submit(newWork);
             } else {
               workQueue.offer(newWork);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f5e3996/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
index 0e92da1..04aa96f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -17,17 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
-import static org.hamcrest.Matchers.any;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 import com.google.common.util.concurrent.MoreExecutors;
 
-import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -35,8 +30,6 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -47,12 +40,10 @@ public class TransformExecutorServicesTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private ExecutorService executorService;
-  private Map<TransformExecutor<?>, Boolean> scheduled;
 
   @Before
   public void setup() {
     executorService = MoreExecutors.newDirectExecutorService();
-    scheduled = new ConcurrentHashMap<>();
   }
 
   @Test
@@ -63,27 +54,15 @@ public class TransformExecutorServicesTest {
     TransformExecutor<Object> second = mock(TransformExecutor.class);
 
     TransformExecutorService parallel =
-        TransformExecutorServices.parallel(executorService, scheduled);
+        TransformExecutorServices.parallel(executorService);
     parallel.schedule(first);
     parallel.schedule(second);
 
     verify(first).run();
     verify(second).run();
-    assertThat(
-        scheduled,
-        Matchers.allOf(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true),
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true)));
 
     parallel.complete(first);
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class))));
     parallel.complete(second);
-    assertThat(scheduled.isEmpty(), is(true));
   }
 
   @Test
@@ -93,28 +72,15 @@ public class TransformExecutorServicesTest {
     @SuppressWarnings("unchecked")
     TransformExecutor<Object> second = mock(TransformExecutor.class);
 
-    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService);
     serial.schedule(first);
     verify(first).run();
 
     serial.schedule(second);
     verify(second, never()).run();
 
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(second), any(Boolean.class))));
-
     serial.complete(first);
     verify(second).run();
-    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true));
-    assertThat(
-        scheduled,
-        not(
-            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
-                Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class))));
 
     serial.complete(second);
   }
@@ -126,7 +92,7 @@ public class TransformExecutorServicesTest {
     @SuppressWarnings("unchecked")
     TransformExecutor<Object> second = mock(TransformExecutor.class);
 
-    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService);
     serial.schedule(first);
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("unexpected currently executing");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f5e3996/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index cc7857d..d230950 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -55,9 +55,7 @@ import org.mockito.MockitoAnnotations;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -79,7 +77,6 @@ public class TransformExecutorTest {
   private BundleFactory bundleFactory;
   @Mock private InProcessEvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
-  private Map<TransformExecutor<?>, Boolean> scheduled;
 
   @Before
   public void setup() {
@@ -87,9 +84,8 @@ public class TransformExecutorTest {
 
     bundleFactory = InProcessBundleFactory.create();
 
-    scheduled = new HashMap<>();
     transformEvaluationState =
-        TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
+        TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService());
 
     evaluatorCompleted = new CountDownLatch(1);
     completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
@@ -135,7 +131,6 @@ public class TransformExecutorTest {
     assertThat(finishCalled.get(), is(true));
     assertThat(completionCallback.handledResult, equalTo(result));
     assertThat(completionCallback.handledThrowable, is(nullValue()));
-    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
   }
 
   @Test
@@ -184,7 +179,6 @@ public class TransformExecutorTest {
     assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
     assertThat(completionCallback.handledResult, equalTo(result));
     assertThat(completionCallback.handledThrowable, is(nullValue()));
-    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
   }
 
   @Test
@@ -228,7 +222,6 @@ public class TransformExecutorTest {
 
     assertThat(completionCallback.handledResult, is(nullValue()));
     assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
-    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
   }
 
   @Test
@@ -267,7 +260,6 @@ public class TransformExecutorTest {
 
     assertThat(completionCallback.handledResult, is(nullValue()));
     assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
-    assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
   }
 
   @Test


[2/2] incubator-beam git commit: This closes #322

Posted by ke...@apache.org.
This closes #322


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6cb86a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6cb86a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6cb86a3

Branch: refs/heads/master
Commit: e6cb86a3c135c2dc36bc2d60866d05877bc480f3
Parents: a25fd05 1f5e399
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 11 13:01:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 11 13:01:12 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 46 +-------------------
 .../direct/TransformExecutorServices.java       | 25 +++--------
 .../direct/TransformExecutorServicesTest.java   | 40 ++---------------
 .../runners/direct/TransformExecutorTest.java   | 10 +----
 4 files changed, 12 insertions(+), 109 deletions(-)
----------------------------------------------------------------------