You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/24 03:23:18 UTC

[06/12] beam git commit: support OutputTimeFn

Support OutputTimeFn in the Gearpump runner's GroupByKey and Window.Bound translators


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6aaf0d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6aaf0d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6aaf0d9

Branch: refs/heads/gearpump-runner
Commit: f6aaf0d9ecd6b67ad6f7eed413af3fae3b3bdf6f
Parents: 3bf8263
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jan 14 21:41:40 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sat Jan 14 21:41:40 2017 +0800

----------------------------------------------------------------------
 .../translators/GroupByKeyTranslator.java       | 57 +++++++++++++++++---
 .../translators/WindowBoundTranslator.java      | 20 ++++---
 2 files changed, 64 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 4eaf755..e16a178 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
@@ -58,12 +59,16 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
     int parallelism = context.getPipelineOptions().getParallelism();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+        input.getWindowingStrategy().getOutputTimeFn();
     JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
         .window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
         .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
-        .reduce(new MergeValue<K, V>(), "merge_value");
+        .map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
+        .reduce(new Merge<K, V>(outputTimeFn), "merge")
+        .map(new Values<K, V>(), "values");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
@@ -141,15 +146,53 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class MergeValue<K, V> extends
-      ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
+  private static class KeyedByTimestamp<K, V>
+      extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
+      KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
 
     @Override
-    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
-        WindowedValue<KV<K, Iterable<V>>> wv2) {
-      return WindowedValue.of(KV.of(wv1.getValue().getKey(),
+    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+        WindowedValue<KV<K, Iterable<V>>> wv) {
+      return KV.of(wv.getTimestamp(), wv);
+    }
+  }
+
+  private static class Merge<K, V> extends
+      ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+
+    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+    Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
+    }
+
+    @Override
+    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
+        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
+        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
+      org.joda.time.Instant t1 = kv1.getKey();
+      org.joda.time.Instant t2 = kv2.getKey();
+
+      WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
+      WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+
+      return KV.of(outputTimeFn.combine(t1, t2),
+          WindowedValue.of(KV.of(wv1.getValue().getKey(),
               Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
-          wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
+              wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
+    }
+  }
+
+  private static class Values<K, V> extends
+      MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
+          WindowedValue<KV<K, Iterable<V>>>> {
+
+    @Override
+    public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
+        WindowedValue<KV<K, Iterable<V>>>> kv) {
+      org.joda.time.Instant timestamp = kv.getKey();
+      WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+      return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/f6aaf0d9/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index d3c50a5..9bf1936 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -53,9 +54,11 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
         transform.getOutputStrategyInternal(input.getWindowingStrategy());
     WindowFn<T, BoundedWindow> windowFn =
         (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
+        outputStrategy.getOutputTimeFn();
     JavaStream<WindowedValue<T>> outputStream =
         inputStream
-            .flatMap(new AssignWindows(windowFn), "assign_windows")
+            .flatMap(new AssignWindows(windowFn, outputTimeFn), "assign_windows")
             .process(AssignTimestampTask.class, 1, UserConfig.empty(), "assign_timestamp");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
@@ -64,17 +67,21 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
   private static class AssignWindows<T> extends
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
-    private final WindowFn<T, BoundedWindow> fn;
+    private final WindowFn<T, BoundedWindow> windowFn;
+    private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
 
-    AssignWindows(WindowFn<T, BoundedWindow> fn) {
-      this.fn = fn;
+    AssignWindows(
+        WindowFn<T, BoundedWindow> windowFn,
+        OutputTimeFn<? super BoundedWindow> outputTimeFn) {
+      this.windowFn = windowFn;
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
     public Iterator<WindowedValue<T>> apply(final WindowedValue<T> value) {
       List<WindowedValue<T>>  ret = new LinkedList<>();
       try {
-        Collection<BoundedWindow> windows = fn.assignWindows(fn.new AssignContext() {
+        Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
           @Override
           public T element() {
             return value.getValue();
@@ -91,8 +98,9 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
           }
         });
         for (BoundedWindow window: windows) {
+          Instant timestamp = outputTimeFn.assignOutputTime(value.getTimestamp(), window);
           ret.add(WindowedValue.of(
-              value.getValue(), value.getTimestamp(), window, value.getPane()));
+              value.getValue(), timestamp, window, value.getPane()));
         }
       } catch (Exception e) {
         throw new RuntimeException(e);