You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:18 UTC
[06/12] beam git commit: support OutputTimeFn
Support OutputTimeFn in the Gearpump runner's GroupByKeyTranslator and WindowBoundTranslator.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6aaf0d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6aaf0d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6aaf0d9
Branch: refs/heads/gearpump-runner
Commit: f6aaf0d9ecd6b67ad6f7eed413af3fae3b3bdf6f
Parents: 3bf8263
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 21:41:40 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:41:40 2017 +0800
----------------------------------------------------------------------
.../translators/GroupByKeyTranslator.java | 57 +++++++++++++++++---
.../translators/WindowBoundTranslator.java | 20 ++++---
2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 4eaf755..e16a178 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -58,12 +59,16 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+ input.getWindowingStrategy().getOutputTimeFn();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
- .reduce(new MergeValue<K, V>(), "merge_value");
+ .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
+ .reduce(new Merge<K, V>(outputTimeFn), "merge")
+ .map(new Values<K, V>(), "values");
context.setOutputStream(context.getOutput(transform), outputStream);
}
@@ -141,15 +146,53 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class MergeValue<K, V> extends
- ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+ private static class KeyedByTimestamp<K, V>
+ extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
@Override
- public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
- WindowedValue<KV<K, Iterable<V>>> wv2) {
- return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+ public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+ WindowedValue<KV<K, Iterable<V>>> wv) {
+ return KV.of(wv.getTimestamp(), wv);
+ }
+ }
+
+ private static class Merge<K, V> extends
+ ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+
+ private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+ Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ this.outputTimeFn = outputTimeFn;
+ }
+
+ @Override
+ public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
+ KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
+ org.joda.time.Instant t1 = kv1.getKey();
+ org.joda.time.Instant t2 = kv2.getKey();
+
+ WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
+ WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+
+ return KV.of(outputTimeFn.combine(t1, t2),
+ WindowedValue.of(KV.of(wv1.getValue().getKey(),
Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
- wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+ wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
+ }
+ }
+
+ private static class Values<K, V> extends
+ MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
+ WindowedValue<KV<K, Iterable<V>>>> {
+
+ @Override
+ public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
+ WindowedValue<KV<K, Iterable<V>>>> kv) {
+ org.joda.time.Instant timestamp = kv.getKey();
+ WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+ return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index d3c50a5..9bf1936 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
@@ -53,9 +54,11 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
transform.getOutputStrategyInternal(input.getWindowingStrategy());
WindowFn<T, BoundedWindow> windowFn =
(WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+ OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+ outputStrategy.getOutputTimeFn();
JavaStream<WindowedValue<T>> outputStream =
inputStream
- .flatMap(new AssignWindows(windowFn), "assign_windows")
+ .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows")
.process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
context.setOutputStream(context.getOutput(transform), outputStream);
@@ -64,17 +67,21 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
private static class AssignWindows<T> extends
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
- private final WindowFn<T, BoundedWindow> fn;
+ private final WindowFn<T, BoundedWindow> windowFn;
+ private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
- AssignWindows(WindowFn<T, BoundedWindow> fn) {
- this.fn = fn;
+ AssignWindows(
+ WindowFn<T, BoundedWindow> windowFn,
+ OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+ this.windowFn = windowFn;
+ this.outputTimeFn = outputTimeFn;
}
@Override
public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
List<WindowedValue<T>> ret = new LinkedList<>();
try {
- Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() {
+ Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
@Override
public T element() {
return value.getValue();
@@ -91,8 +98,9 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
}
});
for (BoundedWindow window: windows) {
+ Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window);
ret.add(WindowedValue.of(
- value.getValue(), value.getTimestamp(), window, value.getPane()));
+ value.getValue(), timestamp, window, value.getPane()));
}
} catch (Exception e) {
throw new RuntimeException(e);