You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2016/11/24 14:21:28 UTC
[1/2] incubator-beam git commit: Replace WindowAssignment OldDoFn by FlatMap in Flink Runner
Repository: incubator-beam
Updated Branches:
refs/heads/master 3e4b2fd0d -> 8d1214a3b
Replace WindowAssignment OldDoFn by FlatMap in Flink Runner
The streaming runner had an OldDoFn that was used for assigning windows
using a WindowFn. This is now done with a FlatMap.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a097729
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a097729
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a097729
Branch: refs/heads/master
Commit: 4a097729ac9fc65283f4f11f85812188589c8df3
Parents: 3e4b2fd
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Nov 8 11:03:21 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 24 11:39:30 2016 +0100
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 63 +++-----------------
1 file changed, 9 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a097729/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 40dfbb9..47935eb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.flink.translation;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
@@ -31,6 +30,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.FlinkCoder;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -637,64 +636,20 @@ public class FlinkStreamingTransformTranslators {
TypeInformation<WindowedValue<T>> typeInfo =
context.getTypeInfo(context.getOutput(transform));
- OldDoFn<T, T> windowAssignerDoFn =
- createWindowAssigner(windowingStrategy.getWindowFn());
-
- @SuppressWarnings("unchecked")
- PCollection<T> inputPCollection = context.getInput(transform);
-
- TypeInformation<WindowedValue<T>> inputTypeInfo =
- context.getTypeInfo(inputPCollection);
-
- DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>(
- windowAssignerDoFn,
- inputTypeInfo,
- new TupleTag<T>("main output"),
- Collections.<TupleTag<?>>emptyList(),
- new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(),
- windowingStrategy,
- new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
- Collections.<PCollectionView<?>>emptyList(), /* side inputs */
- context.getPipelineOptions());
-
DataStream<WindowedValue<T>> inputDataStream =
context.getInputDataStream(context.getInput(transform));
- SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream
- .transform(transform.getName(), typeInfo, doFnOperator);
-
- context.setOutputDataStream(context.getOutput(transform), outDataStream);
- }
+ WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn();
- private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner(
- final WindowFn<T, W> windowFn) {
+ FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction =
+ new FlinkAssignWindows<>(windowFn);
- return new OldDoFn<T, T>() {
+ SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream
+ .flatMap(assignWindowsFunction)
+ .name(context.getOutput(transform).getName())
+ .returns(typeInfo);
- @Override
- public void processElement(final ProcessContext c) throws Exception {
- Collection<W> windows = windowFn.assignWindows(
- windowFn.new AssignContext() {
- @Override
- public T element() {
- return c.element();
- }
-
- @Override
- public Instant timestamp() {
- return c.timestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(c.windowingInternals().windows());
- }
- });
-
- c.windowingInternals().outputWindowedValue(
- c.element(), c.timestamp(), windows, c.pane());
- }
- };
+ context.setOutputDataStream(context.getOutput(transform), outputDataStream);
}
}
[2/2] incubator-beam git commit: This closes #1435
Posted by al...@apache.org.
This closes #1435
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d1214a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d1214a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d1214a3
Branch: refs/heads/master
Commit: 8d1214a3ba94b21102b74d346e73f24ecd9056f2
Parents: 3e4b2fd 4a09772
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 24 15:20:49 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 24 15:20:49 2016 +0100
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 63 +++-----------------
1 file changed, 9 insertions(+), 54 deletions(-)
----------------------------------------------------------------------