You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/22 10:58:43 UTC

flink git commit: [hotfix] Fix flaky EventTimeAllWindowCheckpointingITCase

Repository: flink
Updated Branches:
  refs/heads/master b198c33a7 -> 42a4df2d4


[hotfix] Fix flaky EventTimeAllWindowCheckpointingITCase

In very rare cases, a checkpoint could be taken after the ValidatingSink
had already signaled that it had seen all expected elements. If the job
was then restarted from that checkpoint, it resumed with the already
complete state and never finished, because no more elements would
arrive.

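This adds a check to ValidatingSink.open() that signals success (by
throwing SuccessException) if the sink already holds the final state.
It also makes the existing completion check require exact window counts
(!= instead of <) and removes leftover debug output from both test cases.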


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42a4df2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42a4df2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42a4df2d

Branch: refs/heads/master
Commit: 42a4df2d475910a9092a5f7251d4897d00b4fba9
Parents: b198c33
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 21 15:06:42 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Oct 22 10:56:41 2015 +0200

----------------------------------------------------------------------
 .../EventTimeAllWindowCheckpointingITCase.java  | 25 ++++++++++++++------
 .../EventTimeWindowCheckpointingITCase.java     |  5 ----
 2 files changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
index 2733349..84022f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
@@ -432,17 +432,12 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 					synchronized (ctx.getCheckpointLock()) {
 						int next = numElementsEmitted++;
 						for (long i = 0; i < numKeys; i++) {
-							ctx.collectWithTimestamp(new Tuple2<Long, IntType>(i, new IntType(next)), next);
+							ctx.collectWithTimestamp(new Tuple2<>(i, new IntType(next)), next);
 						}
 						ctx.emitWatermark(new Watermark(next));
 					}
 				}
 				else {
-					// exit at some point so that we don't deadlock
-					if (numElementsEmitted > numElementsToEmit * 5) {
-//						running = false;
-						System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
-					}
 					// if our work is done, delay a bit to prevent busy waiting
 					Thread.sleep(1);
 				}
@@ -491,6 +486,22 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 		public void open(Configuration parameters) throws Exception {
 			// this sink can only work with DOP 1
 			assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+
+			// it can happen that a checkpoint happens when the complete success state is
+			// already set. In that case we restart with the final state and would never
+			// finish because no more elements arrive.
+			if (windowCounts.size() == numKeys) {
+				boolean seenAll = true;
+				for (Integer windowCount: windowCounts.values()) {
+					if (windowCount != numWindowsExpected) {
+						seenAll = false;
+						break;
+					}
+				}
+				if (seenAll) {
+					throw new SuccessException();
+				}
+			}
 		}
 
 		@Override
@@ -498,7 +509,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger {
 			boolean seenAll = true;
 			if (windowCounts.size() == numKeys) {
 				for (Integer windowCount: windowCounts.values()) {
-					if (windowCount < numWindowsExpected) {
+					if (windowCount != numWindowsExpected) {
 						seenAll = false;
 						break;
 					}

http://git-wip-us.apache.org/repos/asf/flink/blob/42a4df2d/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
index 4d1d2c3..6cf04f5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
@@ -439,11 +439,6 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger {
 					}
 				}
 				else {
-					// exit at some point so that we don't deadlock
-					if (numElementsEmitted > numElementsToEmit * 5) {
-//						running = false;
-						System.err.println("Succ Checkpoints: " + numSuccessfulCheckpoints + " numElemEmitted: " + numElementsEmitted + "num elements to emit: " + numElementsToEmit);
-					}
 
 					// if our work is done, delay a bit to prevent busy waiting
 					Thread.sleep(1);