You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/14 17:40:02 UTC
[1/2] beam git commit: This closes #2506
Repository: beam
Updated Branches:
refs/heads/master 6bf42622c -> b2329a7ef
This closes #2506
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b2329a7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b2329a7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b2329a7e
Branch: refs/heads/master
Commit: b2329a7eff1e2b660af1efcc0a58bc54a478bfdd
Parents: 6bf4262 79f0d11
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 14 10:39:52 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 14 10:39:52 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/Pipeline.java | 24 ++++--
.../java/org/apache/beam/sdk/PipelineTest.java | 77 ++++++++++++++++++++
2 files changed, 93 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Free PTransform Names if they are being Replaced
Posted by tg...@apache.org.
Free PTransform Names if they are being Replaced
Naming is based on what's in the graph, not what once was there.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79f0d114
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79f0d114
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79f0d114
Branch: refs/heads/master
Commit: 79f0d114b75752024ca41038b649f72a0882dabd
Parents: 6bf4262
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 18:17:02 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 14 10:39:52 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/Pipeline.java | 24 ++++--
.../java/org/apache/beam/sdk/PipelineTest.java | 77 ++++++++++++++++++++
2 files changed, 93 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/79f0d114/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 11d781d..791166e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -30,7 +30,6 @@ import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -229,26 +228,38 @@ public class Pipeline {
}
private void replace(final PTransformOverride override) {
- final Collection<Node> matches = new ArrayList<>();
+ final Set<Node> matches = new HashSet<>();
+ final Set<Node> freedNodes = new HashSet<>();
transforms.visit(
new PipelineVisitor.Defaults() {
@Override
public CompositeBehavior enterCompositeTransform(Node node) {
+ if (!node.isRootNode() && freedNodes.contains(node.getEnclosingNode())) {
+ // This node will be freed because its parent will be freed.
+ freedNodes.add(node);
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) {
matches.add(node);
- // This node will be replaced. It should not be visited.
- return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ // This node will be freed. When we visit any of its children, they will also be freed
+ freedNodes.add(node);
}
return CompositeBehavior.ENTER_TRANSFORM;
}
@Override
public void visitPrimitiveTransform(Node node) {
- if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ if (freedNodes.contains(node.getEnclosingNode())) {
+ freedNodes.add(node);
+ } else if (override.getMatcher().matches(node.toAppliedPTransform())) {
matches.add(node);
+ freedNodes.add(node);
}
}
});
+ for (Node freedNode : freedNodes) {
+ usedFullNames.remove(freedNode.getFullName());
+ }
for (Node match : matches) {
applyReplacement(match, override.getOverrideFactory());
}
@@ -486,9 +497,6 @@ public class Pipeline {
void applyReplacement(
Node original,
PTransformOverrideFactory<InputT, OutputT, TransformT> replacementFactory) {
- // Names for top-level transforms have been assigned. Any new collisions are within a node
- // and its replacement.
- getOptions().setStableUniqueNames(CheckEnabled.OFF);
PTransform<InputT, OutputT> replacement =
replacementFactory.getReplacementTransform((TransformT) original.getTransform());
if (replacement == original.getTransform()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/79f0d114/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 0a5746b..6ce016d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.not;
@@ -29,8 +30,10 @@ import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.io.CountingInput;
@@ -51,6 +54,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
@@ -384,6 +388,79 @@ public class PipelineTest {
new UnboundedCountingInputOverride())));
}
+ @Test
+ public void testReplacedNames() {
+ final PCollection<String> originalInput = pipeline.apply(Create.of("foo", "bar", "baz"));
+ class OriginalTransform extends PTransform<PCollection<String>, PCollection<Long>> {
+ @Override
+ public PCollection<Long> expand(PCollection<String> input) {
+ return input.apply("custom_name", Count.<String>globally());
+ }
+ }
+ class ReplacementTransform extends PTransform<PCollection<String>, PCollection<Long>> {
+ @Override
+ public PCollection<Long> expand(PCollection<String> input) {
+ return input.apply("custom_name", Count.<String>globally());
+ }
+ }
+ class ReplacementOverrideFactory
+ implements PTransformOverrideFactory<
+ PCollection<String>, PCollection<Long>, OriginalTransform> {
+
+ @Override
+ public PTransform<PCollection<String>, PCollection<Long>> getReplacementTransform(
+ OriginalTransform transform) {
+ return new ReplacementTransform();
+ }
+
+ @Override
+ public PCollection<String> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) {
+ return originalInput;
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ Map<TupleTag<?>, PValue> outputs, PCollection<Long> newOutput) {
+ return Collections.<PValue, ReplacementOutput>singletonMap(
+ newOutput,
+ ReplacementOutput.of(
+ TaggedPValue.ofExpandedValue(
+ Iterables.getOnlyElement(outputs.values())),
+ TaggedPValue.ofExpandedValue(newOutput)));
+ }
+ }
+
+ class OriginalMatcher implements PTransformMatcher {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return application.getTransform() instanceof OriginalTransform;
+ }
+ }
+
+ originalInput.apply("original_application", new OriginalTransform());
+ pipeline.replaceAll(
+ Collections.singletonList(
+ PTransformOverride.of(new OriginalMatcher(), new ReplacementOverrideFactory())));
+ final Set<String> names = new HashSet<>();
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (!node.isRootNode()) {
+ names.add(node.getFullName());
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ names.add(node.getFullName());
+ }
+ });
+
+ assertThat(names, hasItem("original_application/custom_name"));
+ assertThat(names, not(hasItem("original_application/custom_name2")));
+ }
+
static class BoundedCountingInputOverride
implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> {
@Override