You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/24 20:36:45 UTC

[3/4] incubator-beam git commit: Fixes to timestamps in GroupAlsoByWindowsProperties

Fixes to timestamps in GroupAlsoByWindowsProperties

These properties had poor test coverage, so the timestamps they expect
were not updated when the default OutputTimeFn changed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55aae464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55aae464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55aae464

Branch: refs/heads/master
Commit: 55aae464530414f6e3ebe1103c32e39e6fc98a6f
Parents: d94a6f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 24 13:11:40 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 24 13:11:40 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/GroupAlsoByWindowsProperties.java  | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55aae464/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index 4518f9f..c4f3c8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -191,8 +191,9 @@ public class GroupAlsoByWindowsProperties {
       CombineFn<Long, ?, Long> combineFn)
           throws Exception {
 
-    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
-        SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)));
+    WindowingStrategy<?, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
 
     List<WindowedValue<KV<String, Long>>> result =
         runGABW(gabwFactory, windowingStrategy, "k",
@@ -360,14 +361,14 @@ public class GroupAlsoByWindowsProperties {
             KvMatcher.isKv(
                 equalTo("k"),
                 equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))),
-            0, // aggregate timestamp
+            window(0, 15).maxTimestamp().getMillis(), // aggregate timestamp
             0, // window start
             15), // window end
         WindowMatchers.isSingleWindowedValue(
             KvMatcher.isKv(
                 equalTo("k"),
                 equalTo(combineFn.apply(ImmutableList.of(4L)))),
-            15, // aggregate timestamp
+            window(15, 25).maxTimestamp().getMillis(), // aggregate timestamp
             15, // window start
             25))); // window end
   }