You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/22 22:16:54 UTC
[1/2] beam git commit: Remove Pipeline reference from
TransformHierarchy
Repository: beam
Updated Branches:
refs/heads/master 983a44926 -> b633abe2c
Remove Pipeline reference from TransformHierarchy
This change removes a direct dependency cycle between Pipeline and
TransformHierarchy. There is still an indirect cycle through PValues, but that
is slightly less problematic.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e9fcebc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e9fcebc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e9fcebc
Branch: refs/heads/master
Commit: 5e9fcebc07725de368391914781e5b4d5f9c4a19
Parents: d7a4e49
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 19 12:57:41 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sat May 20 08:33:21 2017 -0700
----------------------------------------------------------------------
.../translation/ApexPipelineTranslator.java | 4 +-
.../apex/translation/TranslationContext.java | 5 +-
.../core/construction/SdkComponents.java | 14 +++---
.../core/construction/SdkComponentsTest.java | 7 +--
.../beam/runners/direct/DirectGraphVisitor.java | 3 +-
.../direct/KeyedPValueTrackingVisitor.java | 5 +-
.../flink/FlinkBatchPipelineTranslator.java | 2 +-
.../apache/beam/runners/flink/FlinkRunner.java | 10 +---
.../flink/FlinkStreamingPipelineTranslator.java | 4 +-
.../dataflow/DataflowPipelineTranslator.java | 6 +--
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../apache/beam/runners/spark/SparkRunner.java | 2 +-
.../streaming/TrackStreamingSourcesTest.java | 14 +++++-
.../main/java/org/apache/beam/sdk/Pipeline.java | 50 ++++++++++++++++----
.../beam/sdk/runners/TransformHierarchy.java | 6 +--
.../sdk/runners/TransformHierarchyTest.java | 2 +-
16 files changed, 82 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index 32e470f..bda074b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
* into Apex logical plan {@link DAG}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
+public class ApexPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(ApexPipelineTranslator.class);
/**
@@ -110,7 +110,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
throw new UnsupportedOperationException(
"no translator registered for " + transform);
}
- translationContext.setCurrentTransform(node);
+ translationContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
translator.translate(transform, translationContext);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index a5e3028..aff3863 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -36,7 +36,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -77,8 +76,8 @@ class TranslationContext {
this.pipelineOptions = pipelineOptions;
}
- public void setCurrentTransform(TransformHierarchy.Node treeNode) {
- this.currentTransform = treeNode.toAppliedPTransform();
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+ this.currentTransform = transform;
}
public ApexPipelineOptions getPipelineOptions() {
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index eb29b9a..5714fc5 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -62,10 +62,10 @@ class SdkComponents {
return new SdkComponents();
}
- public static RunnerApi.Pipeline translatePipeline(Pipeline p) {
+ public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) {
final SdkComponents components = create();
final Collection<String> rootIds = new HashSet<>();
- p.traverseTopologically(
+ pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
ArrayListMultimap.create();
@@ -77,9 +77,10 @@ class SdkComponents {
rootIds.add(components.getExistingPTransformId(pipelineRoot));
}
} else {
- children.put(node.getEnclosingNode(), node.toAppliedPTransform());
+ children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
try {
- components.registerPTransform(node.toAppliedPTransform(), children.get(node));
+ components.registerPTransform(
+ node.toAppliedPTransform(getPipeline()), children.get(node));
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -88,10 +89,11 @@ class SdkComponents {
@Override
public void visitPrimitiveTransform(Node node) {
- children.put(node.getEnclosingNode(), node.toAppliedPTransform());
+ children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
try {
components.registerPTransform(
- node.toAppliedPTransform(), Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+ node.toAppliedPTransform(getPipeline()),
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList());
} catch (IOException e) {
throw new IllegalStateException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 7424886..55702ea 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -97,18 +97,13 @@ public class SdkComponentsTest {
final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
pipeline.traverseTopologically(
- new PipelineVisitor() {
+ new PipelineVisitor.Defaults() {
Set<Node> transforms = new HashSet<>();
Set<PCollection<?>> pcollections = new HashSet<>();
Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
@Override
- public CompositeBehavior enterCompositeTransform(Node node) {
- return CompositeBehavior.ENTER_TRANSFORM;
- }
-
- @Override
public void leaveCompositeTransform(Node node) {
if (node.isRootNode()) {
assertThat(
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 1ee8ceb..01204e3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.values.PValue;
* input after the upstream transform has produced and committed output.
*/
class DirectGraphVisitor extends PipelineVisitor.Defaults {
+
private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
@@ -101,7 +102,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
@SuppressWarnings({"rawtypes", "unchecked"})
- AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform();
+ AppliedPTransform<?, ?, ?> application = node.toAppliedPTransform(getPipeline());
return application;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 347f313..f9b2dae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.values.TupleTag;
*/
// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
// unkeyed
-class KeyedPValueTrackingVisitor implements PipelineVisitor {
+class KeyedPValueTrackingVisitor extends PipelineVisitor.Defaults {
private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS =
ImmutableSet.of(
@@ -91,9 +91,6 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
}
@Override
- public void visitPrimitiveTransform(TransformHierarchy.Node node) {}
-
- @Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
boolean inputsAreKeyed = true;
for (PValue input : producer.getInputs().values()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
index 854b674..50910b5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java
@@ -112,7 +112,7 @@ class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
BatchTransformTranslator<T> typedTranslator = (BatchTransformTranslator<T>) translator;
// create the applied PTransform on the batchContext
- batchContext.setCurrentTransform(node.toAppliedPTransform());
+ batchContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
typedTranslator.translateNode(typedTransform, batchContext);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 80ef7bb..ca12615 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PValue;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.DetachedEnvironment;
import org.slf4j.Logger;
@@ -199,10 +198,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
// have just recorded the full names during apply time.
if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
- pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
- @Override
- public void visitValue(PValue value, TransformHierarchy.Node producer) {
- }
+ pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
@Override
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
@@ -218,10 +214,6 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> {
}
return CompositeBehavior.ENTER_TRANSFORM;
}
-
- @Override
- public void leaveCompositeTransform(TransformHierarchy.Node node) {
- }
});
LOG.warn("Unable to use indexed implementation for View.AsMap and View.AsMultimap for {} "
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 53a1fa1..8da68c5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -188,7 +188,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
// create the applied PTransform on the streamingContext
- streamingContext.setCurrentTransform(node.toAppliedPTransform());
+ streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
typedTranslator.translateNode(typedTransform, streamingContext);
}
@@ -203,7 +203,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
@SuppressWarnings("unchecked")
StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator;
- streamingContext.setCurrentTransform(node.toAppliedPTransform());
+ streamingContext.setCurrentTransform(node.toAppliedPTransform(getPipeline()));
return typedTranslator.canTranslate(typedTransform, streamingContext);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 840bda8..6d7a0f8 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -431,18 +431,18 @@ public class DataflowPipelineTranslator {
transform,
node.getFullName());
LOG.debug("Translating {}", transform);
- currentTransform = node.toAppliedPTransform();
+ currentTransform = node.toAppliedPTransform(getPipeline());
translator.translate(transform, this);
currentTransform = null;
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
- producers.put(value, producer.toAppliedPTransform());
+ producers.put(value, producer.toAppliedPTransform(getPipeline()));
LOG.debug("Checking translation of {}", value);
if (!producer.isCompositeNode()) {
// Primitive transforms are the only ones assigned step names.
- asOutputReference(value, producer.toAppliedPTransform());
+ asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2ef8737..cce6ce7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -729,7 +729,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
if (!ptransformViewsWithNonDeterministicKeyCoders.isEmpty()) {
final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders = new TreeSet<>();
pipeline.traverseTopologically(
- new PipelineVisitor() {
+ new PipelineVisitor.Defaults() {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 8c02f0f..9e2426e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -404,7 +404,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
@SuppressWarnings("unchecked")
TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
LOG.info("Evaluating {}", transform);
- AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
+ AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
ctxt.setCurrentTransform(appliedTransform);
evaluator.evaluate(transform, ctxt);
ctxt.setCurrentTransform(null);
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index 33a636a..e8a5951 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -148,6 +148,12 @@ public class TrackStreamingSourcesTest {
}
@Override
+ public void enterPipeline(Pipeline p) {
+ super.enterPipeline(p);
+ evaluator.enterPipeline(p);
+ }
+
+ @Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
return evaluator.enterCompositeTransform(node);
}
@@ -156,7 +162,7 @@ public class TrackStreamingSourcesTest {
public void visitPrimitiveTransform(TransformHierarchy.Node node) {
PTransform transform = node.getTransform();
if (transform.getClass() == transformClassToAssert) {
- AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform();
+ AppliedPTransform<?, ?, ?> appliedTransform = node.toAppliedPTransform(getPipeline());
ctxt.setCurrentTransform(appliedTransform);
//noinspection unchecked
Dataset dataset = ctxt.borrowDataset((PTransform<? extends PValue, ?>) transform);
@@ -166,6 +172,12 @@ public class TrackStreamingSourcesTest {
evaluator.visitPrimitiveTransform(node);
}
}
+
+ @Override
+ public void leavePipeline(Pipeline p) {
+ super.leavePipeline(p);
+ evaluator.leavePipeline(p);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 83496a5..bdf8a12 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk;
+import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
@@ -205,7 +206,7 @@ public class Pipeline {
public CompositeBehavior enterCompositeTransform(Node node) {
if (!node.isRootNode()) {
for (PTransformOverride override : overrides) {
- if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
matched.put(node, override);
}
}
@@ -227,7 +228,7 @@ public class Pipeline {
@Override
public void visitPrimitiveTransform(Node node) {
for (PTransformOverride override : overrides) {
- if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
matched.put(node, override);
}
}
@@ -238,7 +239,7 @@ public class Pipeline {
private void replace(final PTransformOverride override) {
final Set<Node> matches = new HashSet<>();
final Set<Node> freedNodes = new HashSet<>();
- transforms.visit(
+ traverseTopologically(
new PipelineVisitor.Defaults() {
@Override
public CompositeBehavior enterCompositeTransform(Node node) {
@@ -247,7 +248,8 @@ public class Pipeline {
freedNodes.add(node);
return CompositeBehavior.ENTER_TRANSFORM;
}
- if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) {
+ if (!node.isRootNode()
+ && override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
matches.add(node);
// This node will be freed. When we visit any of its children, they will also be freed
freedNodes.add(node);
@@ -259,7 +261,7 @@ public class Pipeline {
public void visitPrimitiveTransform(Node node) {
if (freedNodes.contains(node.getEnclosingNode())) {
freedNodes.add(node);
- } else if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ } else if (override.getMatcher().matches(node.toAppliedPTransform(getPipeline()))) {
matches.add(node);
freedNodes.add(node);
}
@@ -334,8 +336,14 @@ public class Pipeline {
@Internal
public interface PipelineVisitor {
/**
- * Called for each composite transform after all topological predecessors have been visited
- * but before any of its component transforms.
+ * Called before visiting any values or transforms, as many uses of a visitor require
+ * access to the {@link Pipeline} object itself.
+ */
+ void enterPipeline(Pipeline p);
+
+ /**
+ * Called for each composite transform after all topological predecessors have been visited but
+ * before any of its component transforms.
*
* <p>The return value controls whether or not child transforms are visited.
*/
@@ -360,6 +368,11 @@ public class Pipeline {
void visitValue(PValue value, TransformHierarchy.Node producer);
/**
+ * Called when all values and transforms in a {@link Pipeline} have been visited.
+ */
+ void leavePipeline(Pipeline pipeline);
+
+ /**
* Control enum for indicating whether or not a traversal should process the contents of
* a composite transform or not.
*/
@@ -373,6 +386,18 @@ public class Pipeline {
* User implementations can override just those methods they are interested in.
*/
class Defaults implements PipelineVisitor {
+
+ private Pipeline pipeline;
+
+ protected Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public void enterPipeline(Pipeline pipeline) {
+ this.pipeline = checkNotNull(pipeline);
+ }
+
@Override
public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
return CompositeBehavior.ENTER_TRANSFORM;
@@ -386,6 +411,11 @@ public class Pipeline {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) { }
+
+ @Override
+ public void leavePipeline(Pipeline pipeline) {
+ this.pipeline = null;
+ }
}
}
@@ -406,7 +436,9 @@ public class Pipeline {
*/
@Internal
public void traverseTopologically(PipelineVisitor visitor) {
+ visitor.enterPipeline(this);
transforms.visit(visitor);
+ visitor.leavePipeline(this);
}
/**
@@ -444,7 +476,7 @@ public class Pipeline {
/////////////////////////////////////////////////////////////////////////////
// Below here are internal operations, never called by users.
- private final TransformHierarchy transforms = new TransformHierarchy(this);
+ private final TransformHierarchy transforms = new TransformHierarchy();
private Set<String> usedFullNames = new HashSet<>();
private CoderRegistry coderRegistry;
private final List<String> unstableNames = new ArrayList<>();
@@ -495,7 +527,7 @@ public class Pipeline {
PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory) {
PTransformReplacement<InputT, OutputT> replacement =
replacementFactory.getReplacementTransform(
- (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform());
+ (AppliedPTransform<InputT, OutputT, TransformT>) original.toAppliedPTransform(this));
if (replacement.getTransform() == original.getTransform()) {
return;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index fac558b..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
public class TransformHierarchy {
private static final Logger LOG = LoggerFactory.getLogger(TransformHierarchy.class);
- private final Pipeline pipeline;
private final Node root;
private final Map<Node, PInput> unexpandedInputs;
private final Map<POutput, Node> producers;
@@ -65,8 +64,7 @@ public class TransformHierarchy {
// Maintain a stack based on the enclosing nodes
private Node current;
- public TransformHierarchy(Pipeline pipeline) {
- this.pipeline = pipeline;
+ public TransformHierarchy() {
producers = new HashMap<>();
producerInput = new HashMap<>();
unexpandedInputs = new HashMap<>();
@@ -453,7 +451,7 @@ public class TransformHierarchy {
/**
* Returns the {@link AppliedPTransform} representing this {@link Node}.
*/
- public AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+ public AppliedPTransform<?, ?, ?> toAppliedPTransform(Pipeline pipeline) {
return AppliedPTransform.of(
getFullName(), inputs, outputs, (PTransform) getTransform(), pipeline);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e9fcebc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 125e159..1197d1b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -79,7 +79,7 @@ public class TransformHierarchyTest implements Serializable {
@Before
public void setup() {
- hierarchy = new TransformHierarchy(pipeline);
+ hierarchy = new TransformHierarchy();
}
@Test
[2/2] beam git commit: This closes #3187: Remove Pipeline reference
from TransformHierarchy
Posted by ke...@apache.org.
This closes #3187: Remove Pipeline reference from TransformHierarchy
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b633abe2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b633abe2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b633abe2
Branch: refs/heads/master
Commit: b633abe2c8efd3ac607a0be14980315e5f0d9fa6
Parents: 983a449 5e9fceb
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 15:16:22 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon May 22 15:16:22 2017 -0700
----------------------------------------------------------------------
.../translation/ApexPipelineTranslator.java | 4 +-
.../apex/translation/TranslationContext.java | 5 +-
.../core/construction/SdkComponents.java | 14 +++---
.../core/construction/SdkComponentsTest.java | 7 +--
.../beam/runners/direct/DirectGraphVisitor.java | 3 +-
.../direct/KeyedPValueTrackingVisitor.java | 5 +-
.../flink/FlinkBatchPipelineTranslator.java | 2 +-
.../apache/beam/runners/flink/FlinkRunner.java | 10 +---
.../flink/FlinkStreamingPipelineTranslator.java | 4 +-
.../dataflow/DataflowPipelineTranslator.java | 6 +--
.../beam/runners/dataflow/DataflowRunner.java | 2 +-
.../apache/beam/runners/spark/SparkRunner.java | 2 +-
.../streaming/TrackStreamingSourcesTest.java | 14 +++++-
.../main/java/org/apache/beam/sdk/Pipeline.java | 50 ++++++++++++++++----
.../beam/sdk/runners/TransformHierarchy.java | 6 +--
.../sdk/runners/TransformHierarchyTest.java | 2 +-
16 files changed, 82 insertions(+), 54 deletions(-)
----------------------------------------------------------------------