You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:10 UTC

[19/50] [abbrv] beam git commit: Tidy a troublesome TestStreamTest

Tidy a troublesome TestStreamTest


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/555ba40d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/555ba40d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/555ba40d

Branch: refs/heads/tez-runner
Commit: 555ba40d5934694476b5337b88276625252d684e
Parents: a593e49
Author: Kenneth Knowles <ke...@apache.org>
Authored: Fri Oct 27 10:51:38 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Nov 13 15:03:21 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/TestStreamTest.java | 25 +++++++++++---------
 1 file changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/555ba40d/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index bef6aa0..2f147dc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -240,21 +240,24 @@ public class TestStreamTest implements Serializable {
   @Category({NeedsRunner.class, UsesTestStream.class})
   public void testElementsAtAlmostPositiveInfinity() {
     Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
-    TestStream<String> stream = TestStream.create(StringUtf8Coder.of())
-        .addElements(TimestampedValue.of("foo", endOfGlobalWindow),
-            TimestampedValue.of("bar", endOfGlobalWindow))
-        .advanceWatermarkToInfinity();
+    TestStream<String> stream =
+        TestStream.create(StringUtf8Coder.of())
+            .addElements(
+                TimestampedValue.of("foo", endOfGlobalWindow),
+                TimestampedValue.of("bar", endOfGlobalWindow))
+            .advanceWatermarkToInfinity();
 
     FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
-    PCollection<String> windowedValues = p.apply(stream)
-        .apply(Window.<String>into(windows))
-        .apply(WithKeys.<Integer, String>of(1))
-        .apply(GroupByKey.<Integer, String>create())
-        .apply(Values.<Iterable<String>>create())
-        .apply(Flatten.<String>iterables());
+    PCollection<String> windowedValues =
+        p.apply(stream)
+            .apply(Window.<String>into(windows))
+            .apply(WithKeys.<Integer, String>of(1))
+            .apply(GroupByKey.<Integer, String>create())
+            .apply(Values.<Iterable<String>>create())
+            .apply(Flatten.<String>iterables());
 
     PAssert.that(windowedValues)
-        .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp()))
+        .inWindow(windows.assignWindow(endOfGlobalWindow))
         .containsInAnyOrder("foo", "bar");
     p.run();
   }