You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/20 21:13:54 UTC

[1/3] beam git commit: Flink runner: refactor the translator into two phases: rewriting and translating.

Repository: beam
Updated Branches:
  refs/heads/master e4ef23e16 -> 608a9c459


Flink runner: refactor the translator into two phases: rewriting and translating.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52794096
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52794096
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52794096

Branch: refs/heads/master
Commit: 52794096aa8b4d614423fd787835f5b89b1ea1ac
Parents: f69e3b5
Author: Pei He <pe...@apache.org>
Authored: Mon Jun 19 16:10:02 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:13 2017 -0700

----------------------------------------------------------------------
 .../FlinkPipelineExecutionEnvironment.java      |  2 +
 .../flink/FlinkStreamingPipelineTranslator.java | 23 ---------
 .../runners/flink/FlinkTransformOverrides.java  | 53 ++++++++++++++++++++
 3 files changed, 55 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index fe5dd87..d2a2016 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -84,6 +84,8 @@ class FlinkPipelineExecutionEnvironment {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
+    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+
     PipelineTranslationOptimizer optimizer =
         new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index d768b01..27bb4ec 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,11 +17,7 @@
  */
 package org.apache.beam.runners.flink;
 
-import com.google.common.collect.ImmutableList;
-import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SplittableParDo;
@@ -29,12 +25,10 @@ import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
@@ -70,25 +64,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void translate(Pipeline pipeline) {
-    List<PTransformOverride> transformOverrides =
-        ImmutableList.<PTransformOverride>builder()
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.splittableParDoMulti(),
-                    new SplittableParDoOverrideFactory()))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
-                    new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
-            .add(
-                PTransformOverride.of(
-                    PTransformMatchers.classEqualTo(CreatePCollectionView.class),
-                    new CreateStreamingFlinkView.Factory()))
-            .build();
-
     // Ensure all outputs of all reads are consumed.
     UnconsumedReads.ensureAllReadsConsumed(pipeline);
-    pipeline.replaceAll(transformOverrides);
     super.translate(pipeline);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
new file mode 100644
index 0000000..1dc8de9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+
+/**
+ * {@link PTransform} overrides for Flink runner.
+ */
+public class FlinkTransformOverrides {
+  public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+    if (streaming) {
+      return ImmutableList.<PTransformOverride>builder()
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.splittableParDoMulti(),
+                  new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                  new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
+          .add(
+              PTransformOverride.of(
+                  PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                  new CreateStreamingFlinkView.Factory()))
+          .build();
+    } else {
+      return ImmutableList.of();
+    }
+  }
+}


[2/3] beam git commit: FlinkRunner: remove the unused ReflectiveOneToOneOverrideFactory.

Posted by pe...@apache.org.
FlinkRunner: remove the unused ReflectiveOneToOneOverrideFactory.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f69e3b53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f69e3b53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f69e3b53

Branch: refs/heads/master
Commit: f69e3b53fafa4b79b21095d4b65edbe7cfeb7d2a
Parents: e4ef23e
Author: Pei He <pe...@apache.org>
Authored: Mon Jun 19 15:55:48 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:13 2017 -0700

----------------------------------------------------------------------
 .../flink/FlinkStreamingPipelineTranslator.java | 31 --------------------
 1 file changed, 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f69e3b53/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index a88ff07..d768b01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.sdk.Pipeline;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
@@ -198,35 +196,6 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     }
   }
 
-  private static class ReflectiveOneToOneOverrideFactory<
-          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
-      extends SingleInputOutputOverrideFactory<
-          PCollection<InputT>, PCollection<OutputT>, TransformT> {
-    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
-    private final FlinkRunner runner;
-
-    private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
-        FlinkRunner runner) {
-      this.replacement = replacement;
-      this.runner = runner;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
-        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          InstanceBuilder.ofType(replacement)
-              .withArg(FlinkRunner.class, runner)
-              .withArg(
-                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
-                      transform.getTransform().getClass(),
-                  transform.getTransform())
-              .build());
-    }
-  }
-
   /**
    * A {@link PTransformOverrideFactory} that overrides a <a
    * href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.


[3/3] beam git commit: This closes #3275

Posted by pe...@apache.org.
This closes #3275


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/608a9c45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/608a9c45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/608a9c45

Branch: refs/heads/master
Commit: 608a9c4590ebd94e53ee1ec7f3ad60bfb4905c11
Parents: e4ef23e 5279409
Author: Pei He <pe...@apache.org>
Authored: Tue Jun 20 14:12:55 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:55 2017 -0700

----------------------------------------------------------------------
 .../FlinkPipelineExecutionEnvironment.java      |  2 +
 .../flink/FlinkStreamingPipelineTranslator.java | 54 --------------------
 .../runners/flink/FlinkTransformOverrides.java  | 53 +++++++++++++++++++
 3 files changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------