You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/16 21:18:45 UTC

[3/6] incubator-beam git commit: Skip window assignment when windows don't change

Skip window assignment when windows don't change


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c37de002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c37de002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c37de002

Branch: refs/heads/gearpump-runner
Commit: c37de00266b35ca6f04ddbe7b564f9054b26b622
Parents: 85d54ab
Author: manuzhang <ow...@gmail.com>
Authored: Fri Dec 16 16:49:38 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Dec 16 16:49:38 2016 +0800

----------------------------------------------------------------------
 .../beam/runners/gearpump/GearpumpRunner.java   | 60 ++++----------------
 1 file changed, 11 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c37de002/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index ed0813d..8477870 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -23,20 +23,19 @@ import com.typesafe.config.ConfigValueFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.IdentityWindowFn;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -75,9 +74,10 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    if (Window.Bound.class.equals(transform.getClass())) {
+    if (Window.Bound.class.equals(transform.getClass())
+        && isNullOrIdentityWindowFn(((Window.Bound) transform).getWindowFn())) {
       return (OutputT) super.apply(
-              new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+              ParDo.of(new IdentityFn()), input);
     } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
             && ((PCollectionList<?>) input).size() == 0) {
       return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
@@ -139,53 +139,15 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
   }
 
+  private static class IdentityFn<T> extends DoFn<T, T> {
 
-  /**
-   * copied from DirectPipelineRunner.
-   * used to replace Window.Bound till window function is added to Gearpump Stream DSL
-   */
-  private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
-      extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private final Window.Bound<T> wrapped;
-
-    AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
-      this.wrapped = wrapped;
-    }
-
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      WindowingStrategy<?, ?> outputStrategy =
-          wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
-      WindowFn<T, BoundedWindow> windowFn =
-          (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
-      if (!windowFn.isNonMerging()) {
-        throw new UnsupportedOperationException(
-            "merging window is not supported in Gearpump pipeline");
-      }
-
-      // If the Window.Bound transform only changed parts other than the WindowFn, then
-      // we skip AssignWindows even though it should be harmless in a perfect world.
-      // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
-      // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
-      // AssignWindows in this case.
-      if (wrapped.getWindowFn() == null) {
-        return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
-            .setWindowingStrategyInternal(outputStrategy);
-      } else {
-        return input
-            .apply("AssignWindows", new AssignWindows<>(windowFn))
-            .setWindowingStrategyInternal(outputStrategy);
-      }
+    @ProcessElement
+    public void process(ProcessContext c) {
+      c.output(c.element());
     }
   }
 
-  private static class IdentityFn<T> extends OldDoFn<T, T> {
-    @Override
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
+  private boolean isNullOrIdentityWindowFn(WindowFn windowFn) {
+    return windowFn == null || windowFn.getClass().equals(IdentityWindowFn.class);
   }
 }