You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 04:08:15 UTC

[09/12] beam git commit: Add transform-analysis helpers to ReadTranslation

Add transform-analysis helpers to ReadTranslation

These helpers allow a runner to extract a source and inspect boundedness
without reference to the specific user-facing Java classes Read.Bounded
and Read.Unbounded.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e09596a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e09596a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e09596a

Branch: refs/heads/master
Commit: 8e09596a981fe69edb7a1560e864bc852978ba81
Parents: 121631c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 21:15:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 25 11:17:04 2017 -0700

----------------------------------------------------------------------
 .../construction/PCollectionTranslation.java    |  4 +-
 .../core/construction/ReadTranslation.java      | 43 ++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8e09596a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 303c02d..968966f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -64,7 +64,7 @@ public class PCollectionTranslation {
         components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components);
   }
 
-  private static RunnerApi.IsBounded toProto(IsBounded bounded) {
+  static RunnerApi.IsBounded toProto(IsBounded bounded) {
     switch (bounded) {
       case BOUNDED:
         return RunnerApi.IsBounded.BOUNDED;
@@ -76,7 +76,7 @@ public class PCollectionTranslation {
     }
   }
 
-  private static IsBounded fromProto(RunnerApi.IsBounded isBounded) {
+  static IsBounded fromProto(RunnerApi.IsBounded isBounded) {
     switch (isBounded) {
       case BOUNDED:
         return IsBounded.BOUNDED;

http://git-wip-us.apache.org/repos/asf/beam/blob/8e09596a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 3ddde8d..572384b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -26,6 +26,8 @@ import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -40,6 +42,8 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
 
 /**
  * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
@@ -102,6 +106,29 @@ public class ReadTranslation {
         "BoundedSource");
   }
 
+  public static <T> BoundedSource<T> boundedSourceFromTransform( // Recovers the source of a read.
+      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+      throws IOException { // Declared because getReadPayload(transform) declares it.
+    return (BoundedSource<T>) boundedSourceFromProto(getReadPayload(transform)); // T is unverified.
+  }
+
+  public static <T, CheckpointT extends UnboundedSource.CheckpointMark> // Unbounded counterpart.
+      UnboundedSource<T, CheckpointT> unboundedSourceFromTransform(
+          AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+          throws IOException { // Declared because getReadPayload(transform) declares it.
+    return (UnboundedSource<T, CheckpointT>) unboundedSourceFromProto(getReadPayload(transform)); // T and CheckpointT are chosen by the caller; the cast cannot verify them at runtime.
+  }
+
+  private static <T> ReadPayload getReadPayload( // Shared by the two *SourceFromTransform methods.
+      AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+      throws IOException {
+    return PTransformTranslation.toProto( // Translate the applied transform to its proto form.
+            transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create()) // Empty list and a fresh SdkComponents: nothing from this translation is kept by the caller.
+        .getSpec()
+        .getParameter() // NOTE(review): presumably a protobuf Any holding the payload - confirm.
+        .unpack(ReadPayload.class); // Decode the spec parameter as a ReadPayload.
+  }
+
   private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
@@ -130,6 +157,22 @@ public class ReadTranslation {
         "BoundedSource");
   }
 
+  public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) { // Reports the boundedness recorded in the transform's ReadPayload.
+    try {
+      return PCollectionTranslation.fromProto( // Convert the proto enum to PCollection.IsBounded.
+          PTransformTranslation.toProto( // Same translate-and-unpack chain as getReadPayload.
+                  transform,
+                  Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+                  SdkComponents.create()) // Fresh components; only the payload is read back.
+              .getSpec()
+              .getParameter()
+              .unpack(ReadPayload.class) // NOTE(review): assumes transform is a Read - confirm.
+              .getIsBounded());
+    } catch (IOException e) { // Rethrown unchecked so callers need no throws clause.
+      throw new RuntimeException("Internal error determining boundedness of Read", e);
+    }
+  }
+
   /**
    * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
    */