You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/03/04 19:11:28 UTC

[33/50] [abbrv] incubator-beam git commit: Fixes the GroupAlsoByWindowTest.

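Fixes the GroupAlsoByWindowTest.

The expected StreamRecords now carry the timestamp of the element they wrap
(e.g. initialTime + 1999) instead of a bare initialTime, the comparator used to
sort the results orders records by the windowed value's timestamp rather than
the StreamRecord timestamp, and testCompoundAccumulatingPanesProgram is
re-enabled with @Test.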


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52614ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52614ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52614ea3

Branch: refs/heads/master
Commit: 52614ea36a7431d83f907d99d3fb251c2f2b3551
Parents: 69f7623
Author: kl0u <kk...@gmail.com>
Authored: Wed Mar 2 16:09:53 2016 +0100
Committer: Davor Bonaci <da...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../streaming/GroupAlsoByWindowTest.java        | 70 ++++++++++----------
 1 file changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52614ea3/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
index 434f827..01f9c32 100644
--- a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
+++ b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/streaming/GroupAlsoByWindowTest.java
@@ -116,7 +116,7 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 1),
 						new IntervalWindow(new Instant(0), new Instant(2000)),
 						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime));
+				, initialTime + 1));
 		expectedOutput.add(new Watermark(initialTime + 2000));
 
 		expectedOutput.add(new StreamRecord<>(
@@ -124,14 +124,15 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 1999),
 						new IntervalWindow(new Instant(0), new Instant(2000)),
 						PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 1, 1))
-				, initialTime));
+				, initialTime + 1999));
+
 
 		expectedOutput.add(new StreamRecord<>(
 				WindowedValue.of(KV.of("key1", 6),
 						new Instant(initialTime + 1999),
 						new IntervalWindow(new Instant(0), new Instant(2000)),
 						PaneInfo.createPane(false, false, PaneInfo.Timing.LATE, 2, 2))
-				, initialTime));
+				, initialTime + 1999));
 		expectedOutput.add(new Watermark(initialTime + 4000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -181,7 +182,7 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 1),
 						new IntervalWindow(new Instant(1), new Instant(5700)),
 						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime));
+				, initialTime + 1));
 		expectedOutput.add(new Watermark(initialTime + 6000));
 
 		expectedOutput.add(new StreamRecord<>(
@@ -189,7 +190,7 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 6700),
 						new IntervalWindow(new Instant(1), new Instant(10900)),
 						PaneInfo.createPane(true, false, PaneInfo.Timing.ON_TIME, 0, 0))
-				, initialTime));
+				, initialTime + 6700));
 		expectedOutput.add(new Watermark(initialTime + 12000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -210,13 +211,13 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 5000),
 						new IntervalWindow(new Instant(0), new Instant(10000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 5000));
 		expectedOutput.add(new StreamRecord<>(
 				WindowedValue.of(KV.of("key1", 6),
 						new Instant(initialTime + 1),
 						new IntervalWindow(new Instant(-5000), new Instant(5000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 1));
 		expectedOutput.add(new Watermark(initialTime + 10000));
 
 		expectedOutput.add(new StreamRecord<>(
@@ -224,19 +225,19 @@ public class GroupAlsoByWindowTest {
 						new Instant(initialTime + 15000),
 						new IntervalWindow(new Instant(10000), new Instant(20000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 15000));
 		expectedOutput.add(new StreamRecord<>(
 				WindowedValue.of(KV.of("key1", 3),
 						new Instant(initialTime + 10000),
 						new IntervalWindow(new Instant(5000), new Instant(15000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 10000));
 		expectedOutput.add(new StreamRecord<>(
 				WindowedValue.of(KV.of("key2", 1),
 						new Instant(initialTime + 19500),
 						new IntervalWindow(new Instant(10000), new Instant(20000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 19500));
 		expectedOutput.add(new Watermark(initialTime + 20000));
 
 		expectedOutput.add(new StreamRecord<>(
@@ -250,13 +251,13 @@ public class GroupAlsoByWindowTest {
 						 */
 						new IntervalWindow(new Instant(15000), new Instant(25000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 20000));
 		expectedOutput.add(new StreamRecord<>(
 				WindowedValue.of(KV.of("key1", 8),
 						new Instant(initialTime + 20000),
 						new IntervalWindow(new Instant(15000), new Instant(25000)),
 						PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME))
-				, initialTime));
+				, initialTime + 20000));
 		expectedOutput.add(new Watermark(initialTime + 25000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -272,13 +273,13 @@ public class GroupAlsoByWindowTest {
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 1));
 		expectedOutput.add(new Watermark(initialTime + 10000));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 10000));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
 		expectedOutput.add(new Watermark(initialTime + 20000));
 
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -295,13 +296,13 @@ public class GroupAlsoByWindowTest {
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 1), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 1));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, true, PaneInfo.Timing.EARLY)), initialTime + 10000));
 		expectedOutput.add(new Watermark(initialTime + 10000));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME, 0, 0)), initialTime + 19500));
 		expectedOutput.add(new Watermark(initialTime + 20000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
 
@@ -326,21 +327,21 @@ public class GroupAlsoByWindowTest {
 		 * */
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
 
 		expectedOutput.add(new Watermark(initialTime + 10000));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
 
 		expectedOutput.add(new Watermark(initialTime + 20000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -348,7 +349,7 @@ public class GroupAlsoByWindowTest {
 		testHarness.close();
 	}
 
-	// Disabled
+	@Test
 	public void testCompoundAccumulatingPanesProgram() throws Exception {
 		WindowingStrategy strategy = fixedWindowWithCompoundTriggerStrategyAcc;
 		long initialTime = 0L;
@@ -357,21 +358,21 @@ public class GroupAlsoByWindowTest {
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 1), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 1));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 5),
-				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime));
+				new Instant(initialTime + 10000), null, PaneInfo.createPane(true, false, PaneInfo.Timing.EARLY)), initialTime + 10000));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 10),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, false, PaneInfo.Timing.EARLY, 1, -1)), initialTime + 19500));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 6),
-				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime));
+				new Instant(initialTime + 1200), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 1, 0)), initialTime + 1200));
 
 		expectedOutput.add(new Watermark(initialTime + 10000));
 
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key1", 11),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(false, true, PaneInfo.Timing.ON_TIME, 2, 0)), initialTime + 19500));
 		expectedOutput.add(new StreamRecord<>(makeWindowedValue(strategy, KV.of("key2", 1),
-				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime));
+				new Instant(initialTime + 19500), null, PaneInfo.createPane(true, true, PaneInfo.Timing.ON_TIME)), initialTime + 19500));
 
 		expectedOutput.add(new Watermark(initialTime + 20000));
 		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
@@ -434,11 +435,12 @@ public class GroupAlsoByWindowTest {
 				StreamRecord<WindowedValue<KV<String, Integer>>> sr0 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o1;
 				StreamRecord<WindowedValue<KV<String, Integer>>> sr1 = (StreamRecord<WindowedValue<KV<String, Integer>>>) o2;
 
-				if (sr0.getTimestamp() != sr1.getTimestamp()) {
-					return (int) (sr0.getTimestamp() - sr1.getTimestamp());
+				int comparison = (int) (sr0.getValue().getTimestamp().getMillis() - sr1.getValue().getTimestamp().getMillis());
+				if (comparison != 0) {
+					return comparison;
 				}
 
-				int comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
+				comparison = sr0.getValue().getValue().getKey().compareTo(sr1.getValue().getValue().getKey());
 				if(comparison == 0) {
 					comparison = Integer.compare(
 							sr0.getValue().getValue().getValue(),