You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/11 03:39:22 UTC

[3/8] beam git commit: Include PCollection in rehydrated PCollectionView

Include PCollection in rehydrated PCollectionView


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/de39f324
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/de39f324
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/de39f324

Branch: refs/heads/master
Commit: de39f324c4b0914418894a41c6f75596310bf633
Parents: 165dfa6
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 09:24:55 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 10 20:14:27 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     | 51 +++++++++++++++++---
 .../construction/RunnerPCollectionView.java     |  7 +--
 .../core/construction/ParDoTranslationTest.java | 28 +++++++----
 3 files changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 5f2bcae..fe8c5aa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -262,8 +264,12 @@ public class ParDoTranslation {
     List<PCollectionView<?>> views = new ArrayList<>();
     for (Map.Entry<String, SideInput> sideInput : payload.getSideInputsMap().entrySet()) {
       views.add(
-          fromProto(
-              sideInput.getValue(), sideInput.getKey(), parDoProto, sdkComponents.toComponents()));
+          viewFromProto(
+              application.getPipeline(),
+              sideInput.getValue(),
+              sideInput.getKey(),
+              parDoProto,
+              sdkComponents.toComponents()));
     }
     return views;
   }
@@ -495,15 +501,47 @@ public class ParDoTranslation {
     return builder.build();
   }
 
-  public static PCollectionView<?> fromProto(
-      SideInput sideInput, String id, RunnerApi.PTransform parDoTransform, Components components)
+  public static PCollectionView<?> viewFromProto(
+      Pipeline pipeline,
+      SideInput sideInput,
+      String localName,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
       throws IOException {
-    TupleTag<?> tag = new TupleTag<>(id);
+
+    String pCollectionId = parDoTransform.getInputsOrThrow(localName);
+
+    // This may be a PCollection defined in another language, but we should be
+    // able to rehydrate it enough to stick it in a side input. The coder may not
+    // be grokkable in Java.
+    PCollection<?> pCollection =
+        PCollectionTranslation.fromProto(
+            pipeline, components.getPcollectionsOrThrow(pCollectionId), components);
+
+    return viewFromProto(sideInput, localName, pCollection, parDoTransform, components);
+  }
+
+  /**
+   * Create a {@link PCollectionView} from a side input spec and an already-deserialized {@link
+   * PCollection} that should be wired up.
+   */
+  public static PCollectionView<?> viewFromProto(
+      SideInput sideInput,
+      String localName,
+      PCollection<?> pCollection,
+      RunnerApi.PTransform parDoTransform,
+      Components components)
+      throws IOException {
+    checkArgument(
+        localName != null,
+        "%s.viewFromProto: localName must not be null",
+        ParDoTranslation.class.getSimpleName());
+    TupleTag<?> tag = new TupleTag<>(localName);
     WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
     RunnerApi.PCollection inputCollection =
-        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
+        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName));
     WindowingStrategy<?, ?> windowingStrategy =
         WindowingStrategyTranslation.fromProto(
             components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
@@ -523,6 +561,7 @@ public class ParDoTranslation {
 
     PCollectionView<?> view =
         new RunnerPCollectionView<>(
+            pCollection,
             (TupleTag<Iterable<WindowedValue<?>>>) tag,
             (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn,
             windowMappingFn,

http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
index c359cec..b275188 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RunnerPCollectionView.java
@@ -39,16 +39,19 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
   private final WindowMappingFn<?> windowMappingFn;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Coder<Iterable<WindowedValue<?>>> coder;
+  private final transient PCollection<?> pCollection;
 
   /**
    * Create a new {@link RunnerPCollectionView} from the provided components.
    */
   RunnerPCollectionView(
+      PCollection<?> pCollection,
       TupleTag<Iterable<WindowedValue<?>>> tag,
       ViewFn<Iterable<WindowedValue<?>>, T> viewFn,
       WindowMappingFn<?> windowMappingFn,
       @Nullable WindowingStrategy<?, ?> windowingStrategy,
       @Nullable Coder<Iterable<WindowedValue<?>>> coder) {
+    this.pCollection = pCollection;
     this.tag = tag;
     this.viewFn = viewFn;
     this.windowMappingFn = windowMappingFn;
@@ -56,11 +59,9 @@ class RunnerPCollectionView<T> extends PValueBase implements PCollectionView<T>
     this.coder = coder;
   }
 
-  @Nullable
   @Override
   public PCollection<?> getPCollection() {
-    throw new IllegalStateException(
-        String.format("Cannot call getPCollection on a %s", getClass().getSimpleName()));
+    return pCollection;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/de39f324/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index a8490bf..6fdf9d6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -143,22 +143,30 @@ public class ParDoTranslationTest {
       inputs.putAll(parDo.getAdditionalInputs());
       PCollectionTuple output = mainInput.apply(parDo);
 
-      SdkComponents components = SdkComponents.create();
-      String transformId =
-          components.registerPTransform(
+      SdkComponents sdkComponents = SdkComponents.create();
+
+      // Encode
+      RunnerApi.PTransform protoTransform =
+          PTransformTranslation.toProto(
               AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
                   "foo", inputs, output.expand(), parDo, p),
-              Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+              sdkComponents);
+      Components protoComponents = sdkComponents.toComponents();
+
+      // Decode
+      Pipeline rehydratedPipeline = Pipeline.create();
 
-      Components protoComponents = components.toComponents();
-      RunnerApi.PTransform protoTransform = protoComponents.getTransformsOrThrow(transformId);
       ParDoPayload parDoPayload =
           protoTransform.getSpec().getParameter().unpack(ParDoPayload.class);
       for (PCollectionView<?> view : parDo.getSideInputs()) {
         SideInput sideInput = parDoPayload.getSideInputsOrThrow(view.getTagInternal().getId());
         PCollectionView<?> restoredView =
-            ParDoTranslation.fromProto(
-                sideInput, view.getTagInternal().getId(), protoTransform, protoComponents);
+            ParDoTranslation.viewFromProto(
+                rehydratedPipeline,
+                sideInput,
+                view.getTagInternal().getId(),
+                protoTransform,
+                protoComponents);
         assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
         assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
         assertThat(
@@ -169,7 +177,7 @@ public class ParDoTranslationTest {
                 view.getWindowingStrategyInternal().fixDefaults()));
         assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal()));
       }
-      String mainInputId = components.registerPCollection(mainInput);
+      String mainInputId = sdkComponents.registerPCollection(mainInput);
       assertThat(
           ParDoTranslation.getMainInput(protoTransform, protoComponents),
           equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));