You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/04 00:46:02 UTC

[2/4] incubator-beam git commit: Add ProducedOutput method to TransformResult

Add ProducedOutput method to TransformResult

This lets a PTransform that produced no output bundles still signal
that pending work should be evaluated. PCollectionViews modify the
state of the evaluator and can allow formerly blocked PTransforms to
make progress.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f7cc7e17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f7cc7e17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f7cc7e17

Branch: refs/heads/master
Commit: f7cc7e178db211509aecb65ba203930fd159629a
Parents: a8eb274
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jul 26 09:53:22 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Aug 3 17:45:12 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/CommittedResult.java    | 16 +++-
 .../beam/runners/direct/EvaluationContext.java  |  3 +-
 .../runners/direct/StepTransformResult.java     | 28 ++++++-
 .../beam/runners/direct/TransformResult.java    |  9 +++
 .../runners/direct/ViewEvaluatorFactory.java    |  4 +-
 .../runners/direct/CommittedResultTest.java     | 24 ++++--
 .../runners/direct/StepTransformResultTest.java | 85 ++++++++++++++++++++
 .../runners/direct/TransformExecutorTest.java   |  5 +-
 .../runners/direct/WatermarkManagerTest.java    |  7 +-
 9 files changed, 163 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index e86f07d..e9a40a8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 
 import com.google.auto.value.AutoValue;
 
