You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/10 10:26:00 UTC

[3/5] flink git commit: [hotfix] [tests] Increase robustness of Fast Time Window Operator Tests

[hotfix] [tests] Increase robustness of Fast Time Window Operator Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/877c267b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/877c267b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/877c267b

Branch: refs/heads/master
Commit: 877c267b8f7d7f63b07598e1536c7b42567c8a8b
Parents: ed96cb5
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Oct 10 11:19:43 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Oct 10 12:17:04 2016 +0200

----------------------------------------------------------------------
 ...ulatingAlignedProcessingTimeWindowOperatorTest.java |  8 ++++++++
 ...egatingAlignedProcessingTimeWindowOperatorTest.java | 13 +++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 2f687f6..c82392a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -264,6 +264,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			// get and verify the result
 			out.waitForNElements(numElements, 60_000);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -322,6 +324,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -407,6 +411,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -463,6 +469,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			Collections.sort(result);
 			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/877c267b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index cd82a9c..12a842f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -277,6 +277,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			List<Tuple2<Integer, Integer>> result = out.getElements();
 			assertEquals(numElements, result.size());
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -352,6 +354,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			List<Tuple2<Integer, Integer>> result = out.getElements();
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -414,6 +418,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				Thread.sleep(1);
 			}
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -508,6 +514,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					new Tuple2<>(2, 2)
 			), result);
 
+			timerService.quiesceAndAwaitPending();
+
 			synchronized (lock) {
 				op.close();
 			}
@@ -571,6 +579,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				assertTrue(e.getMessage().contains("Artificial Test Exception"));
 			}
 
+			timerService.quiesceAndAwaitPending();
+			synchronized (lock) {
+				op.close();
+			}
+
 			shutdownTimerServiceAndWait(timerService);
 			op.dispose();