You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/26 04:08:14 UTC

[08/12] beam git commit: Add registration for Read and WindowInto translators

Add registration for Read and WindowInto translators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/afaebb13
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/afaebb13
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/afaebb13

Branch: refs/heads/master
Commit: afaebb13f73868799b83c7af95cca2732e3ecb9a
Parents: 0bf4ddb
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon May 22 21:00:02 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 25 11:16:47 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ReadTranslation.java      | 74 +++++++++++++++++++-
 .../construction/WindowIntoTranslation.java     | 43 +++++++++++-
 2 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/afaebb13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
index d6c3400..aff5fc9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReadTranslation.java
@@ -20,10 +20,15 @@ package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.IsBounded;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ReadPayload;
@@ -32,12 +37,13 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.SerializableUtils;
 
 /**
  * Methods for translating {@link Read.Bounded} and {@link Read.Unbounded}
- * {@link PTransform PTransforms} into {@link ReadPayload} protos.
+ * {@link PTransform PTransformTranslation} into {@link ReadPayload} protos.
  */
 public class ReadTranslation {
   private static final String JAVA_SERIALIZED_BOUNDED_SOURCE = "urn:beam:java:boundedsource:v1";
@@ -124,4 +130,70 @@ public class ReadTranslation {
         "BoundedSource");
   }
 
+  /**
+   * A {@link TransformPayloadTranslator} for {@link Read.Unbounded}.
+   */
+  public static class UnboundedReadPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<Read.Unbounded<?>> {
+    public static TransformPayloadTranslator create() {
+      return new UnboundedReadPayloadTranslator();
+    }
+
+    private UnboundedReadPayloadTranslator() {}
+
+    @Override
+    public String getUrn(Read.Unbounded<?> transform) {
+      return PTransformTranslation.WINDOW_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Read.Unbounded<?>> transform, SdkComponents components) {
+      ReadPayload payload = toProto(transform.getTransform());
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PTransformTranslation.READ_TRANSFORM_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  /**
+   * A {@link TransformPayloadTranslator} for {@link Read.Bounded}.
+   */
+  public static class BoundedReadPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<Read.Bounded<?>> {
+    public static TransformPayloadTranslator create() {
+      return new BoundedReadPayloadTranslator();
+    }
+
+    private BoundedReadPayloadTranslator() {}
+
+    @Override
+    public String getUrn(Read.Bounded<?> transform) {
+      return PTransformTranslation.WINDOW_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Read.Bounded<?>> transform, SdkComponents components) {
+      ReadPayload payload = toProto(transform.getTransform());
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PTransformTranslation.READ_TRANSFORM_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  /** Registers {@link UnboundedReadPayloadTranslator} and {@link BoundedReadPayloadTranslator}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
+          .put(Read.Unbounded.class, new UnboundedReadPayloadTranslator())
+          .put(Read.Bounded.class, new BoundedReadPayloadTranslator())
+          .build();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/afaebb13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 33faa02..5ed4d24 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -18,14 +18,18 @@
 
 package org.apache.beam.runners.core.construction;
 
+import com.google.auto.service.AutoService;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.Assign;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -40,7 +44,7 @@ public class WindowIntoTranslation {
 
     @Override
     public String getUrn(Assign<?> transform) {
-      return PTransforms.WINDOW_TRANSFORM_URN;
+      return PTransformTranslation.WINDOW_TRANSFORM_URN;
     }
 
     @Override
@@ -65,4 +69,41 @@ public class WindowIntoTranslation {
     SdkFunctionSpec spec = payload.getWindowFn();
     return WindowingStrategyTranslation.windowFnFromProto(spec);
   }
+
+  /**
+   * A {@link TransformPayloadTranslator} for {@link Window}.
+   */
+  public static class WindowIntoPayloadTranslator
+      implements PTransformTranslation.TransformPayloadTranslator<Window.Assign<?>> {
+    public static TransformPayloadTranslator create() {
+      return new WindowIntoPayloadTranslator();
+    }
+
+    private WindowIntoPayloadTranslator() {}
+
+    @Override
+    public String getUrn(Window.Assign<?> transform) {
+      return PTransformTranslation.WINDOW_TRANSFORM_URN;
+    }
+
+    @Override
+    public FunctionSpec translate(
+        AppliedPTransform<?, ?, Window.Assign<?>> transform, SdkComponents components) {
+      WindowIntoPayload payload = toProto(transform.getTransform(), components);
+      return RunnerApi.FunctionSpec.newBuilder()
+          .setUrn(PTransformTranslation.WINDOW_TRANSFORM_URN)
+          .setParameter(Any.pack(payload))
+          .build();
+    }
+  }
+
+  /** Registers {@link WindowIntoPayloadTranslator}. */
+  @AutoService(TransformPayloadTranslatorRegistrar.class)
+  public static class Registrar implements TransformPayloadTranslatorRegistrar {
+    @Override
+    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+        getTransformPayloadTranslators() {
+      return Collections.singletonMap(Window.Assign.class, new WindowIntoPayloadTranslator());
+    }
+  }
 }