You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 13:38:30 UTC

flink git commit: [FLINK-5285] Abort checkpoint only once in BarrierTracker

Repository: flink
Updated Branches:
  refs/heads/release-1.1 2b612f2d8 -> afaa27e9f


[FLINK-5285] Abort checkpoint only once in BarrierTracker

Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
from triggering a flood of cancellation markers for downstream operators. This is done by
aborting each checkpoint only once and not re-creating checkpoint barrier counts for already
aborted checkpoints.

Add test case

This closes #2964.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afaa27e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afaa27e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afaa27e9

Branch: refs/heads/release-1.1
Commit: afaa27e9faeb0352a49f30de90e719572caa97c5
Parents: 2b612f2
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 19:05:47 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:37:31 2016 +0100

----------------------------------------------------------------------
 .../streaming/runtime/io/BarrierBuffer.java     |  5 ++-
 .../streaming/runtime/io/BarrierTracker.java    | 25 ++++++++-----
 .../runtime/io/BarrierTrackerTest.java          | 37 ++++++++++++++++++++
 3 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 5a60439..b71b564 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -312,6 +312,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentCheckpointId = barrierId;
 				startOfAlignmentTimestamp = 0L;
 				latestAlignmentDurationNanos = 0L;
+
+				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
+
 				notifyAbortOnCancellationBarrier(barrierId);
 			}
 
@@ -415,7 +418,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		startOfAlignmentTimestamp = System.nanoTime();
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+			LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 59b408d..fbe3042 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -210,6 +210,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		CheckpointBarrierCount cbc;
 		while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
 			pendingCheckpoints.removeFirst();
+
+			if (cbc.markAborted()) {
+				// abort the subsumed checkpoints if not already done
+				notifyAbort(cbc.checkpointId());
+			}
 		}
 
 		if (cbc != null && cbc.checkpointId() == checkpointId) {
@@ -225,17 +230,19 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				pendingCheckpoints.removeFirst();
 			}
 		}
-		else {
+		else if (checkpointId > latestPendingCheckpointID) {
 			notifyAbort(checkpointId);
 
-			// first barrier for this checkpoint - remember it as aborted
-			// since we polled away all entries with lower checkpoint IDs
-			// this entry will become the new first entry
-			if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
-				CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
-				abortedMarker.markAborted();
-				pendingCheckpoints.addFirst(abortedMarker);
-			}
+			latestPendingCheckpointID = checkpointId;
+
+			CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+			abortedMarker.markAborted();
+			pendingCheckpoints.addFirst(abortedMarker);
+
+			// we have removed all other pending checkpoint barrier counts --> no need to check that
+			// we don't exceed the maximum checkpoints to track
+		} else {
+			// trailing cancellation barrier which was already cancelled
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 978c212..729afe8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -35,6 +35,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for the behavior of the barrier tracker.
@@ -425,6 +430,38 @@ public class BarrierTrackerTest {
 
 		assertTrue(tracker.isEmpty());
 	}
+
+	/**
+	 * Tests that each checkpoint is only aborted once in case of an interleaved cancellation
+	 * barrier arrival of two consecutive checkpoints.
+	 */
+	@Test
+	public void testInterleavedCancellationBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+			createBarrier(1L, 0),
+			createCancellationBarrier(2L, 0),
+			createCancellationBarrier(1L, 1),
+			createCancellationBarrier(2L, 1),
+			createCancellationBarrier(1L, 2),
+			createCancellationBarrier(2L, 2),
+			createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierTracker tracker = new BarrierTracker(gate);
+		StatefulTask statefulTask = mock(StatefulTask.class);
+
+		tracker.registerCheckpointEventHandler(statefulTask);
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+		}
+
+		verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), any(Throwable.class));
+		verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), any(Throwable.class));
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utils