You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:52 UTC

[27/50] [abbrv] beam git commit: Respect WindowFn#getOutputTime in gearpump-runner

Respect WindowFn#getOutputTime in gearpump-runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98854d4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98854d4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98854d4d

Branch: refs/heads/master
Commit: 98854d4d01ca526ea4a44dc077d2cfb4cddf9914
Parents: 3c7e3e6
Author: manuzhang <ow...@gmail.com>
Authored: Fri May 19 09:19:42 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 5 19:16:53 2017 +0800

----------------------------------------------------------------------
 .../gearpump/translators/GroupByKeyTranslator.java      | 12 ++++++++----
 .../gearpump/translators/GroupByKeyTranslatorTest.java  |  8 ++++----
 2 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 521f665..7d944a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -74,7 +74,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
             new GearpumpWindowFn(windowFn.isNonMerging()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
-        .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp")
+        .map(new KeyedByTimestamp<K, V>(windowFn, timestampCombiner), "keyed_by_timestamp")
         .fold(new Merge<>(windowFn, timestampCombiner), "merge")
         .map(new Values<K, V>(), "values");
 
@@ -146,17 +146,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
       extends MapFunction<WindowedValue<KV<K, V>>,
       KV<Instant, WindowedValue<KV<K, V>>>> {
 
+    private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
     private final TimestampCombiner timestampCombiner;
 
-    public KeyedByTimestamp(TimestampCombiner timestampCombiner) {
+    public KeyedByTimestamp(WindowFn<KV<K, V>, BoundedWindow> windowFn,
+        TimestampCombiner timestampCombiner) {
+      this.windowFn = windowFn;
       this.timestampCombiner = timestampCombiner;
     }
 
     @Override
     public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
         WindowedValue<KV<K, V>> wv) {
-      Instant timestamp = timestampCombiner.assign(
-          Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp());
+      BoundedWindow window = Iterables.getOnlyElement(wv.getWindows());
+      Instant timestamp = timestampCombiner.assign(window
+          , windowFn.getOutputTime(wv.getTimestamp(), window));
       return KV.of(timestamp, wv);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 86b60aa..d5b931b 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.time.Instant;
@@ -95,18 +94,19 @@ public class GroupByKeyTranslatorTest {
   @Test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testKeyedByTimestamp() {
+    WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
     BoundedWindow window =
         new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
     GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
-        new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner);
+        new GroupByKeyTranslator.KeyedByTimestamp(slidingWindows, timestampCombiner);
     WindowedValue<KV<String, String>> value =
         WindowedValue.of(
             KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
     KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
         keyedByTimestamp.map(value);
     org.joda.time.Instant time =
-        timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()),
-            value.getTimestamp());
+        timestampCombiner.assign(window,
+            slidingWindows.getOutputTime(value.getTimestamp(), window));
     assertThat(result, equalTo(KV.of(time, value)));
   }