You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:10 UTC
[19/50] [abbrv] beam git commit: Tidy a troublesome TestStreamTest
Tidy a troublesome TestStreamTest
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/555ba40d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/555ba40d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/555ba40d
Branch: refs/heads/tez-runner
Commit: 555ba40d5934694476b5337b88276625252d684e
Parents: a593e49
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:38 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/testing/TestStreamTest.java | 25 +++++++++++---------
1 file changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/555ba40d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index bef6aa0..2f147dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -240,21 +240,24 @@ public class TestStreamTest implements Serializable {
@Category({NeedsRunner.class, UsesTestStream.class})
public void testElementsAtAlmostPositiveInfinity() {
Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
- TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
- .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
- TimestampedValue.of("bar", endOfGlobalWindow))
- .advanceWatermarkToInfinity();
+ TestStream<String> stream =
+ TestStream.create(StringUtf8Coder.of())
+ .addElements(
+ TimestampedValue.of("foo", endOfGlobalWindow),
+ TimestampedValue.of("bar", endOfGlobalWindow))
+ .advanceWatermarkToInfinity();
FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
- PCollection<String> windowedValues = p.apply(stream)
- .apply(Window.<String>into(windows))
- .apply(WithKeys.<Integer, String>of(1))
- .apply(GroupByKey.<Integer, String>create())
- .apply(Values.<Iterable<String>>create())
- .apply(Flatten.<String>iterables());
+ PCollection<String> windowedValues =
+ p.apply(stream)
+ .apply(Window.<String>into(windows))
+ .apply(WithKeys.<Integer, String>of(1))
+ .apply(GroupByKey.<Integer, String>create())
+ .apply(Values.<Iterable<String>>create())
+ .apply(Flatten.<String>iterables());
PAssert.that(windowedValues)
- .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+ .inWindow(windows.assignWindow(endOfGlobalWindow))
.containsInAnyOrder("foo", "bar");
p.run();
}