You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:27 UTC
[44/50] [abbrv] beam git commit: Translate a Pipeline in SdkComponents
Translate a Pipeline in SdkComponents
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a05a6a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a05a6a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a05a6a6
Branch: refs/heads/jstorm-runner
Commit: 1a05a6a66fabfaa2968f81a73df6b8a1a6fc1301
Parents: d6cc850
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 16:50:47 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 17 09:29:25 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/PTransforms.java | 17 ++--
.../core/construction/SdkComponents.java | 50 ++++++++++
.../core/construction/SdkComponentsTest.java | 100 +++++++++++++++++++
3 files changed, 160 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
index 6d2c6b6..d25d342 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
@@ -66,13 +66,16 @@ public class PTransforms {
components.registerPCollection((PCollection<?>) taggedInput.getValue()));
}
for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
- checkArgument(
- taggedOutput.getValue() instanceof PCollection,
- "Unexpected output type %s",
- taggedOutput.getValue().getClass());
- transformBuilder.putOutputs(
- toProto(taggedOutput.getKey()),
- components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+ // TODO: Remove gating
+ if (taggedOutput.getValue() instanceof PCollection) {
+ checkArgument(
+ taggedOutput.getValue() instanceof PCollection,
+ "Unexpected output type %s",
+ taggedOutput.getValue().getClass());
+ transformBuilder.putOutputs(
+ toProto(taggedOutput.getKey()),
+ components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+ }
}
for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));
http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 2de8237..eb29b9a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -22,16 +22,24 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Equivalence;
+import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ListMultimap;
import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -54,6 +62,48 @@ class SdkComponents {
return new SdkComponents();
}
+  /**
+   * Translates {@code p} into a {@link RunnerApi.Pipeline}.
+   *
+   * <p>Every transform of the pipeline is registered with a fresh {@link SdkComponents}. Primitive
+   * transforms are registered when visited; composite transforms are registered when they are
+   * left, after their children have been visited and registered. The transforms directly enclosed
+   * by the pipeline's root node become the root transform ids, in the order they were visited.
+   *
+   * @throws IllegalStateException if registering a transform fails with an {@link IOException}
+   */
+  public static RunnerApi.Pipeline translatePipeline(Pipeline p) {
+    final SdkComponents components = create();
+    // Insertion-ordered so root transform ids follow traversal order instead of hash order.
+    final Collection<String> rootIds = new LinkedHashSet<>();
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          // Transforms directly enclosed by each composite node, in the order they were visited.
+          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
+              ArrayListMultimap.create();
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
+                rootIds.add(components.getExistingPTransformId(pipelineRoot));
+              }
+            } else {
+              // Children were registered while visiting them, so their ids already exist.
+              register(node, children.get(node));
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            register(node, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+          }
+
+          /** Records {@code node} as a child of its enclosing node, then registers it. */
+          private void register(Node node, List<AppliedPTransform<?, ?, ?>> subtransforms) {
+            // Build the AppliedPTransform once so the registered instance is the same one the
+            // parent later looks up via getExistingPTransformId.
+            AppliedPTransform<?, ?, ?> transform = node.toAppliedPTransform();
+            children.put(node.getEnclosingNode(), transform);
+            try {
+              components.registerPTransform(transform, subtransforms);
+            } catch (IOException e) {
+              throw new IllegalStateException(
+                  "Could not register PTransform " + node.getFullName(), e);
+            }
+          }
+        });
+    // TODO: Display Data
+    return RunnerApi.Pipeline.newBuilder()
+        .setComponents(components.toComponents())
+        .addAllRootTransformIds(rootIds)
+        .build();
+  }
+
private SdkComponents() {
this.componentsBuilder = RunnerApi.Components.newBuilder();
this.transformIds = HashBiMap.create();
http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 82840d6..7424886 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,25 +24,43 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Equivalence;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.hamcrest.Matchers;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -60,6 +78,88 @@ public class SdkComponentsTest {
private SdkComponents components = SdkComponents.create();
@Test
+  public void translatePipeline() {
+    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
+    PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L));
+    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
+    PCollection<Long> windowed =
+        counted.apply(
+            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
+                .triggering(
+                    AfterWatermark.pastEndOfWindow()
+                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.standardMinutes(3L)));
+    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
+    keyed.apply(GroupByKey.<String, Long>create());
+
+    final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
+    // Re-traverse the same pipeline, collecting every component the proto should contain, and
+    // compare the collected counts against the proto once the root node is left.
+    pipeline.traverseTopologically(
+        new PipelineVisitor() {
+          private final Set<Node> transforms = new HashSet<>();
+          // Transforms whose enclosing node is the pipeline root.
+          private final Set<Node> rootTransforms = new HashSet<>();
+          private final Set<PCollection<?>> pcollections = new HashSet<>();
+          // Coders are tracked by identity (see addCoders).
+          private final Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
+          private final Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              assertThat(
+                  "Unexpected number of PTransforms",
+                  pipelineProto.getComponents().getTransformsCount(),
+                  equalTo(transforms.size()));
+              assertThat(
+                  "Unexpected number of root PTransforms",
+                  pipelineProto.getRootTransformIdsCount(),
+                  equalTo(rootTransforms.size()));
+              assertThat(
+                  "Unexpected number of PCollections",
+                  pipelineProto.getComponents().getPcollectionsCount(),
+                  equalTo(pcollections.size()));
+              assertThat(
+                  "Unexpected number of Coders",
+                  pipelineProto.getComponents().getCodersCount(),
+                  equalTo(coders.size()));
+              assertThat(
+                  "Unexpected number of Windowing Strategies",
+                  pipelineProto.getComponents().getWindowingStrategiesCount(),
+                  equalTo(windowingStrategies.size()));
+            } else {
+              addTransform(node);
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            addTransform(node);
+          }
+
+          @Override
+          public void visitValue(PValue value, Node producer) {
+            if (value instanceof PCollection) {
+              PCollection<?> pc = (PCollection<?>) value;
+              pcollections.add(pc);
+              addCoders(pc.getCoder());
+              windowingStrategies.add(pc.getWindowingStrategy());
+              addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
+            }
+          }
+
+          /** Records a non-root transform, noting whether it is a direct child of the root. */
+          private void addTransform(Node node) {
+            transforms.add(node);
+            if (node.getEnclosingNode().isRootNode()) {
+              rootTransforms.add(node);
+            }
+          }
+
+          /** Records {@code coder} and, recursively, all of its component coders. */
+          private void addCoders(Coder<?> coder) {
+            coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
+            if (coder instanceof StructuredCoder) {
+              for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
+                addCoders(component);
+              }
+            }
+          }
+        });
+  }
+
+ @Test
public void registerCoder() throws IOException {
Coder<?> coder =
KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));