You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/16 21:18:45 UTC
[3/6] incubator-beam git commit: Skip window assignment when windows
don't change
Skip window assignment when windows don't change
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c37de002
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c37de002
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c37de002
Branch: refs/heads/gearpump-runner
Commit: c37de00266b35ca6f04ddbe7b564f9054b26b622
Parents: 85d54ab
Author: manuzhang <ow...@gmail.com>
Authored: Fri Dec 16 16:49:38 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Dec 16 16:49:38 2016 +0800
----------------------------------------------------------------------
.../beam/runners/gearpump/GearpumpRunner.java | 60 ++++----------------
1 file changed, 11 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c37de002/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index ed0813d..8477870 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -23,20 +23,19 @@ import com.typesafe.config.ConfigValueFactory;
import java.util.HashMap;
import java.util.Map;
-import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -75,9 +74,10 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
- if (Window.Bound.class.equals(transform.getClass())) {
+ if (Window.Bound.class.equals(transform.getClass())
+ && isNullOrIdentityWindowFn(((Window.Bound) transform).getWindowFn())) {
return (OutputT) super.apply(
- new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
+ ParDo.of(new IdentityFn()), input);
} else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
&& ((PCollectionList<?>) input).size() == 0) {
return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
@@ -139,53 +139,15 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
}
+ private static class IdentityFn<T> extends DoFn<T, T> {
- /**
- * copied from DirectPipelineRunner.
- * used to replace Window.Bound till window function is added to Gearpump Stream DSL
- */
- private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
- extends PTransform<PCollection<T>, PCollection<T>> {
-
- private final Window.Bound<T> wrapped;
-
- AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
- WindowFn<T, BoundedWindow> windowFn =
- (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
- if (!windowFn.isNonMerging()) {
- throw new UnsupportedOperationException(
- "merging window is not supported in Gearpump pipeline");
- }
-
- // If the Window.Bound transform only changed parts other than the WindowFn, then
- // we skip AssignWindows even though it should be harmless in a perfect world.
- // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
- // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
- // AssignWindows in this case.
- if (wrapped.getWindowFn() == null) {
- return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
- .setWindowingStrategyInternal(outputStrategy);
- } else {
- return input
- .apply("AssignWindows", new AssignWindows<>(windowFn))
- .setWindowingStrategyInternal(outputStrategy);
- }
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output(c.element());
}
}
- private static class IdentityFn<T> extends OldDoFn<T, T> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
+ private boolean isNullOrIdentityWindowFn(WindowFn windowFn) {
+ return windowFn == null || windowFn.getClass().equals(IdentityWindowFn.class);
}
}