You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:49 UTC

[37/50] [abbrv] beam git commit: Rehydrate PCollections

Rehydrate PCollections


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f17b8a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f17b8a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f17b8a2

Branch: refs/heads/DSL_SQL
Commit: 1f17b8a2bbd5068c8fd3374731d96f57d31433dc
Parents: 4c336e8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700

----------------------------------------------------------------------
 .../construction/PCollectionTranslation.java    | 16 ++++++++++++++
 .../PCollectionTranslationTest.java             | 22 ++++++++++++++++++++
 2 files changed, 38 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
         .build();
   }
 
+  public static PCollection<?> fromProto(
+      Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components)
+      throws IOException { // Rehydrates a PCollection in the given pipeline from its proto.
+    return PCollection.createPrimitiveOutputInternal(
+            pipeline,
+            WindowingStrategyTranslation.fromProto( // strategy looked up by id in components
+                components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+                components),
+            fromProto(pCollection.getIsBounded())) // proto boundedness -> SDK IsBounded
+        .setCoder(
+            (Coder) // raw cast; NOTE(review): presumably to fit the wildcard element type
+                CoderTranslation.fromProto( // coder looked up by id in components
+                    components.getCodersOrThrow(pCollection.getCoderId()), components));
+  }
+
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
     return fromProto(pCollection.getIsBounded());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
 
   @Test
   public void testEncodeDecodeCycle() throws Exception {
+    // Encode testCollection to its proto form, then export sdkComponents as proto Components.
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PCollection protoCollection =
+        PCollectionTranslation.toProto(testCollection, sdkComponents);
+    RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+    // Decode the proto back into a PCollection attached to a freshly created pipeline.
+    Pipeline pipeline = Pipeline.create();
+    PCollection<?> decodedCollection =
+        PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents);
+
+    // Verify coder, windowing strategy (compared after fixDefaults()) and boundedness match.
+    assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
+    assertThat(
+        decodedCollection.getWindowingStrategy(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(
+            testCollection.getWindowingStrategy().fixDefaults()));
+    assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded()));
+  }
+
+  @Test
+  public void testEncodeDecodeFields() throws Exception {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection = PCollectionTranslation
         .toProto(testCollection, sdkComponents);