You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 04:08:15 UTC
[09/12] beam git commit: Add transform-analysis helpers to ReadTranslation
Add transform-analysis helpers to ReadTranslation
These helpers allow a runner to extract a source and inspect boundedness
without reference to the specific user-facing Java classes Read.Bounded
and Read.Unbounded.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e09596a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e09596a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e09596a
Branch: refs/heads/master
Commit: 8e09596a981fe69edb7a1560e864bc852978ba81
Parents: 121631c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 21:15:48 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 25 11:17:04 2017 -0700
----------------------------------------------------------------------
.../construction/PCollectionTranslation.java | 4 +-
.../core/construction/ReadTranslation.java | 43 ++++++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8e09596a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 303c02d..968966f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -64,7 +64,7 @@ public class PCollectionTranslation {
components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components);
}
- private static RunnerApi.IsBounded toProto(IsBounded bounded) {
+ static RunnerApi.IsBounded toProto(IsBounded bounded) {
switch (bounded) {
case BOUNDED:
return RunnerApi.IsBounded.BOUNDED;
@@ -76,7 +76,7 @@ public class PCollectionTranslation {
}
}
- private static IsBounded fromProto(RunnerApi.IsBounded isBounded) {
+ static IsBounded fromProto(RunnerApi.IsBounded isBounded) {
switch (isBounded) {
case BOUNDED:
return IsBounded.BOUNDED;
http://git-wip-us.apache.org/repos/asf/beam/blob/8e09596a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index 3ddde8d..572384b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -26,6 +26,8 @@ import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
@@ -40,6 +42,8 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
/**
* Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
@@ -102,6 +106,29 @@ public class ReadTranslation {
"BoundedSource");
}
+ public static <T> BoundedSource<T> boundedSourceFromTransform(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+ throws IOException {
+ return (BoundedSource<T>) boundedSourceFromProto(getReadPayload(transform));
+ }
+
+ public static <T, CheckpointT extends UnboundedSource.CheckpointMark>
+ UnboundedSource<T, CheckpointT> unboundedSourceFromTransform(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+ throws IOException {
+ return (UnboundedSource<T, CheckpointT>) unboundedSourceFromProto(getReadPayload(transform));
+ }
+
+ private static <T> ReadPayload getReadPayload(
+ AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform)
+ throws IOException {
+ return PTransformTranslation.toProto(
+ transform, Collections.<AppliedPTransform<?, ?, ?>>emptyList(), SdkComponents.create())
+ .getSpec()
+ .getParameter()
+ .unpack(ReadPayload.class);
+ }
+
private static SdkFunctionSpec toProto(UnboundedSource<?, ?> source) {
return SdkFunctionSpec.newBuilder()
.setSpec(
@@ -130,6 +157,22 @@ public class ReadTranslation {
"BoundedSource");
}
+ public static PCollection.IsBounded sourceIsBounded(AppliedPTransform<?, ?, ?> transform) {
+ try {
+ return PCollectionTranslation.fromProto(
+ PTransformTranslation.toProto(
+ transform,
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList(),
+ SdkComponents.create())
+ .getSpec()
+ .getParameter()
+ .unpack(ReadPayload.class)
+ .getIsBounded());
+ } catch (IOException e) {
+ throw new RuntimeException("Internal error determining boundedness of Read", e);
+ }
+ }
+
/**
* A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
*/