You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:49 UTC
[37/50] [abbrv] beam git commit: Rehydrate PCollections
Rehydrate PCollections
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1f17b8a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1f17b8a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1f17b8a2
Branch: refs/heads/DSL_SQL
Commit: 1f17b8a2bbd5068c8fd3374731d96f57d31433dc
Parents: 4c336e8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jul 6 09:24:22 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700
----------------------------------------------------------------------
.../construction/PCollectionTranslation.java | 16 ++++++++++++++
.../PCollectionTranslationTest.java | 22 ++++++++++++++++++++
2 files changed, 38 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 968966f..52526bb 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.construction;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.values.PCollection;
@@ -47,6 +48,21 @@ public class PCollectionTranslation {
.build();
}
+ public static PCollection<?> fromProto(
+ Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components)
+ throws IOException {
+ return PCollection.createPrimitiveOutputInternal(
+ pipeline,
+ WindowingStrategyTranslation.fromProto(
+ components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
+ components),
+ fromProto(pCollection.getIsBounded()))
+ .setCoder(
+ (Coder)
+ CoderTranslation.fromProto(
+ components.getCodersOrThrow(pCollection.getCoderId()), components));
+ }
+
public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
return fromProto(pCollection.getIsBounded());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1f17b8a2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 3b94220..5c45487 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -113,6 +113,28 @@ public class PCollectionTranslationTest {
@Test
public void testEncodeDecodeCycle() throws Exception {
+ // Encode
+ SdkComponents sdkComponents = SdkComponents.create();
+ RunnerApi.PCollection protoCollection =
+ PCollectionTranslation.toProto(testCollection, sdkComponents);
+ RunnerApi.Components protoComponents = sdkComponents.toComponents();
+
+ // Decode
+ Pipeline pipeline = Pipeline.create();
+ PCollection<?> decodedCollection =
+ PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents);
+
+ // Verify
+ assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
+ assertThat(
+ decodedCollection.getWindowingStrategy(),
+ Matchers.<WindowingStrategy<?, ?>>equalTo(
+ testCollection.getWindowingStrategy().fixDefaults()));
+ assertThat(decodedCollection.isBounded(), equalTo(testCollection.isBounded()));
+ }
+
+ @Test
+ public void testEncodeDecodeFields() throws Exception {
SdkComponents sdkComponents = SdkComponents.create();
RunnerApi.PCollection protoCollection = PCollectionTranslation
.toProto(testCollection, sdkComponents);