@@ -49,12 +50,23 @@ abstract class CommittedResult {
    */
   public abstract Iterable<? extends CommittedBundle<?>> getOutputs();
 
+  /**
+   * Returns whether the transform that produced this result produced outputs.
+   *
+   * <p>Transforms that produce output by modifying the state of the runner (e.g.
+   * {@link CreatePCollectionView}) should explicitly set this to true. If {@link #getOutputs()}
+   * returns a nonempty iterable, this should also return true.
+   */
+  public abstract boolean producedOutputs();
+
   public static CommittedResult create(
       TransformResult original,
       CommittedBundle<?> unprocessedElements,
-      Iterable<? extends CommittedBundle<?>> outputs) {
+      Iterable<? extends CommittedBundle<?>> outputs,
+      boolean producedOutputs) {
     return new AutoValue_CommittedResult(original.getTransform(),
         unprocessedElements,
-        outputs);
+        outputs,
+        producedOutputs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index ea713fa..610a62d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -159,7 +159,8 @@ class EvaluationContext {
         completedBundle == null
             ? null
             : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
-        committedBundles);
+        committedBundles,
+        result.producedOutput());
     watermarkManager.updateWatermarks(
         completedBundle,
         result.getTimerUpdate().withCompletedTimers(completedTimers),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 176bb14..3d6841d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -23,10 +23,16 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
 import org.joda.time.Instant;
+
 import java.util.Collection;
+
 import javax.annotation.Nullable;
 
 /**
@@ -57,6 +63,19 @@ public abstract class StepTransformResult implements TransformResult {
   @Override
   public abstract TimerUpdate getTimerUpdate();
 
+  @Override
+  public boolean producedOutput() {
+    return !Iterables.isEmpty(getOutputBundles()) || producedAdditionalOutput();
+  }
+
+  /**
+   * Returns {@code true} if the step produced output that is not reflected in the output bundles.
+   *
+   * <p>If a step modifies the contents of a {@link PCollectionView}, this should return {@code
+   * true}.
+   */
+  abstract boolean producedAdditionalOutput();
+
   public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
     return new Builder(transform, watermarkHold);
   }
@@ -75,6 +94,7 @@ public abstract class StepTransformResult implements TransformResult {
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
+    private boolean producedAdditionalOutput;
     private final Instant watermarkHold;
 
     private Builder(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) {
@@ -93,7 +113,8 @@ public abstract class StepTransformResult implements TransformResult {
           aggregatorChanges,
           watermarkHold,
           state,
-          timerUpdate);
+          timerUpdate,
+          producedAdditionalOutput);
     }
 
     public Builder withAggregatorChanges(AggregatorContainer.Mutator aggregatorChanges) {
@@ -127,5 +148,10 @@ public abstract class StepTransformResult implements TransformResult {
       bundlesBuilder.addAll(outputBundles);
       return this;
     }
+
+    public Builder withAdditionalOutput(boolean producedAdditionalOutput) {
+      this.producedAdditionalOutput = producedAdditionalOutput;
+      return this;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index 65f9629..f678928 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -24,6 +24,8 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+
 import org.joda.time.Instant;
 import javax.annotation.Nullable;
 
@@ -78,4 +80,11 @@ public interface TransformResult {
    * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
    */
   TimerUpdate getTimerUpdate();
+
+  /**
+   * Returns whether output was produced by the evaluation of this transform. True if
+   * {@link #getOutputBundles()} is nonempty, or if pipeline-visible state has changed (for example,
+   * the contents of a {@link PCollectionView} were updated).
+   */
+  boolean producedOutput();
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 47eaae7..0a687ba 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -77,7 +77,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
       @Override
       public TransformResult finishBundle() {
         writer.add(elements);
-        return StepTransformResult.withoutHold(application).build();
+        return StepTransformResult.withoutHold(application)
+            .withAdditionalOutput(!elements.isEmpty())
+            .build();
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index c0c06e5..2ebd804 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -62,9 +62,11 @@ public class CommittedResultTest implements Serializable {
   @Test
   public void getTransformExtractsFromResult() {
     CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createRootBundle(created).commit(Instant.now()),
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            false);
 
     assertThat(result.getTransform(), Matchers.<AppliedPTransform<?, ?, ?>>equalTo(transform));
   }
@@ -76,9 +78,11 @@ public class CommittedResultTest implements Serializable {
             .add(WindowedValue.valueInGlobalWindow(2))
             .commit(Instant.now());
     CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(transform).build(),
             bundle,
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            false);
 
     assertThat(result.getUnprocessedInputs(),
         Matchers.<DirectRunner.CommittedBundle<?>>equalTo(bundle));
@@ -87,9 +91,11 @@ public class CommittedResultTest implements Serializable {
   @Test
   public void getUncommittedElementsNull() {
     CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(transform).build(),
             null,
-            Collections.<DirectRunner.CommittedBundle<?>>emptyList());
+            Collections.<DirectRunner.CommittedBundle<?>>emptyList(),
+            false);
 
     assertThat(result.getUnprocessedInputs(), nullValue());
   }
@@ -104,9 +110,11 @@ public class CommittedResultTest implements Serializable {
                 WindowingStrategy.globalDefault(),
                 PCollection.IsBounded.UNBOUNDED)).commit(Instant.now()));
     CommittedResult result =
-        CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+        CommittedResult.create(
+            StepTransformResult.withoutHold(transform).build(),
             bundleFactory.createRootBundle(created).commit(Instant.now()),
-            outputs);
+            outputs,
+            true);
 
     assertThat(result.getOutputs(), Matchers.containsInAnyOrder(outputs.toArray()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
new file mode 100644
index 0000000..59e1c71
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link StepTransformResult}.
+ */
+@RunWith(JUnit4.class)
+public class StepTransformResultTest {
+  private AppliedPTransform<?, ?, ?> transform;
+  private BundleFactory bundleFactory;
+  private PCollection<Integer> pc;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    pc = p.apply(Create.of(1, 2, 3));
+    transform = pc.getProducingTransformInternal();
+
+    bundleFactory = ImmutableListBundleFactory.create();
+  }
+
+  @Test
+  public void producedBundlesProducedOutputs() {
+    TransformResult result = StepTransformResult.withoutHold(transform)
+        .addOutput(bundleFactory.createRootBundle(pc))
+        .build();
+
+    assertThat(result.producedOutput(), is(true));
+  }
+
+  @Test
+  public void withAdditionalOutputProducedOutputs() {
+    TransformResult result =
+        StepTransformResult.withoutHold(transform).withAdditionalOutput(true).build();
+
+    assertThat(result.producedOutput(), is(true));
+  }
+
+  @Test
+  public void producedBundlesAndAdditionalOutputProducedOutputs() {
+    TransformResult result = StepTransformResult.withoutHold(transform)
+        .addOutput(bundleFactory.createRootBundle(pc))
+        .withAdditionalOutput(true)
+        .build();
+
+    assertThat(result.producedOutput(), is(true));
+  }
+
+  @Test
+  public void noBundlesNoAdditionalOutputProducedOutputsFalse() {
+    TransformResult result = StepTransformResult.withoutHold(transform).build();
+
+    assertThat(result.producedOutput(), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 95206f3..7bd7ff2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -511,9 +511,8 @@ public class TransformExecutorTest {
 
       CommittedBundle<?> unprocessedBundle =
           inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
-      return CommittedResult.create(result,
-          unprocessedBundle,
-          Collections.<CommittedBundle<?>>emptyList());
+      return CommittedResult.create(
+          result, unprocessedBundle, Collections.<CommittedBundle<?>>emptyList(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f7cc7e17/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 7173129..7edfc5d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -1421,8 +1422,10 @@ public class WatermarkManagerTest implements Serializable {
       AppliedPTransform<?, ?, ?> transform,
       @Nullable CommittedBundle<?> unprocessedBundle,
       Iterable<? extends CommittedBundle<?>> bundles) {
-    return CommittedResult.create(StepTransformResult.withoutHold(transform).build(),
+    return CommittedResult.create(
+        StepTransformResult.withoutHold(transform).build(),
         unprocessedBundle,
-        bundles);
+        bundles,
+        !Iterables.isEmpty(bundles));
   }
 }