You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:17 UTC

[beam] 02/06: Update windowAssignTest

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 982197ce985197e4153d79938db247c84d708fc7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 7 09:35:46 2019 +0200

    Update windowAssignTest
---
 .../translation/batch/WindowAssignTest.java           | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 61da3ea..3011d88 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -27,13 +27,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -50,20 +48,19 @@ public class WindowAssignTest implements Serializable {
     p = Pipeline.create(options);
   }
 
-  @Ignore
   @Test
   public void testWindowAssign() {
-    PCollection<KV<Integer, Integer>> input =
+    PCollection<Integer> input =
         p.apply(
                 Create.timestamped(
-                    TimestampedValue.of(KV.of(1, 1), new Instant(1)),
-                    TimestampedValue.of(KV.of(1, 2), new Instant(2)),
-                    TimestampedValue.of(KV.of(1, 3), new Instant(3)),
-                    TimestampedValue.of(KV.of(1, 4), new Instant(10)),
-                    TimestampedValue.of(KV.of(1, 5), new Instant(11))))
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(10)),
+                    TimestampedValue.of(5, new Instant(11))))
             .apply(Window.into(FixedWindows.of(Duration.millis(10))))
-            .apply(Sum.integersPerKey());
-    PAssert.that(input).containsInAnyOrder(KV.of(1, 6), KV.of(1, 9));
+            .apply(Sum.integersGlobally().withoutDefaults());
+    PAssert.that(input).containsInAnyOrder(6, 9);
     p.run();
   }
 }