You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:17 UTC
[beam] 02/06: Update windowAssignTest
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 982197ce985197e4153d79938db247c84d708fc7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 7 09:35:46 2019 +0200
Update windowAssignTest
---
.../translation/batch/WindowAssignTest.java | 19 ++++++++-----------
1 file changed, 8 insertions(+), 11 deletions(-)
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 61da3ea..3011d88 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -27,13 +27,11 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -50,20 +48,19 @@ public class WindowAssignTest implements Serializable {
p = Pipeline.create(options);
}
- @Ignore
@Test
public void testWindowAssign() {
- PCollection<KV<Integer, Integer>> input =
+ PCollection<Integer> input =
p.apply(
Create.timestamped(
- TimestampedValue.of(KV.of(1, 1), new Instant(1)),
- TimestampedValue.of(KV.of(1, 2), new Instant(2)),
- TimestampedValue.of(KV.of(1, 3), new Instant(3)),
- TimestampedValue.of(KV.of(1, 4), new Instant(10)),
- TimestampedValue.of(KV.of(1, 5), new Instant(11))))
+ TimestampedValue.of(1, new Instant(1)),
+ TimestampedValue.of(2, new Instant(2)),
+ TimestampedValue.of(3, new Instant(3)),
+ TimestampedValue.of(4, new Instant(10)),
+ TimestampedValue.of(5, new Instant(11))))
.apply(Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(Sum.integersPerKey());
- PAssert.that(input).containsInAnyOrder(KV.of(1, 6), KV.of(1, 9));
+ .apply(Sum.integersGlobally().withoutDefaults());
+ PAssert.that(input).containsInAnyOrder(6, 9);
p.run();
}
}