You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:04 UTC

[15/50] beam git commit: Fix RawPTransform translation

Fix RawPTransform translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/840492d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/840492d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/840492d9

Branch: refs/heads/DSL_SQL
Commit: 840492d9d8fb3b08cfe70a525655759fc1a31fdf
Parents: 7c608c3
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 14:18:03 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 6 13:10:33 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformTranslation.java     | 57 ++++++++++++++++----
 runners/direct-java/pom.xml                     |  5 --
 .../beam/runners/direct/DirectGroupByKey.java   |  5 +-
 .../direct/ParDoMultiOverrideFactory.java       |  3 +-
 .../direct/TestStreamEvaluatorFactory.java      |  3 +-
 .../direct/TransformEvaluatorRegistry.java      |  8 +--
 .../runners/direct/ViewOverrideFactory.java     |  3 +-
 7 files changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 7c5c593..32ecf43 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.Any;
-import com.google.protobuf.Message;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -115,7 +114,20 @@ public class PTransformTranslation {
     // TODO: Display Data
 
     PTransform<?, ?> transform = appliedPTransform.getTransform();
-    if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
+    // A RawPTransform directly vends its payload. Because it will generally be
+    // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
+    if (transform instanceof RawPTransform) {
+      RawPTransform<?, ?> rawPTransform = (RawPTransform<?, ?>) transform;
+
+      if (rawPTransform.getUrn() != null) {
+        FunctionSpec.Builder payload = FunctionSpec.newBuilder().setUrn(rawPTransform.getUrn());
+        @Nullable Any parameter = rawPTransform.getPayload();
+        if (parameter != null) {
+          payload.setParameter(parameter);
+        }
+        transformBuilder.setSpec(payload);
+      }
+    } else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
       FunctionSpec payload =
           KNOWN_PAYLOAD_TRANSLATORS
               .get(transform.getClass())
@@ -145,6 +157,25 @@ public class PTransformTranslation {
   }
 
   /**
+   * Returns the URN for the transform if it is known, otherwise {@code null}.
+   */
+  @Nullable
+  public static String urnForTransformOrNull(PTransform<?, ?> transform) {
+
+    // A RawPTransform directly vends its URN. Because it will generally be
+    // a subclass, we cannot do dictionary lookup in KNOWN_PAYLOAD_TRANSLATORS.
+    if (transform instanceof RawPTransform) {
+      return ((RawPTransform) transform).getUrn();
+    }
+
+    TransformPayloadTranslator translator = KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass());
+    if (translator == null) {
+      return null;
+    }
+    return translator.getUrn(transform);
+  }
+
+  /**
    * Returns the URN for the transform if it is known, otherwise throws.
    */
   public static String urnForTransform(PTransform<?, ?> transform) {
@@ -176,13 +207,14 @@ public class PTransformTranslation {
    * fully expanded in the pipeline proto.
    */
   public abstract static class RawPTransform<
-          InputT extends PInput, OutputT extends POutput, PayloadT extends Message>
+          InputT extends PInput, OutputT extends POutput>
       extends PTransform<InputT, OutputT> {
 
+    @Nullable
     public abstract String getUrn();
 
     @Nullable
-    PayloadT getPayload() {
+    public Any getPayload() {
       return null;
     }
   }
@@ -190,24 +222,29 @@ public class PTransformTranslation {
   /**
    * A translator that uses the explicit URN and payload from a {@link RawPTransform}.
    */
-  public static class RawPTransformTranslator<PayloadT extends Message>
-      implements TransformPayloadTranslator<RawPTransform<?, ?, PayloadT>> {
+  public static class RawPTransformTranslator
+      implements TransformPayloadTranslator<RawPTransform<?, ?>> {
     @Override
-    public String getUrn(RawPTransform<?, ?, PayloadT> transform) {
+    public String getUrn(RawPTransform<?, ?> transform) {
       return transform.getUrn();
     }
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, RawPTransform<?, ?, PayloadT>> transform,
+        AppliedPTransform<?, ?, RawPTransform<?, ?>> transform,
         SdkComponents components) {
-      PayloadT payload = transform.getTransform().getPayload();
+
+      // Anonymous composites have no spec
+      if (transform.getTransform().getUrn() == null) {
+        return null;
+      }
 
       FunctionSpec.Builder transformSpec =
           FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform()));
 
+      Any payload = transform.getTransform().getPayload();
       if (payload != null) {
-        transformSpec.setParameter(Any.pack(payload));
+        transformSpec.setParameter(payload);
       }
 
       return transformSpec.build();

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index cba4b09..bec2113 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -208,11 +208,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index f239070..2fc0dd4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.protobuf.Message;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -74,7 +73,7 @@ class DirectGroupByKey<K, V>
 
   static final class DirectGroupByKeyOnly<K, V>
       extends PTransformTranslation.RawPTransform<
-          PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>, Message> {
+          PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, V>>> {
     @Override
     public PCollection<KeyedWorkItem<K, V>> expand(PCollection<KV<K, V>> input) {
       return PCollection.createPrimitiveOutputInternal(
@@ -101,7 +100,7 @@ class DirectGroupByKey<K, V>
 
   static final class DirectGroupAlsoByWindow<K, V>
       extends PTransformTranslation.RawPTransform<
-          PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, Message> {
+          PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
 
     private final WindowingStrategy<?, ?> inputWindowingStrategy;
     private final WindowingStrategy<?, ?> outputWindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index df2054b..858ea34 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.protobuf.Message;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
@@ -172,7 +171,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
   static class StatefulParDo<K, InputT, OutputT>
       extends PTransformTranslation.RawPTransform<
-          PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple, Message> {
+          PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> {
     private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo;
     private final transient PCollection<KV<K, InputT>> originalInput;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index b1db58f..2da7a71 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -22,7 +22,6 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
-import com.google.protobuf.Message;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -185,7 +184,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
     static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";
 
     static class DirectTestStream<T>
-        extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>, Message> {
+        extends PTransformTranslation.RawPTransform<PBegin, PCollection<T>> {
       private final transient DirectRunner runner;
       private final TestStream<T> original;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index d144b20..0c907df 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -92,17 +92,17 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
           .<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
           .put(
               DirectGroupByKey.DirectGroupByKeyOnly.class,
-              new PTransformTranslation.RawPTransformTranslator<>())
+              new PTransformTranslation.RawPTransformTranslator())
           .put(
               DirectGroupByKey.DirectGroupAlsoByWindow.class,
               new PTransformTranslation.RawPTransformTranslator())
           .put(
               ParDoMultiOverrideFactory.StatefulParDo.class,
-              new PTransformTranslation.RawPTransformTranslator<>())
+              new PTransformTranslation.RawPTransformTranslator())
           .put(
               ViewOverrideFactory.WriteView.class,
-              new PTransformTranslation.RawPTransformTranslator<>())
-          .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator<>())
+              new PTransformTranslation.RawPTransformTranslator())
+          .put(DirectTestStream.class, new PTransformTranslation.RawPTransformTranslator())
           .put(
               SplittableParDoViaKeyedWorkItems.ProcessElements.class,
               new SplittableParDoProcessElementsTranslator())

http://git-wip-us.apache.org/repos/asf/beam/blob/840492d9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
index 501b436..fdff63d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.direct;
 
-import com.google.protobuf.Message;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.ForwardingPTransform;
@@ -95,7 +94,7 @@ class ViewOverrideFactory<ElemT, ViewT>
    * to {@link ViewT}.
    */
   static final class WriteView<ElemT, ViewT>
-      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>, Message> {
+      extends RawPTransform<PCollection<Iterable<ElemT>>, PCollectionView<ViewT>> {
     private final CreatePCollectionView<ElemT, ViewT> og;
 
     WriteView(CreatePCollectionView<ElemT, ViewT> og) {