You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:28 UTC
[33/50] [abbrv] incubator-beam git commit: Fixes the GroupAlsoByWindowTest.
Fixes the GroupAlsoByWindowTest.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52614ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52614ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52614ea3
Branch: refs/heads/master
Commit: 52614ea36a7431d83f907d99d3fb251c2f2b3551
Parents: 69f7623
Author: kl0u <kk...@gmail.com>
Authored: Wed Mar 2 16:09:53 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800
----------------------------------------------------------------------
.../streaming/GroupAlsoByWindowTest.java | 70 ++++++++++----------
1 file changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52614ea3/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 434f827..01f9c32 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -116,7 +116,7 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 1),
new IntervalWindow(new Instant(0), new Instant(2000)),
PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime));
+ , initialTime + 1));
expectedOutput.add(new Watermark(initialTime + 2000));
expectedOutput.add(new StreamRecord<>(
@@ -124,14 +124,15 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 1999),
new IntervalWindow(new Instant(0), new Instant(2000)),
PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
- , initialTime));
+ , initialTime + 1999));
+
expectedOutput.add(new StreamRecord<>(
WindowedValue.of(KV.of("key1", 6),
new Instant(initialTime + 1999),
new IntervalWindow(new Instant(0), new Instant(2000)),
PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
- , initialTime));
+ , initialTime + 1999));
expectedOutput.add(new Watermark(initialTime + 4000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -181,7 +182,7 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 1),
new IntervalWindow(new Instant(1), new Instant(5700)),
PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime));
+ , initialTime + 1));
expectedOutput.add(new Watermark(initialTime + 6000));
expectedOutput.add(new StreamRecord<>(
@@ -189,7 +190,7 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 6700),
new IntervalWindow(new Instant(1), new Instant(10900)),
PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
- , initialTime));
+ , initialTime + 6700));
expectedOutput.add(new Watermark(initialTime + 12000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -210,13 +211,13 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 5000),
new IntervalWindow(new Instant(0), new Instant(10000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 5000));
expectedOutput.add(new StreamRecord<>(
WindowedValue.of(KV.of("key1", 6),
new Instant(initialTime + 1),
new IntervalWindow(new Instant(-5000), new Instant(5000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 1));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(
@@ -224,19 +225,19 @@ public class GroupAlsoByWindowTest {
new Instant(initialTime + 15000),
new IntervalWindow(new Instant(10000), new Instant(20000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 15000));
expectedOutput.add(new StreamRecord<>(
WindowedValue.of(KV.of("key1", 3),
new Instant(initialTime + 10000),
new IntervalWindow(new Instant(5000), new Instant(15000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 10000));
expectedOutput.add(new StreamRecord<>(
WindowedValue.of(KV.of("key2", 1),
new Instant(initialTime + 19500),
new IntervalWindow(new Instant(10000), new Instant(20000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
expectedOutput.add(new StreamRecord<>(
@@ -250,13 +251,13 @@ public class GroupAlsoByWindowTest {
*/
new IntervalWindow(new Instant(15000), new Instant(25000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 20000));
expectedOutput.add(new StreamRecord<>(
WindowedValue.of(KV.of("key1", 8),
new Instant(initialTime + 20000),
new IntervalWindow(new Instant(15000), new Instant(25000)),
PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
- , initialTime));
+ , initialTime + 20000));
expectedOutput.add(new Watermark(initialTime + 25000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -272,13 +273,13 @@ public class GroupAlsoByWindowTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -295,13 +296,13 @@ public class GroupAlsoByWindowTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -326,21 +327,21 @@ public class GroupAlsoByWindowTest {
* */
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
- new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+ new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -348,7 +349,7 @@ public class GroupAlsoByWindowTest {
testHarness.close();
}
- // Disabled
+ @Test
public void testCompoundAccumulatingPanesProgram() throws Exception {
WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
long initialTime = 0L;
@@ -357,21 +358,21 @@ public class GroupAlsoByWindowTest {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
- new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+ new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
- new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+ new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
expectedOutput.add(new Watermark(initialTime + 10000));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
- new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+ new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
expectedOutput.add(new Watermark(initialTime + 20000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -434,11 +435,12 @@ public class GroupAlsoByWindowTest {
StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
- if (sr0.getTimestamp() != sr1.getTimestamp()) {
- return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+ int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
+ if (comparison != 0) {
+ return comparison;
}
- int comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+ comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
if(comparison == 0) {
comparison = Integer.compare(
sr0.getValue().getValue().getValue(),