You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 04:08:14 UTC
[08/12] beam git commit: Add registration for Read and WindowInto
translators
Add registration for Read and WindowInto translators
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/afaebb13
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/afaebb13
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/afaebb13
Branch: refs/heads/master
Commit: afaebb13f73868799b83c7af95cca2732e3ecb9a
Parents: 0bf4ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 21:00:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 25 11:16:47 2017 -0700
----------------------------------------------------------------------
.../core/construction/ReadTranslation.java | 74 +++++++++++++++++++-
.../construction/WindowIntoTranslation.java | 43 +++++++++++-
2 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/afaebb13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index d6c3400..aff5fc9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -20,10 +20,15 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
@@ -32,12 +37,13 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.SerializableUtils;
/**
* Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransforms} into {@link ReadPayload} protos.
+ * {@link PTransform transforms} into {@link ReadPayload} protos.
*/
public class ReadTranslation {
private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
@@ -124,4 +130,70 @@ public class ReadTranslation {
"BoundedSource");
}
+  /**
+   * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
+   *
+   * <p>The URN reported by {@link #getUrn} and the URN set on the {@link FunctionSpec} built by
+   * {@link #translate} are both {@link PTransformTranslation#READ_TRANSFORM_URN}.
+   */
+  public static class UnboundedReadPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
+    /** Returns a new translator instance; the constructor itself is private. */
+    public static TransformPayloadTranslator create() {
+      return new UnboundedReadPayloadTranslator();
+    }
+
+    private UnboundedReadPayloadTranslator() {}
+
+    /**
+     * Returns the URN identifying a read transform.
+     *
+     * @param transform the unbounded read being translated; not inspected
+     * @return {@link PTransformTranslation#READ_TRANSFORM_URN}
+     */
+    @Override
+    public String getUrn(Read.Unbounded<?> transform) {
+      // Must agree with the URN placed on the FunctionSpec in translate(): this is a Read,
+      // not a WindowInto.
+      return PTransformTranslation.READ_TRANSFORM_URN;
+    }
+
+    /**
+     * Builds the {@link FunctionSpec} for an applied unbounded read.
+     *
+     * @param transform the applied transform whose {@link Read.Unbounded} is converted to a
+     *     {@link ReadPayload}
+     * @param components not referenced by this translator
+     * @return a spec carrying the read URN and the packed {@link ReadPayload}
+     */
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Read.Unbounded<?>> transform, SdkComponents components) {
+      ReadPayload payload = toProto(transform.getTransform());
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PTransformTranslation.READ_TRANSFORM_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  /**
+   * A {@link TransformPayloadTranslator} for {@link Read.Bounded}.
+   *
+   * <p>The URN reported by {@link #getUrn} and the URN set on the {@link FunctionSpec} built by
+   * {@link #translate} are both {@link PTransformTranslation#READ_TRANSFORM_URN}.
+   */
+  public static class BoundedReadPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
+    /** Returns a new translator instance; the constructor itself is private. */
+    public static TransformPayloadTranslator create() {
+      return new BoundedReadPayloadTranslator();
+    }
+
+    private BoundedReadPayloadTranslator() {}
+
+    /**
+     * Returns the URN identifying a read transform.
+     *
+     * @param transform the bounded read being translated; not inspected
+     * @return {@link PTransformTranslation#READ_TRANSFORM_URN}
+     */
+    @Override
+    public String getUrn(Read.Bounded<?> transform) {
+      // Must agree with the URN placed on the FunctionSpec in translate(): this is a Read,
+      // not a WindowInto.
+      return PTransformTranslation.READ_TRANSFORM_URN;
+    }
+
+    /**
+     * Builds the {@link FunctionSpec} for an applied bounded read.
+     *
+     * @param transform the applied transform whose {@link Read.Bounded} is converted to a
+     *     {@link ReadPayload}
+     * @param components not referenced by this translator
+     * @return a spec carrying the read URN and the packed {@link ReadPayload}
+     */
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Read.Bounded<?>> transform, SdkComponents components) {
+      ReadPayload payload = toProto(transform.getTransform());
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PTransformTranslation.READ_TRANSFORM_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  /**
+   * Service-loaded registrar that maps {@link Read.Unbounded} to an {@link
+   * UnboundedReadPayloadTranslator} and {@link Read.Bounded} to a {@link
+   * BoundedReadPayloadTranslator}.
+   */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    /** Returns an immutable two-entry map from each Read transform class to its translator. */
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      TransformPayloadTranslator unboundedTranslator = UnboundedReadPayloadTranslator.create();
+      TransformPayloadTranslator boundedTranslator = BoundedReadPayloadTranslator.create();
+      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>of(
+          Read.Unbounded.class, unboundedTranslator,
+          Read.Bounded.class, boundedTranslator);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/afaebb13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 33faa02..5ed4d24 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -18,14 +18,18 @@
package org.apache.beam.runners.core.construction;
+import com.google.auto.service.AutoService;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collections;
+import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.Assign;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -40,7 +44,7 @@ public class WindowIntoTranslation {
@Override
public String getUrn(Assign<?> transform) {
- return PTransforms.WINDOW_TRANSFORM_URN;
+ return PTransformTranslation.WINDOW_TRANSFORM_URN;
}
@Override
@@ -65,4 +69,41 @@ public class WindowIntoTranslation {
SdkFunctionSpec spec = payload.getWindowFn();
return WindowingStrategyTranslation.windowFnFromProto(spec);
}
+
+  /**
+   * Translates {@link Window.Assign} transforms into a {@link FunctionSpec} whose parameter is a
+   * packed {@link WindowIntoPayload}.
+   */
+  public static class WindowIntoPayloadTranslator
+      implements TransformPayloadTranslator<Window.Assign<?>> {
+    private WindowIntoPayloadTranslator() {}
+
+    /** Returns a new translator instance; the constructor itself is private. */
+    public static TransformPayloadTranslator create() {
+      return new WindowIntoPayloadTranslator();
+    }
+
+    /** Returns {@link PTransformTranslation#WINDOW_TRANSFORM_URN}; the argument is not inspected. */
+    @Override
+    public String getUrn(Window.Assign<?> transform) {
+      return PTransformTranslation.WINDOW_TRANSFORM_URN;
+    }
+
+    /**
+     * Converts the applied {@link Window.Assign} to its payload proto and wraps it in a spec
+     * tagged with the window-into URN.
+     */
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) {
+      Window.Assign<?> assign = transform.getTransform();
+      Any packedPayload = Any.pack(toProto(assign, components));
+
+      FunctionSpec.Builder spec = FunctionSpec.newBuilder();
+      spec.setUrn(PTransformTranslation.WINDOW_TRANSFORM_URN);
+      spec.setParameter(packedPayload);
+      return spec.build();
+    }
+  }
+
+  /**
+   * Service-loaded registrar that maps {@link Window.Assign} to a {@link
+   * WindowIntoPayloadTranslator}.
+   */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    /** Returns a single-entry map from {@code Window.Assign} to its payload translator. */
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      TransformPayloadTranslator windowIntoTranslator = WindowIntoPayloadTranslator.create();
+      return Collections.singletonMap(Window.Assign.class, windowIntoTranslator);
+    }
+  }
}