You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/06/20 21:13:54 UTC
[1/3] beam git commit: Flink runner: refactor the translator into two
phases: rewriting and translating.
Repository: beam
Updated Branches:
refs/heads/master e4ef23e16 -> 608a9c459
Flink runner: refactor the translator into two phases: rewriting and translating.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52794096
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52794096
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52794096
Branch: refs/heads/master
Commit: 52794096aa8b4d614423fd787835f5b89b1ea1ac
Parents: f69e3b5
Author: Pei He <pe...@apache.org>
Authored: Mon Jun 19 16:10:02 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:13 2017 -0700
----------------------------------------------------------------------
.../FlinkPipelineExecutionEnvironment.java | 2 +
.../flink/FlinkStreamingPipelineTranslator.java | 23 ---------
.../runners/flink/FlinkTransformOverrides.java | 53 ++++++++++++++++++++
3 files changed, 55 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index fe5dd87..d2a2016 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -84,6 +84,8 @@ class FlinkPipelineExecutionEnvironment {
this.flinkBatchEnv = null;
this.flinkStreamEnv = null;
+ pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options.isStreaming()));
+
PipelineTranslationOptimizer optimizer =
new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index d768b01..27bb4ec 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -17,11 +17,7 @@
*/
package org.apache.beam.runners.flink;
-import com.google.common.collect.ImmutableList;
-import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
@@ -29,12 +25,10 @@ import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
@@ -70,25 +64,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void translate(Pipeline pipeline) {
- List<PTransformOverride> transformOverrides =
- ImmutableList.<PTransformOverride>builder()
- .add(
- PTransformOverride.of(
- PTransformMatchers.splittableParDoMulti(),
- new SplittableParDoOverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
- new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
- .add(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(CreatePCollectionView.class),
- new CreateStreamingFlinkView.Factory()))
- .build();
-
// Ensure all outputs of all reads are consumed.
UnconsumedReads.ensureAllReadsConsumed(pipeline);
- pipeline.replaceAll(transformOverrides);
super.translate(pipeline);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/52794096/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
new file mode 100644
index 0000000..1dc8de9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+
+/**
+ * {@link PTransform} overrides for Flink runner.
+ */
+public class FlinkTransformOverrides {
+ public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
+ if (streaming) {
+ return ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(),
+ new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+ new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+ new CreateStreamingFlinkView.Factory()))
+ .build();
+ } else {
+ return ImmutableList.of();
+ }
+ }
+}
[2/3] beam git commit: FlinkRunner: remove the unused
ReflectiveOneToOneOverrideFactory.
Posted by pe...@apache.org.
FlinkRunner: remove the unused ReflectiveOneToOneOverrideFactory.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f69e3b53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f69e3b53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f69e3b53
Branch: refs/heads/master
Commit: f69e3b53fafa4b79b21095d4b65edbe7cfeb7d2a
Parents: e4ef23e
Author: Pei He <pe...@apache.org>
Authored: Mon Jun 19 15:55:48 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:13 2017 -0700
----------------------------------------------------------------------
.../flink/FlinkStreamingPipelineTranslator.java | 31 --------------------
1 file changed, 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f69e3b53/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index a88ff07..d768b01 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.sdk.Pipeline;
@@ -36,7 +35,6 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
@@ -198,35 +196,6 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
}
}
- private static class ReflectiveOneToOneOverrideFactory<
- InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
- extends SingleInputOutputOverrideFactory<
- PCollection<InputT>, PCollection<OutputT>, TransformT> {
- private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
- private final FlinkRunner runner;
-
- private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement,
- FlinkRunner runner) {
- this.replacement = replacement;
- this.runner = runner;
- }
-
- @Override
- public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
- AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
- return PTransformReplacement.of(
- PTransformReplacements.getSingletonMainInput(transform),
- InstanceBuilder.ofType(replacement)
- .withArg(FlinkRunner.class, runner)
- .withArg(
- (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
- transform.getTransform().getClass(),
- transform.getTransform())
- .build());
- }
- }
-
/**
* A {@link PTransformOverrideFactory} that overrides a <a
* href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a> with {@link SplittableParDo}.
[3/3] beam git commit: This closes #3275
Posted by pe...@apache.org.
This closes #3275
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/608a9c45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/608a9c45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/608a9c45
Branch: refs/heads/master
Commit: 608a9c4590ebd94e53ee1ec7f3ad60bfb4905c11
Parents: e4ef23e 5279409
Author: Pei He <pe...@apache.org>
Authored: Tue Jun 20 14:12:55 2017 -0700
Committer: Pei He <pe...@apache.org>
Committed: Tue Jun 20 14:12:55 2017 -0700
----------------------------------------------------------------------
.../FlinkPipelineExecutionEnvironment.java | 2 +
.../flink/FlinkStreamingPipelineTranslator.java | 54 --------------------
.../runners/flink/FlinkTransformOverrides.java | 53 +++++++++++++++++++
3 files changed, 55 insertions(+), 54 deletions(-)
----------------------------------------------------------------------