You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/09 18:46:55 UTC

[2/4] incubator-beam git commit: Use AutoValue for ExecutorUpdate

Use AutoValue for ExecutorUpdate

Explicitly provide the collections the Bundle should be consumed by in
the update.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59cca8dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59cca8dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59cca8dd

Branch: refs/heads/master
Commit: 59cca8ddae3d544beea9684719409efe3acbe634
Parents: e7df160
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 6 10:33:32 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 6 10:55:53 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 59 ++++++++++----------
 1 file changed, 30 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59cca8dd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6f26b6b..fd4cc2c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.cache.CacheBuilder;
@@ -191,8 +192,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     return keyedPValues.contains(pvalue);
   }
 
-  private void scheduleConsumers(CommittedBundle<?> bundle) {
-    for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
+  private void scheduleConsumers(ExecutorUpdate update) {
+    CommittedBundle<?> bundle = update.getBundle().get();
+    for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
       scheduleConsumption(consumer, bundle, defaultCompletionCallback);
     }
   }
@@ -225,7 +227,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
       CommittedResult committedResult = getCommittedResult(inputBundle, result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
+        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle,
+            valueToConsumers.get(outputBundle.getPCollection())));
       }
       return committedResult;
     }
@@ -276,38 +279,36 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
    *
    * Used to signal when the executor should be shut down (due to an exception).
    */
-  private static class ExecutorUpdate {
-    private final Optional<? extends CommittedBundle<?>> bundle;
-    private final Optional<? extends Throwable> throwable;
-
-    public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
-      return new ExecutorUpdate(bundle, null);
+  @AutoValue
+  abstract static class ExecutorUpdate {
+    public static ExecutorUpdate fromBundle(
+        CommittedBundle<?> bundle,
+        Collection<AppliedPTransform<?, ?, ?>> consumers) {
+      return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+          Optional.of(bundle),
+          consumers,
+          Optional.<Throwable>absent());
     }
 
     public static ExecutorUpdate fromThrowable(Throwable t) {
-      return new ExecutorUpdate(null, t);
-    }
-
-    private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
-      this.bundle = Optional.fromNullable(producedBundle);
-      this.throwable = Optional.fromNullable(throwable);
+      return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
+          Optional.<CommittedBundle<?>>absent(),
+          Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+          Optional.of(t));
     }
 
-    public Optional<? extends CommittedBundle<?>> getBundle() {
-      return bundle;
-    }
+    /**
+     * Returns the bundle that produced this update.
+     */
+    public abstract Optional<? extends CommittedBundle<?>> getBundle();
 
-    public Optional<? extends Throwable> getException() {
-      return throwable;
-    }
+    /**
+     * Returns the transforms to process the bundle. If nonempty, {@link #getBundle()} will return
+     * a present {@link Optional}.
+     */
+    public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(ExecutorUpdate.class)
-          .add("bundle", bundle)
-          .add("exception", throwable)
-          .toString();
-    }
+    public abstract Optional<? extends Throwable> getException();
   }
 
   /**
@@ -353,7 +354,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
         while (update != null) {
           LOG.debug("Executor Update: {}", update);
           if (update.getBundle().isPresent()) {
-            scheduleConsumers(update.getBundle().get());
+            scheduleConsumers(update);
           } else if (update.getException().isPresent()) {
             visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
           }