You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/04/05 19:33:08 UTC
[1/2] incubator-beam git commit: Give root transforms step names
Repository: incubator-beam
Updated Branches:
refs/heads/master 2f8ba65fa -> 1c2fa03cc
Give root transforms step names
Fix a bug where transforms were only given step names if they were
non-root nodes.
Use the ConsumerTrackingPipelineVisitor in
InProcessEvaluationContextTest to handle runner-expanded transforms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5888df7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5888df7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5888df7b
Branch: refs/heads/master
Commit: 5888df7b3d6183d389ce3141de321be25256fc2f
Parents: 2f8ba65
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 11:10:18 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 5 10:10:43 2016 -0700
----------------------------------------------------------------------
.../ConsumerTrackingPipelineVisitor.java | 2 +-
.../ConsumerTrackingPipelineVisitorTest.java | 37 +++++++++++++++
.../InProcessEvaluationContextTest.java | 50 +++++++-------------
3 files changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
index ec4f08b..48836e9 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -76,12 +76,12 @@ public class ConsumerTrackingPipelineVisitor implements PipelineVisitor {
public void visitTransform(TransformTreeNode node) {
toFinalize.removeAll(node.getInput().expand());
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
+ stepNames.put(appliedTransform, genStepName());
if (node.getInput().expand().isEmpty()) {
rootTransforms.add(appliedTransform);
} else {
for (PValue value : node.getInput().expand()) {
valueToConsumers.get(value).add(appliedTransform);
- stepNames.put(appliedTransform, genStepName());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
index bea6fe1..905f58f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
@@ -177,6 +177,43 @@ public class ConsumerTrackingPipelineVisitorTest implements Serializable {
}
@Test
+ public void getStepNamesContainsAllTransforms() {
+ PCollection<String> created = p.apply(Create.of("1", "2", "3"));
+ PCollection<String> transformed =
+ created.apply(
+ ParDo.of(
+ new DoFn<String, String>() {
+ @Override
+ public void processElement(DoFn<String, String>.ProcessContext c)
+ throws Exception {
+ c.output(Integer.toString(c.element().length()));
+ }
+ }));
+ PDone finished =
+ transformed.apply(
+ new PTransform<PInput, PDone>() {
+ @Override
+ public PDone apply(PInput input) {
+ return PDone.in(input.getPipeline());
+ }
+ });
+
+ p.traverseTopologically(visitor);
+ assertThat(
+ visitor.getStepNames(),
+ Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+ created.getProducingTransformInternal(), "s0"));
+ assertThat(
+ visitor.getStepNames(),
+ Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+ transformed.getProducingTransformInternal(), "s1"));
+ assertThat(
+ visitor.getStepNames(),
+ Matchers.<AppliedPTransform<?, ?, ?>, String>hasEntry(
+ finished.getProducingTransformInternal(), "s2"));
+ }
+
+ @Test
public void traverseMultipleTimesThrows() {
p.apply(Create.of(1, 2, 3));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5888df7b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index fde2cb4..e1faf1b 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -73,7 +73,6 @@ import org.junit.runners.JUnit4;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest {
private PCollection<KV<String, Integer>> downstream;
private PCollectionView<Iterable<Integer>> view;
private PCollection<Long> unbounded;
-
+ private Collection<AppliedPTransform<?, ?, ?>> rootTransforms;
+ private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
@Before
public void setup() {
@@ -103,32 +103,20 @@ public class InProcessEvaluationContextTest {
downstream = created.apply(WithKeys.<String, Integer>of("foo"));
view = created.apply(View.<Integer>asIterable());
unbounded = p.apply(CountingInput.unbounded());
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
- ImmutableList.<AppliedPTransform<?, ?, ?>>of(
- created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
- valueToConsumers.put(
- created,
- ImmutableList.<AppliedPTransform<?, ?, ?>>of(
- downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
- valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
- valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
- valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
-
- Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
- stepNames.put(created.getProducingTransformInternal(), "s1");
- stepNames.put(downstream.getProducingTransformInternal(), "s2");
- stepNames.put(view.getProducingTransformInternal(), "s3");
- stepNames.put(unbounded.getProducingTransformInternal(), "s4");
-
- Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
- context = InProcessEvaluationContext.create(
+
+ ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
+ p.traverseTopologically(cVis);
+ rootTransforms = cVis.getRootTransforms();
+ valueToConsumers = cVis.getValueToConsumers();
+
+ context =
+ InProcessEvaluationContext.create(
runner.getPipelineOptions(),
InProcessBundleFactory.create(),
rootTransforms,
valueToConsumers,
- stepNames,
- views);
+ cVis.getStepNames(),
+ cVis.getViews());
}
@Test
@@ -495,16 +483,14 @@ public class InProcessEvaluationContextTest {
null,
ImmutableList.<TimerData>of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
- context.handleResult(
- committedBundle,
- ImmutableList.<TimerData>of(),
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(false));
- context.handleResult(
- committedBundle,
- ImmutableList.<TimerData>of(),
- StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+ for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) {
+ context.handleResult(
+ committedBundle,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(consumers).build());
+ }
assertThat(context.isDone(), is(true));
}
[2/2] incubator-beam git commit: This closes #113
Posted by bc...@apache.org.
This closes #113
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c2fa03c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c2fa03c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c2fa03c
Branch: refs/heads/master
Commit: 1c2fa03ccdbab556837f65dcf27c5b0117ad585f
Parents: 2f8ba65 5888df7
Author: bchambers <bc...@google.com>
Authored: Tue Apr 5 10:11:11 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Tue Apr 5 10:11:11 2016 -0700
----------------------------------------------------------------------
.../ConsumerTrackingPipelineVisitor.java | 2 +-
.../ConsumerTrackingPipelineVisitorTest.java | 37 +++++++++++++++
.../InProcessEvaluationContextTest.java | 50 +++++++-------------
3 files changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------