You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/09 13:38:30 UTC
flink git commit: [FLINK-5285] Abort checkpoint only once in BarrierTracker
Repository: flink
Updated Branches:
refs/heads/release-1.1 2b612f2d8 -> afaa27e9f
[FLINK-5285] Abort checkpoint only once in BarrierTracker
Prevent an interleaved sequence of cancellation markers for two consecutive checkpoints
from triggering a flood of cancellation markers for downstream operators. This is done by
aborting each checkpoint only once and by not re-creating checkpoint barrier counts for
already aborted checkpoints.
Add test case
This closes #2964.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afaa27e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afaa27e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afaa27e9
Branch: refs/heads/release-1.1
Commit: afaa27e9faeb0352a49f30de90e719572caa97c5
Parents: 2b612f2
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 7 19:05:47 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 9 14:37:31 2016 +0100
----------------------------------------------------------------------
.../streaming/runtime/io/BarrierBuffer.java | 5 ++-
.../streaming/runtime/io/BarrierTracker.java | 25 ++++++++-----
.../runtime/io/BarrierTrackerTest.java | 37 ++++++++++++++++++++
3 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 5a60439..b71b564 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -312,6 +312,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentCheckpointId = barrierId;
startOfAlignmentTimestamp = 0L;
latestAlignmentDurationNanos = 0L;
+
+ notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
+
notifyAbortOnCancellationBarrier(barrierId);
}
@@ -415,7 +418,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
startOfAlignmentTimestamp = System.nanoTime();
if (LOG.isDebugEnabled()) {
- LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+ LOG.debug("Starting stream alignment for checkpoint " + checkpointId + '.');
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 59b408d..fbe3042 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -210,6 +210,11 @@ public class BarrierTracker implements CheckpointBarrierHandler {
CheckpointBarrierCount cbc;
while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
pendingCheckpoints.removeFirst();
+
+ if (cbc.markAborted()) {
+ // abort the subsumed checkpoints if not already done
+ notifyAbort(cbc.checkpointId());
+ }
}
if (cbc != null && cbc.checkpointId() == checkpointId) {
@@ -225,17 +230,19 @@ public class BarrierTracker implements CheckpointBarrierHandler {
pendingCheckpoints.removeFirst();
}
}
- else {
+ else if (checkpointId > latestPendingCheckpointID) {
notifyAbort(checkpointId);
- // first barrier for this checkpoint - remember it as aborted
- // since we polled away all entries with lower checkpoint IDs
- // this entry will become the new first entry
- if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
- CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
- abortedMarker.markAborted();
- pendingCheckpoints.addFirst(abortedMarker);
- }
+ latestPendingCheckpointID = checkpointId;
+
+ CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+ abortedMarker.markAborted();
+ pendingCheckpoints.addFirst(abortedMarker);
+
+ // we have removed all other pending checkpoint barrier counts --> no need to check that
+ // we don't exceed the maximum checkpoints to track
+ } else {
+ // trailing cancellation barrier which was already cancelled
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/afaa27e9/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 978c212..729afe8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -35,6 +35,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
/**
* Tests for the behavior of the barrier tracker.
@@ -425,6 +430,38 @@ public class BarrierTrackerTest {
assertTrue(tracker.isEmpty());
}
+
+ /**
+ * Tests that each checkpoint is only aborted once in case of an interleaved cancellation
+ * barrier arrival of two consecutive checkpoints.
+ */
+ @Test
+ public void testInterleavedCancellationBarriers() throws Exception {
+ BufferOrEvent[] sequence = {
+ createBarrier(1L, 0),
+ createCancellationBarrier(2L, 0),
+ createCancellationBarrier(1L, 1),
+ createCancellationBarrier(2L, 1),
+ createCancellationBarrier(1L, 2),
+ createCancellationBarrier(2L, 2),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+ StatefulTask statefulTask = mock(StatefulTask.class);
+
+ tracker.registerCheckpointEventHandler(statefulTask);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(1L), any(Throwable.class));
+ verify(statefulTask, times(1)).abortCheckpointOnBarrier(eq(2L), any(Throwable.class));
+ }
// ------------------------------------------------------------------------
// Utils