You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/24 20:36:45 UTC
[3/4] incubator-beam git commit: Fixes to timestamps in GroupAlsoByWindowsProperties
Fixes to timestamps in GroupAlsoByWindowsProperties
These properties had poor test coverage, so their timestamps were not
updated alongside the new default for OutputTimeFn.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55aae464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55aae464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55aae464
Branch: refs/heads/master
Commit: 55aae464530414f6e3ebe1103c32e39e6fc98a6f
Parents: d94a6f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 24 13:11:40 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 24 13:11:40 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/GroupAlsoByWindowsProperties.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55aae464/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
index 4518f9f..c4f3c8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java
@@ -191,8 +191,9 @@ public class GroupAlsoByWindowsProperties {
CombineFn<Long, ?, Long> combineFn)
throws Exception {
- WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of(
- SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)));
+ WindowingStrategy<?, IntervalWindow> windowingStrategy =
+ WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10)))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp());
List<WindowedValue<KV<String, Long>>> result =
runGABW(gabwFactory, windowingStrategy, "k",
@@ -360,14 +361,14 @@ public class GroupAlsoByWindowsProperties {
KvMatcher.isKv(
equalTo("k"),
equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))),
- 0, // aggregate timestamp
+ window(0, 15).maxTimestamp().getMillis(), // aggregate timestamp
0, // window start
15), // window end
WindowMatchers.isSingleWindowedValue(
KvMatcher.isKv(
equalTo("k"),
equalTo(combineFn.apply(ImmutableList.of(4L)))),
- 15, // aggregate timestamp
+ window(15, 25).maxTimestamp().getMillis(), // aggregate timestamp
15, // window start
25))); // window end
}