You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/05 19:33:08 UTC

[1/2] incubator-beam git commit: Give root transforms step names

Repository: incubator-beam
Updated Branches:
  refs/heads/master 2f8ba65fa -> 1c2fa03cc


Give root transforms step names

Fix a bug where steps would only be given step names if they were a
non-root node.

Use the ConsumerTrackingPipelineVisitor in the
InProcessEvaluationContext test to handle runner-expanded transforms


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5888df7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5888df7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5888df7b

Branch: refs/heads/master
Commit: 5888df7b3d6183d389ce3141de321be25256fc2f
Parents: 2f8ba65
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 11:10:18 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 5 10:10:43 2016 -0700

----------------------------------------------------------------------
 .../ConsumerTrackingPipelineVisitor.java        |  2 +-
 .../ConsumerTrackingPipelineVisitorTest.java    | 37 +++++++++++++++
 .../InProcessEvaluationContextTest.java         | 50 +++++++-------------
 3 files changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
index ec4f08b..48836e9 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -76,12 +76,12 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
   public void visitTransform(TransformTreeNode node) {
     toFinalize.removeAll(node.getInput().expand());
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
+    stepNames.put(appliedTransform, genStepName());
     if (node.getInput().expand().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
       for (PValue value : node.getInput().expand()) {
         valueToConsumers.get(value).add(appliedTransform);
-        stepNames.put(appliedTransform, genStepName());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
index bea6fe1..905f58f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
@@ -177,6 +177,43 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
   }
 
   @Test
+  public void getStepNamesContainsAllTransforms() {
+    PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+    PCollection<String> transformed =
+        created.apply(
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @Override
+                  public void processElement(DoFn<String, String>.ProcessContext c)
+                      throws Exception {
+                    c.output(Integer.toString(c.element().length()));
+                  }
+                }));
+    PDone finished =
+        transformed.apply(
+            new PTransform<PInput, PDone>() {
+              @Override
+              public PDone apply(PInput input) {
+                return PDone.in(input.getPipeline());
+              }
+            });
+
+    p.traverseTopologically(visitor);
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            created.getProducingTransformInternal(), "s0"));
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            transformed.getProducingTransformInternal(), "s1"));
+    assertThat(
+        visitor.getStepNames(),
+        Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+            finished.getProducingTransformInternal(), "s2"));
+  }
+
+  @Test
   public void traverseMultipleTimesThrows() {
     p.apply(Create.of(1, 2, 3));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index fde2cb4..e1faf1b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -73,7 +73,6 @@ import org.junit.runners.JUnit4;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest {
   private PCollection<KV<String, Integer>> downstream;
   private PCollectionView<Iterable<Integer>> view;
   private PCollection<Long> unbounded;
-
+  private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
+  private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
 
   @Before
   public void setup() {
@@ -103,32 +103,20 @@ public class InProcessEvaluationContextTest {
     downstream = created.apply(WithKeys.<String, Integer>of("foo"));
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(CountingInput.unbounded());
-    Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
-    Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
-    valueToConsumers.put(
-        created,
-        ImmutableList.<AppliedPTransform<?, ?, ?>>of(
-            downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
-    valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
-    valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
-    valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
-
-    Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
-    stepNames.put(created.getProducingTransformInternal(), "s1");
-    stepNames.put(downstream.getProducingTransformInternal(), "s2");
-    stepNames.put(view.getProducingTransformInternal(), "s3");
-    stepNames.put(unbounded.getProducingTransformInternal(), "s4");
-
-    Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
-    context = InProcessEvaluationContext.create(
+
+    ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
+    p.traverseTopologically(cVis);
+    rootTransforms = cVis.getRootTransforms();
+    valueToConsumers = cVis.getValueToConsumers();
+
+    context =
+        InProcessEvaluationContext.create(
             runner.getPipelineOptions(),
             InProcessBundleFactory.create(),
             rootTransforms,
             valueToConsumers,
-            stepNames,
-            views);
+            cVis.getStepNames(),
+            cVis.getViews());
   }
 
   @Test
@@ -495,16 +483,14 @@ public class InProcessEvaluationContextTest {
         null,
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
-    context.handleResult(
-        committedBundle,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
     assertThat(context.isDone(), is(false));
 
-    context.handleResult(
-        committedBundle,
-        ImmutableList.<TimerData>of(),
-        StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+    for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) {
+      context.handleResult(
+          committedBundle,
+          ImmutableList.<TimerData>of(),
+          StepTransformResult.withoutHold(consumers).build());
+    }
     assertThat(context.isDone(), is(true));
   }
 


[2/2] incubator-beam git commit: This closes #113

Posted by bc...@apache.org.
This closes #113


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c2fa03c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c2fa03c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c2fa03c

Branch: refs/heads/master
Commit: 1c2fa03ccdbab556837f65dcf27c5b0117ad585f
Parents: 2f8ba65 5888df7
Author: bchambers <bc...@google.com>
Authored: Tue Apr 5 10:11:11 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 5 10:11:11 2016 -0700

----------------------------------------------------------------------
 .../ConsumerTrackingPipelineVisitor.java        |  2 +-
 .../ConsumerTrackingPipelineVisitorTest.java    | 37 +++++++++++++++
 .../InProcessEvaluationContextTest.java         | 50 +++++++-------------
 3 files changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------