You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/24 04:35:27 UTC

[44/50] [abbrv] beam git commit: Translate a Pipeline in SdkComponents

Translate a Pipeline in SdkComponents


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a05a6a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a05a6a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a05a6a6

Branch: refs/heads/jstorm-runner
Commit: 1a05a6a66fabfaa2968f81a73df6b8a1a6fc1301
Parents: d6cc850
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 11 16:50:47 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 17 09:29:25 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/PTransforms.java  |  17 ++--
 .../core/construction/SdkComponents.java        |  50 ++++++++++
 .../core/construction/SdkComponentsTest.java    | 100 +++++++++++++++++++
 3 files changed, 160 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
index 6d2c6b6..d25d342 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java
@@ -66,13 +66,16 @@ public class PTransforms {
           components.registerPCollection((PCollection<?>) taggedInput.getValue()));
     }
     for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) {
-      checkArgument(
-          taggedOutput.getValue() instanceof PCollection,
-          "Unexpected output type %s",
-          taggedOutput.getValue().getClass());
-      transformBuilder.putOutputs(
-          toProto(taggedOutput.getKey()),
-          components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+      // TODO: Remove gating. Until it is removed, outputs that are not PCollections are
+      // skipped rather than rejected, so they will not appear among the translated
+      // transform's outputs.
+      if (taggedOutput.getValue() instanceof PCollection) {
+        // The instanceof gate above already guarantees the cast below succeeds, so a separate
+        // precondition check on the output type would be redundant.
+        transformBuilder.putOutputs(
+            toProto(taggedOutput.getKey()),
+            components.registerPCollection((PCollection<?>) taggedOutput.getValue()));
+      }
     }
     for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) {
       transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform));

http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 2de8237..eb29b9a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -22,16 +22,24 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Equivalence;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ListMultimap;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils;
 import org.apache.beam.sdk.values.PCollection;
@@ -54,6 +62,62 @@ class SdkComponents {
     return new SdkComponents();
   }
 
+  /**
+   * Translates the provided {@link Pipeline} into a {@link RunnerApi.Pipeline}.
+   *
+   * <p>Every transform in {@code p} is registered with a new {@link SdkComponents}. Composite
+   * transforms are registered when the traversal leaves them, after their children; the IDs of
+   * the transforms directly enclosed by the root node become the root transform IDs.
+   *
+   * @throws IllegalStateException if registering a transform fails with an {@link IOException}
+   */
+  public static RunnerApi.Pipeline translatePipeline(Pipeline p) {
+    final SdkComponents components = create();
+    final Collection<String> rootIds = new HashSet<>();
+    p.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          // Transforms directly enclosed by each node, in the order the traversal reaches them.
+          private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
+              ArrayListMultimap.create();
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
+                rootIds.add(components.getExistingPTransformId(pipelineRoot));
+              }
+            } else {
+              register(node, children.get(node));
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            register(node, Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+          }
+
+          /**
+           * Records {@code node} as a child of its enclosing node, then registers it with the
+           * given subtransforms, converting the node only once so both uses share one instance.
+           */
+          private void register(Node node, List<AppliedPTransform<?, ?, ?>> subtransforms) {
+            AppliedPTransform<?, ?, ?> transform = node.toAppliedPTransform();
+            children.put(node.getEnclosingNode(), transform);
+            try {
+              components.registerPTransform(transform, subtransforms);
+            } catch (IOException e) {
+              // Same wrapping for composite and primitive transforms.
+              throw new IllegalStateException(e);
+            }
+          }
+        });
+    // TODO: Display Data
+    return RunnerApi.Pipeline.newBuilder()
+        .setComponents(components.toComponents())
+        .addAllRootTransformIds(rootIds)
+        .build();
+  }
+
   private SdkComponents() {
     this.componentsBuilder = RunnerApi.Components.newBuilder();
     this.transformIds = HashBiMap.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/1a05a6a6/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 82840d6..7424886 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,25 +24,43 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Equivalence;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -60,6 +78,103 @@ public class SdkComponentsTest {
   private SdkComponents components = SdkComponents.create();
 
   @Test
+  public void translatePipeline() {
+    BigEndianLongCoder customCoder = BigEndianLongCoder.of();
+    PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L));
+    PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
+    PCollection<Long> windowed =
+        counted.apply(
+            Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
+                .triggering(
+                    AfterWatermark.pastEndOfWindow()
+                        .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
+                .accumulatingFiredPanes()
+                .withAllowedLateness(Duration.standardMinutes(3L)));
+    PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
+    // The grouped output is never inspected; applying the GroupByKey is what matters.
+    keyed.apply(GroupByKey.<String, Long>create());
+
+    final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
+    // Independently collect the components reachable from the SDK pipeline, then check that
+    // the translated proto holds exactly as many of each kind.
+    pipeline.traverseTopologically(
+        new PipelineVisitor() {
+          Set<Node> transforms = new HashSet<>();
+          Set<Node> rootTransforms = new HashSet<>();
+          Set<PCollection<?>> pcollections = new HashSet<>();
+          Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
+          Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              assertThat(
+                  "Unexpected number of PTransforms",
+                  pipelineProto.getComponents().getTransformsCount(),
+                  equalTo(transforms.size()));
+              assertThat(
+                  "Unexpected number of PCollections",
+                  pipelineProto.getComponents().getPcollectionsCount(),
+                  equalTo(pcollections.size()));
+              assertThat(
+                  "Unexpected number of Coders",
+                  pipelineProto.getComponents().getCodersCount(),
+                  equalTo(coders.size()));
+              assertThat(
+                  "Unexpected number of Windowing Strategies",
+                  pipelineProto.getComponents().getWindowingStrategiesCount(),
+                  equalTo(windowingStrategies.size()));
+              assertThat(
+                  "Unexpected number of root PTransforms",
+                  pipelineProto.getRootTransformIdsCount(),
+                  equalTo(rootTransforms.size()));
+            } else {
+              addTransform(node);
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            addTransform(node);
+          }
+
+          @Override
+          public void visitValue(PValue value, Node producer) {
+            if (value instanceof PCollection) {
+              PCollection<?> pc = (PCollection<?>) value;
+              pcollections.add(pc);
+              addCoders(pc.getCoder());
+              windowingStrategies.add(pc.getWindowingStrategy());
+              addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
+            }
+          }
+
+          // Records a non-root transform, also noting it if the pipeline applies it directly.
+          private void addTransform(Node node) {
+            transforms.add(node);
+            if (node.getEnclosingNode().isRootNode()) {
+              rootTransforms.add(node);
+            }
+          }
+
+          // Records a coder and, recursively, all of its component coders.
+          private void addCoders(Coder<?> coder) {
+            coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
+            if (coder instanceof StructuredCoder) {
+              for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
+                addCoders(component);
+              }
+            }
+          }
+        });
+  }
+
+  @Test
   public void registerCoder() throws IOException {
     Coder<?> coder =
         KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));