You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 13:24:14 UTC

[beam] branch spark-runner_structured-streaming updated: Add a test that GBK preserves windowing

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new f86660f  Add a test that GBK preserves windowing
f86660f is described below

commit f86660f0d1c31f5ce948f08941cdb225f0a4fdac
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 15:23:18 2019 +0200

    Add a test that GBK preserves windowing
---
 .../translation/batch/GroupByKeyTest.java          | 42 ++++++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index f608920..c95d07d 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -29,10 +29,17 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,6 +58,41 @@ public class GroupByKeyTest implements Serializable {
   }
 
   @Test
+  public void testGroupByKeyPreservesWindowing(){
+    PCollection<KV<Integer, Iterable<Integer>>> input = p.apply(Create
+        .timestamped(
+            TimestampedValue.of(KV.of(1, 1), new Instant(1)),
+            TimestampedValue.of(KV.of(1, 3), new Instant(2)),
+            TimestampedValue.of(KV.of(1, 5), new Instant(11)),
+            TimestampedValue.of(KV.of(2, 2), new Instant(3)),
+            TimestampedValue.of(KV.of(2, 4), new Instant(11)),
+            TimestampedValue.of(KV.of(2, 6), new Instant(12))))
+        .apply(Window.into(FixedWindows.of(Duration.millis(10)))).apply(GroupByKey.create())
+    // Assert manually inside a DoFn: PAssert does not support multiple KVs with the same key, and here each key appears once per window.
+        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Integer>>, KV<Integer, Iterable<Integer>>>() {
+
+          @ProcessElement public void processElement(ProcessContext context) {
+            KV<Integer, Iterable<Integer>> element = context.element();
+            if (element.getKey() == 1) {
+              if (Iterables.size(element.getValue()) == 2) {
+                assertThat(element.getValue(), containsInAnyOrder(1, 3)); // window [0, 10): values timestamped 1 and 2
+              } else {
+                assertThat(element.getValue(), containsInAnyOrder(5)); // window [10, 20): value timestamped 11
+              }
+            } else { // key == 2 (the only other key created above)
+              if (Iterables.size(element.getValue()) == 2) {
+                assertThat(element.getValue(), containsInAnyOrder(4, 6)); // window [10, 20): values timestamped 11 and 12
+              } else {
+                assertThat(element.getValue(), containsInAnyOrder(2)); // window [0, 10): value timestamped 3
+              }
+            }
+            context.output(element);
+          }
+        }));
+    p.run();
+
+  }
+  @Test
   public void testGroupByKey() {
     List<KV<Integer, Integer>> elems = new ArrayList<>();
     elems.add(KV.of(1, 1));