You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/12/21 22:05:32 UTC

[beam] branch master updated: Translate Flink batch nodes by URN

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 875f3e1  Translate Flink batch nodes by URN
875f3e1 is described below

commit 875f3e1b0c943b56bda99e610ede872a07573be5
Author: Ben Sidhom <si...@google.com>
AuthorDate: Mon Dec 18 11:26:36 2017 -0800

    Translate Flink batch nodes by URN
    
    Before this change, the translator selected translations by downcasting
    pipeline transforms to their concrete Java classes. That cannot be done
    for jobs that have been submitted portably through the Job API. This
    change instead selects translations by URN (available on rehydrated
    pipelines).
---
 .../flink/FlinkBatchTransformTranslators.java      | 27 ++++++++++++++--------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index 3689698..a4265f5 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
 import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
@@ -96,29 +98,34 @@ class FlinkBatchTransformTranslators {
 
   @SuppressWarnings("rawtypes")
   private static final Map<
-      Class<? extends PTransform>,
+      String,
       FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
 
   static {
-    TRANSLATORS.put(View.CreatePCollectionView.class, new CreatePCollectionViewTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
+        new CreatePCollectionViewTranslatorBatch());
 
-    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
-    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.COMBINE_TRANSFORM_URN,
+        new CombinePerKeyTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
 
-    TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN,
+        new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.WINDOW_TRANSFORM_URN, new WindowAssignTranslatorBatch());
 
-    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
 
-    TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch());
+    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
   }
 
 
   static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(
       PTransform<?, ?> transform) {
-    return TRANSLATORS.get(transform.getClass());
+    @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
+    return urn == null ? null : TRANSLATORS.get(urn);
   }
 
   private static class ReadSourceTranslatorBatch<T>

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.