You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:07 UTC
[1/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation
Barriers as a way to signal aborted checkpoints
Repository: flink
Updated Branches:
refs/heads/release-1.1 4dd3efea4 -> 0962cb6f4
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..cf1f98e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,16 +18,18 @@
package org.apache.flink.streaming.runtime.io;
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateHandle;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
import java.io.File;
import java.util.Arrays;
+import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -42,15 +45,23 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
/**
* Tests for the behavior of the {@link BarrierBuffer}.
*/
public class BarrierBufferTest {
+ private static final Random RND = new Random();
+
private static final int PAGE_SIZE = 512;
-
+
private static int SIZE_COUNTER = 0;
-
+
private static IOManager IO_MANAGER;
@BeforeClass
@@ -86,7 +97,9 @@ public class BarrierBufferTest {
for (BufferOrEvent boe : sequence) {
assertEquals(boe, buffer.getNextNonBlocked());
}
-
+
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -120,6 +133,8 @@ public class BarrierBufferTest {
assertEquals(boe, buffer.getNextNonBlocked());
}
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
@@ -222,13 +237,15 @@ public class BarrierBufferTest {
ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
buffer.registerCheckpointEventHandler(handler);
handler.setNextExpectedCheckpointId(1L);
-
+
// pre checkpoint 1
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[1], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
assertEquals(1L, handler.getNextExpectedCheckpointId());
+ long startTs = System.nanoTime();
+
// blocking while aligning for checkpoint 1
check(sequence[7], buffer.getNextNonBlocked());
assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +253,8 @@ public class BarrierBufferTest {
// checkpoint 1 done, returning buffered data
check(sequence[5], buffer.getNextNonBlocked());
assertEquals(2L, handler.getNextExpectedCheckpointId());
+ validateAlignmentTime(startTs, buffer);
+
check(sequence[6], buffer.getNextNonBlocked());
// pre checkpoint 2
@@ -245,10 +264,13 @@ public class BarrierBufferTest {
check(sequence[12], buffer.getNextNonBlocked());
check(sequence[13], buffer.getNextNonBlocked());
assertEquals(2L, handler.getNextExpectedCheckpointId());
-
+
// checkpoint 2 barriers come together
+ startTs = System.nanoTime();
check(sequence[17], buffer.getNextNonBlocked());
assertEquals(3L, handler.getNextExpectedCheckpointId());
+ validateAlignmentTime(startTs, buffer);
+
check(sequence[18], buffer.getNextNonBlocked());
// checkpoint 3 starts, data buffered
@@ -257,7 +279,7 @@ public class BarrierBufferTest {
check(sequence[21], buffer.getNextNonBlocked());
// checkpoint 4 happens without extra data
-
+
// pre checkpoint 5
check(sequence[27], buffer.getNextNonBlocked());
assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +323,7 @@ public class BarrierBufferTest {
BufferOrEvent[] sequence = {
createBuffer(0), createBuffer(1), createBuffer(2),
createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-
+
createBuffer(2), createBuffer(1), createBuffer(0),
createBarrier(2, 1),
createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +349,14 @@ public class BarrierBufferTest {
assertEquals(2L, handler.getNextExpectedCheckpointId());
check(sequence[7], buffer.getNextNonBlocked());
check(sequence[8], buffer.getNextNonBlocked());
-
+
// checkpoint 2 alignment
+ long startTs = System.nanoTime();
check(sequence[13], buffer.getNextNonBlocked());
check(sequence[14], buffer.getNextNonBlocked());
check(sequence[18], buffer.getNextNonBlocked());
check(sequence[19], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
// end of stream: remaining buffered contents
check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +367,7 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
-
+
buffer.cleanup();
checkNoTempFilesRemain();
@@ -389,7 +413,7 @@ public class BarrierBufferTest {
createBarrier(3, 2),
createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
createBarrier(6, 1),
-
+
// complete checkpoint 4, checkpoint 5 remains not fully triggered
createBarrier(4, 2),
createBuffer(2),
@@ -419,12 +443,14 @@ public class BarrierBufferTest {
// alignment of checkpoint 2 - buffering also some barriers for
// checkpoints 3 and 4
+ long startTs = System.nanoTime();
check(sequence[13], buffer.getNextNonBlocked());
check(sequence[20], buffer.getNextNonBlocked());
check(sequence[23], buffer.getNextNonBlocked());
-
+
// checkpoint 2 completed
check(sequence[12], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
check(sequence[25], buffer.getNextNonBlocked());
check(sequence[27], buffer.getNextNonBlocked());
check(sequence[30], buffer.getNextNonBlocked());
@@ -507,36 +533,53 @@ public class BarrierBufferTest {
MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
- ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
- buffer.registerCheckpointEventHandler(handler);
- handler.setNextExpectedCheckpointId(1L);
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
- // checkpoint 1
+ long startTs;
+
+ // initial data
check(sequence[0], buffer.getNextNonBlocked());
check(sequence[1], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
+
+ // align checkpoint 1
+ startTs = System.nanoTime();
check(sequence[7], buffer.getNextNonBlocked());
assertEquals(1L, buffer.getCurrentCheckpointId());
-
+
+ // checkpoint done - replay buffered
check(sequence[5], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify).triggerCheckpointOnBarrier(eq(1L), anyLong());
check(sequence[6], buffer.getNextNonBlocked());
+
check(sequence[9], buffer.getNextNonBlocked());
check(sequence[10], buffer.getNextNonBlocked());
// alignment of checkpoint 2
+ startTs = System.nanoTime();
check(sequence[13], buffer.getNextNonBlocked());
- assertEquals(2L, buffer.getCurrentCheckpointId());
check(sequence[15], buffer.getNextNonBlocked());
// checkpoint 2 aborted, checkpoint 3 started
check(sequence[12], buffer.getNextNonBlocked());
assertEquals(3L, buffer.getCurrentCheckpointId());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify).abortCheckpointOnBarrier(2L);
check(sequence[16], buffer.getNextNonBlocked());
+
+ // checkpoint 3 alignment in progress
check(sequence[19], buffer.getNextNonBlocked());
- check(sequence[20], buffer.getNextNonBlocked());
-
+
// checkpoint 3 aborted (end of partition)
+ check(sequence[20], buffer.getNextNonBlocked());
+ verify(toNotify).abortCheckpointOnBarrier(3L);
+
+ // replay buffered data from checkpoint 3
check(sequence[18], buffer.getNextNonBlocked());
+
+ // all the remaining messages
check(sequence[21], buffer.getNextNonBlocked());
check(sequence[22], buffer.getNextNonBlocked());
check(sequence[23], buffer.getNextNonBlocked());
@@ -613,17 +656,21 @@ public class BarrierBufferTest {
check(sequence[19], buffer.getNextNonBlocked());
check(sequence[21], buffer.getNextNonBlocked());
+ long startTs = System.nanoTime();
+
// checkpoint 2 aborted, checkpoint 4 started. replay buffered
check(sequence[12], buffer.getNextNonBlocked());
assertEquals(4L, buffer.getCurrentCheckpointId());
check(sequence[16], buffer.getNextNonBlocked());
check(sequence[18], buffer.getNextNonBlocked());
check(sequence[22], buffer.getNextNonBlocked());
-
+
// align checkpoint 4 remainder
check(sequence[25], buffer.getNextNonBlocked());
check(sequence[26], buffer.getNextNonBlocked());
-
+
+ validateAlignmentTime(startTs, buffer);
+
// checkpoint 4 aborted (due to end of partition)
check(sequence[24], buffer.getNextNonBlocked());
check(sequence[27], buffer.getNextNonBlocked());
@@ -862,9 +909,9 @@ public class BarrierBufferTest {
assertNull(buffer.getNextNonBlocked());
assertNull(buffer.getNextNonBlocked());
-
+
buffer.cleanup();
-
+
checkNoTempFilesRemain();
}
catch (Exception e) {
@@ -874,26 +921,480 @@ public class BarrierBufferTest {
}
@Test
- public void testEndOfStreamWhileCheckpoint() {
+ public void testEndOfStreamWhileCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ // one checkpoint
+ createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
+
+ // some buffers
+ createBuffer(0), createBuffer(0), createBuffer(2),
+
+ // start the checkpoint that will be incomplete
+ createBarrier(2, 2), createBarrier(2, 0),
+ createBuffer(0), createBuffer(2), createBuffer(1),
+
+ // close channel 2 after its barrier, and channel 1 before its barrier
+ createEndOfPartition(2), createEndOfPartition(1),
+ createBuffer(0),
+
+ // final end of stream
+ createEndOfPartition(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+ // no checkpoint handler is registered here; only the order of released data is checked
+ // data after first checkpoint
+ check(sequence[3], buffer.getNextNonBlocked());
+ check(sequence[4], buffer.getNextNonBlocked());
+ check(sequence[5], buffer.getNextNonBlocked());
+ assertEquals(1L, buffer.getCurrentCheckpointId());
+
+ // alignment of second checkpoint
+ check(sequence[10], buffer.getNextNonBlocked());
+ assertEquals(2L, buffer.getCurrentCheckpointId());
+
+ // end-of-partition on channel 1 (no barrier 2 yet): checkpoint 2 cannot complete, queued data is released
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testSingleChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0),
+ createBarrier(2, 0),
+ createCancellationBarrier(4, 0),
+ createBarrier(5, 0),
+ createBuffer(0),
+ createCancellationBarrier(6, 0),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+ // single input channel: barriers and cancellation markers take effect on arrival, so alignment time stays 0
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+ // barriers 2 and 5 and cancellation marker 4 are all consumed while fetching sequence[6]
+ check(sequence[6], buffer.getNextNonBlocked());
+ assertEquals(5L, buffer.getCurrentCheckpointId());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+ // cancellation marker 6 is consumed while fetching sequence[8]
+ check(sequence[8], buffer.getNextNonBlocked());
+ assertEquals(6L, buffer.getCurrentCheckpointId());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testMultiChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some buffers and a successful checkpoint
+ /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
+ /* 3 */ createBarrier(1, 1), createBarrier(1, 2),
+ /* 5 */ createBuffer(2), createBuffer(1),
+ /* 7 */ createBarrier(1, 0),
+ /* 8 */ createBuffer(0), createBuffer(2),
+
+ // aborted on last barrier
+ /* 10 */ createBarrier(2, 0), createBarrier(2, 2),
+ /* 12 */ createBuffer(0), createBuffer(2),
+ /* 14 */ createCancellationBarrier(2, 1),
+
+ // successful checkpoint
+ /* 15 */ createBuffer(2), createBuffer(1),
+ /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+ // abort on first barrier
+ /* 20 */ createBuffer(0), createBuffer(1),
+ /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
+ /* 24 */ createBuffer(0),
+ /* 25 */ createBarrier(4, 0),
+
+ // another successful checkpoint
+ /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
+ /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+ /* 32 */ createBuffer(0), createBuffer(1),
+
+ // abort via multiple cancellation markers, followed by a late regular barrier
+ /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+ /* 36 */ createBarrier(6, 0),
+
+ /* 37 */ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ long startTs;
+
+ // successful first checkpoint, with some aligned buffers
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[5], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+ validateAlignmentTime(startTs, buffer);
+
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+
+ // canceled checkpoint on last barrier
+ startTs = System.nanoTime();
+ check(sequence[12], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[13], buffer.getNextNonBlocked());
+
+ // one more successful checkpoint
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[20], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // this checkpoint gets immediately canceled
+ check(sequence[24], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ // some buffers
+ check(sequence[26], buffer.getNextNonBlocked());
+ check(sequence[27], buffer.getNextNonBlocked());
+ check(sequence[28], buffer.getNextNonBlocked());
+
+ // a simple successful checkpoint
+ startTs = System.nanoTime();
+ check(sequence[32], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+ validateAlignmentTime(startTs, buffer);
+ check(sequence[33], buffer.getNextNonBlocked());
+ // checkpoint 6 is aborted by the cancellation markers, without any alignment
+ check(sequence[37], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ @Test
+ public void testAbortViaQueuedBarriers() throws Exception {
+ BufferOrEvent[] sequence = {
+ // starting a checkpoint
+ /* 0 */ createBuffer(1),
+ /* 1 */ createBarrier(1, 1), createBarrier(1, 2),
+ /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+ // queued barrier and cancellation barrier
+ /* 6 */ createCancellationBarrier(2, 2),
+ /* 7 */ createBarrier(2, 1),
+
+ // some intermediate buffers (some queued)
+ /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+ // complete initial checkpoint
+ /* 11 */ createBarrier(1, 0),
+
+ // some buffers (none queued, since checkpoint is aborted)
+ /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+ // final barrier of aborted checkpoint
+ /* 15 */ createBarrier(2, 0),
+
+ // some more buffers
+ /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ long startTs;
+
+ check(sequence[0], buffer.getNextNonBlocked());
+
+ // starting first checkpoint
+ startTs = System.nanoTime();
+ check(sequence[4], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+
+ // finished first checkpoint
+ check(sequence[3], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+ validateAlignmentTime(startTs, buffer);
+
+ check(sequence[5], buffer.getNextNonBlocked());
+
+ // re-read the queued cancellation marker and barrier of checkpoint 2
+ check(sequence[9], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+
+ // no further alignment should have happened
+ assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+ // no further checkpoint (abort) notifications
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ /**
+ * This tests the case where a replay of queued checkpoint barriers meets
+ * a canceled checkpoint.
+ *
+ * The replayed newer checkpoint barrier must not try to cancel the
+ * already canceled checkpoint.
+ */
+ @Test
+ public void testAbortWhileHavingQueuedBarriers() throws Exception {
+ BufferOrEvent[] sequence = {
+ // starting a checkpoint
+ /* 0 */ createBuffer(1),
+ /* 1 */ createBarrier(1, 1),
+ /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+ // queued barrier of the next checkpoint
+ /* 5 */ createBarrier(2, 1),
+
+ // some queued buffers
+ /* 6 */ createBuffer(2), createBuffer(1),
+
+ // cancel the initial checkpoint
+ /* 8 */ createCancellationBarrier(1, 0),
+
+ // some more buffers
+ /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+ // ignored barrier - already canceled and moved to next checkpoint
+ /* 12 */ createBarrier(1, 2),
+
+ // some more buffers
+ /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+ // complete next checkpoint regularly
+ /* 16 */ createBarrier(2, 0), createBarrier(2, 2),
+
+ // some more buffers
+ /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ long startTs;
+
+ check(sequence[0], buffer.getNextNonBlocked());
+
+ // starting first checkpoint
+ startTs = System.nanoTime();
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[3], buffer.getNextNonBlocked());
+ check(sequence[6], buffer.getNextNonBlocked());
+
+ // cancelled by cancellation barrier
+ check(sequence[4], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+
+ // the next checkpoint alignment starts now
+ startTs = System.nanoTime();
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[15], buffer.getNextNonBlocked());
+
+ // checkpoint done
+ check(sequence[7], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+
+ // queued data
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ // trailing data
+ check(sequence[18], buffer.getNextNonBlocked());
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+
+ // check overall notifications
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ }
+
+ /**
+ * This tests the case where a cancellation barrier is received for a checkpoint already
+ * canceled due to receiving a newer checkpoint barrier.
+ */
+ @Test
+ public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
+ BufferOrEvent[] sequence = {
+ // starting a checkpoint
+ /* 0 */ createBuffer(2),
+ /* 1 */ createBarrier(3, 1), createBarrier(3, 0),
+ /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+ // newer checkpoint barrier cancels/subsumes pending checkpoint
+ /* 6 */ createBarrier(5, 2),
+
+ // some queued buffers
+ /* 7 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+ // cancellation marker for the initial checkpoint (it is already canceled)
+ /* 10 */ createCancellationBarrier(3, 2),
+
+ // some more buffers
+ /* 11 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+ // complete next checkpoint regularly
+ /* 14 */ createBarrier(5, 0), createBarrier(5, 1),
+
+ // some more buffers
+ /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ long startTs;
+
+ // validate the sequence
+
+ check(sequence[0], buffer.getNextNonBlocked());
+
+ // beginning of first checkpoint
+ check(sequence[5], buffer.getNextNonBlocked());
+
+ // future barrier aborts checkpoint
+ startTs = System.nanoTime();
+ check(sequence[3], buffer.getNextNonBlocked());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+ check(sequence[4], buffer.getNextNonBlocked());
+
+ // alignment of next checkpoint
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[9], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+
+ // checkpoint finished
+ check(sequence[7], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+ check(sequence[11], buffer.getNextNonBlocked());
+
+ // remaining data
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+ check(sequence[18], buffer.getNextNonBlocked());
+
+ // all done
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+
+ // check overall notifications
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
}
// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
- private static BufferOrEvent createBarrier(long id, int channel) {
- return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ private static BufferOrEvent createBarrier(long checkpointId, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+ }
+ /** Wraps a {@link CancelCheckpointMarker} for {@code checkpointId} into an event on {@code channel}. */
+ private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
+ return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel);
}
private static BufferOrEvent createBuffer(int channel) {
- // since we have no access to the contents, we need to use the size as an
- // identifier to validate correctness here
- Buffer buf = new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
- FreeingBufferRecycler.INSTANCE);
-
- buf.setSize(SIZE_COUNTER++);
+ final int size = SIZE_COUNTER++;
+ byte[] bytes = new byte[size];
+ RND.nextBytes(bytes);
+ // NOTE(review): size grows with every call; assumes it stays <= PAGE_SIZE so memory.put(0, bytes) fits — confirm
+ MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+ memory.put(0, bytes);
+
+ Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+ buf.setSize(size);
+
+ // retain an additional time so it does not get disposed after being read by the input gate
+ buf.retain();
+
return new BufferOrEvent(buf, channel);
}
@@ -907,15 +1408,16 @@ public class BarrierBufferTest {
assertEquals(expected.isBuffer(), present.isBuffer());
if (expected.isBuffer()) {
- // since we have no access to the contents, we need to use the size as an
- // identifier to validate correctness here
assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+ MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+ MemorySegment presentMem = present.getBuffer().getMemorySegment();
+ assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
}
else {
assertEquals(expected.getEvent(), present.getEvent());
}
}
-
+
private static void checkNoTempFilesRemain() {
// validate that all temp files have been removed
for (File dir : IO_MANAGER.getSpillingDirectories()) {
@@ -926,12 +1428,17 @@ public class BarrierBufferTest {
}
}
}
-
+ /** Asserts that the alignment duration reported by {@code buffer} does not exceed the nanoseconds elapsed since {@code startTimestamp} (upper bound only). */
+ private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+ final long elapsed = System.nanoTime() - startTimestamp;
+ assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+ }
+
// ------------------------------------------------------------------------
// Testing Mocks
// ------------------------------------------------------------------------
- private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+ private static class ValidatingCheckpointHandler implements StatefulTask<StateHandle<Object>> {
private long nextExpectedCheckpointId = -1L;
@@ -944,11 +1451,31 @@ public class BarrierBufferTest {
}
@Override
- public void onEvent(CheckpointBarrier barrier) {
- assertNotNull(barrier);
- assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
+ public void setInitialState(StateHandle<Object> stateHandle) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ assertTrue("wrong checkpoint id",
+ nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+
+ assertTrue(timestamp > 0);
+ // NOTE(review): starting from -1 this becomes 0, so a second call would then have to carry id 0 — confirm intended
nextExpectedCheckpointId++;
}
+ // aborted checkpoints are accepted without any validation
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {}
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..903f585 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,25 +19,30 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateHandle;
import org.junit.Test;
import java.util.Arrays;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Tests for the behavior of the barrier tracker.
*/
public class BarrierTrackerTest {
-
+
private static final int PAGE_SIZE = 512;
-
+
@Test
public void testSingleChannelNoBarriers() {
try {
@@ -329,6 +334,98 @@ public class BarrierTrackerTest {
}
}
+ @Test
+ public void testSingleChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ createBuffer(0),
+ createBarrier(1, 0),
+ createBuffer(0),
+ createBarrier(2, 0),
+ createCancellationBarrier(4, 0),
+ createBarrier(5, 0),
+ createBuffer(0),
+ createCancellationBarrier(6, 0),
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ // non-negative IDs expect triggerCheckpointOnBarrier(), negative IDs expect abortCheckpointOnBarrier()
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer()) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ assertTrue(tracker.isEmpty());
+ }
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ assertEquals("not all expected checkpoint events were received", validator.checkpointIDs.length, validator.i);
+ }
+
+ @Test
+ public void testMultiChannelAbortCheckpoint() throws Exception {
+ BufferOrEvent[] sequence = {
+ // checkpoint 1: some buffers and a successful checkpoint
+ createBuffer(0), createBuffer(2), createBuffer(0),
+ createBarrier(1, 1), createBarrier(1, 2),
+ createBuffer(2), createBuffer(1),
+ createBarrier(1, 0),
+
+ // checkpoint 2: aborted by a cancellation marker on the last outstanding channel
+ createBuffer(0), createBuffer(2),
+ createBarrier(2, 0), createBarrier(2, 2),
+ createBuffer(0), createBuffer(2),
+ createCancellationBarrier(2, 1),
+
+ // checkpoint 3: successful
+ createBuffer(2), createBuffer(1),
+ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+ // checkpoint 4: aborted by a cancellation marker arriving before any barrier
+ createBuffer(0), createBuffer(1),
+ createCancellationBarrier(4, 1), createBarrier(4, 2),
+ createBuffer(0),
+ createBarrier(4, 0),
+
+ // checkpoint 5: another successful checkpoint
+ createBuffer(0), createBuffer(1), createBuffer(2),
+ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+
+ // checkpoint 6: two cancellation markers followed by a regular barrier, aborted only once
+ createBuffer(0), createBuffer(1),
+ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+ createBarrier(6, 0),
+
+ createBuffer(0)
+ };
+
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierTracker tracker = new BarrierTracker(gate);
+
+ // non-negative IDs expect triggerCheckpointOnBarrier(), negative IDs expect abortCheckpointOnBarrier()
+ CheckpointSequenceValidator validator =
+ new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+ tracker.registerCheckpointEventHandler(validator);
+
+ for (BufferOrEvent boe : sequence) {
+ if (boe.isBuffer()) {
+ assertEquals(boe, tracker.getNextNonBlocked());
+ }
+ }
+
+ assertTrue(tracker.isEmpty());
+
+ assertNull(tracker.getNextNonBlocked());
+ assertNull(tracker.getNextNonBlocked());
+ assertEquals("not all expected checkpoint events were received", validator.checkpointIDs.length, validator.i);
+ assertTrue(tracker.isEmpty());
+ }
+
// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
@@ -337,6 +434,10 @@ public class BarrierTrackerTest {
return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
}
+ private static BufferOrEvent createCancellationBarrier(long id, int channel) { // CancelCheckpointMarker for checkpoint 'id' on 'channel'
+ return new BufferOrEvent(new CancelCheckpointMarker(id), channel);
+ }
+
private static BufferOrEvent createBuffer(int channel) {
return new BufferOrEvent(
new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
@@ -346,22 +447,54 @@ public class BarrierTrackerTest {
// Testing Mocks
// ------------------------------------------------------------------------
- private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+ private static class CheckpointSequenceValidator implements StatefulTask<StateHandle<Object>> {
private final long[] checkpointIDs;
-
+ /** Index into checkpointIDs of the next expected trigger / abort event. */
private int i = 0;
private CheckpointSequenceValidator(long... checkpointIDs) {
this.checkpointIDs = checkpointIDs;
}
-
+
@Override
- public void onEvent(CheckpointBarrier barrier) {
+ public void setInitialState(StateHandle<Object> state) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
+ }
+
+ @Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
- assertNotNull(barrier);
- assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
- assertTrue(barrier.getTimestamp() > 0);
+ // a non-negative expected ID means that checkpoint must be triggered, a negative one means aborted
+ final long expectedId = checkpointIDs[i++];
+ if (expectedId >= 0) {
+ assertEquals("wrong checkpoint id", expectedId, checkpointId);
+ assertTrue(timestamp > 0);
+ } else {
+ fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
+ }
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {
+ assertTrue("More checkpoints than expected", i < checkpointIDs.length);
+ // a negative expected ID -x means checkpoint x must be aborted
+ final long expectedId = checkpointIDs[i++];
+ if (expectedId < 0) {
+ assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
+ } else {
+ fail("got 'abortCheckpointOnBarrier()' when expecting a 'triggerCheckpointOnBarrier()'");
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ throw new UnsupportedOperationException("should never be called");
}
}
}
[3/4] flink git commit: [FLINK-4985] [checkpointing] Report canceled
/ declined checkpoints to the Checkpoint Coordinator
Posted by se...@apache.org.
[FLINK-4985] [checkpointing] Report canceled / declined checkpoints to the Checkpoint Coordinator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a4fdfff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a4fdfff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a4fdfff
Branch: refs/heads/release-1.1
Commit: 1a4fdfff5d364a35e935604c0a5058a1a9f242f7
Parents: a1f028d
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 8 17:13:19 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 18 +-
.../AlignmentLimitExceededException.java | 33 +++
.../decline/CheckpointDeclineException.java | 35 +++
...ntDeclineOnCancellationBarrierException.java | 32 +++
.../CheckpointDeclineSubsumedException.java | 32 +++
...intDeclineTaskNotCheckpointingException.java | 32 +++
.../CheckpointDeclineTaskNotReadyException.java | 32 +++
.../decline/InputEndOfStreamException.java | 32 +++
.../flink/runtime/execution/Environment.java | 9 +
.../runtime/jobgraph/tasks/StatefulTask.java | 3 +-
.../messages/checkpoint/DeclineCheckpoint.java | 65 +++---
.../runtime/taskmanager/RuntimeEnvironment.java | 7 +
.../apache/flink/runtime/taskmanager/Task.java | 22 +-
.../checkpoint/CheckpointCoordinatorTest.java | 41 +---
.../savepoint/SavepointCoordinatorTest.java | 2 +-
.../jobmanager/JobManagerHARecoveryTest.java | 2 +-
.../operators/testutils/DummyEnvironment.java | 5 +
.../operators/testutils/MockEnvironment.java | 5 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 2 +-
.../streaming/runtime/io/BarrierBuffer.java | 28 ++-
.../streaming/runtime/io/BarrierTracker.java | 4 +-
.../streaming/runtime/tasks/StreamTask.java | 25 ++-
.../streaming/runtime/io/BarrierBufferTest.java | 31 +--
.../runtime/io/BarrierTrackerTest.java | 2 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 2 +
.../runtime/tasks/StreamMockEnvironment.java | 3 +
.../StreamTaskCancellationBarrierTest.java | 221 +++++++++++++++++++
.../runtime/tasks/StreamTaskTestHarness.java | 31 ++-
.../runtime/tasks/TwoInputStreamTaskTest.java | 2 +
29 files changed, 632 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 409f05b..8661ddc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -575,6 +575,7 @@ public class CheckpointCoordinator {
}
final long checkpointId = message.getCheckpointId();
+ final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
PendingCheckpoint checkpoint;
@@ -594,8 +595,8 @@ public class CheckpointCoordinator {
if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true;
- LOG.info("Discarding checkpoint " + checkpointId
- + " because of checkpoint decline from task " + message.getTaskExecutionId());
+ LOG.info("Discarding checkpoint {} because of checkpoint decline from task {} : {}",
+ checkpointId, message.getTaskExecutionId(), reason);
pendingCheckpoints.remove(checkpointId);
checkpoint.discard(userClassLoader);
@@ -604,19 +605,14 @@ public class CheckpointCoordinator {
onCancelCheckpoint(checkpointId);
boolean haveMoreRecentPending = false;
- Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator();
- while (entries.hasNext()) {
- PendingCheckpoint p = entries.next().getValue();
- if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) {
+ for (PendingCheckpoint p : pendingCheckpoints.values()) {
+ if (!p.isDiscarded() && p.getCheckpointId() >= checkpoint.getCheckpointId()) {
haveMoreRecentPending = true;
break;
}
}
- if (!haveMoreRecentPending && !triggerRequestQueued) {
- LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
- triggerCheckpoint(System.currentTimeMillis());
- } else if (!haveMoreRecentPending) {
- LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
+
+ if (!haveMoreRecentPending) {
triggerQueuedRequests();
}
} else if (checkpoint != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
new file mode 100644
index 0000000..64d57bc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because too many bytes were
+ * buffered in the alignment phase.
+ */
+public final class AlignmentLimitExceededException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+ /** @param numBytes The alignment byte limit that was exceeded (reported as the configured maximum). */
+ public AlignmentLimitExceededException(long numBytes) {
+ super("The checkpoint alignment phase needed to buffer more than the configured maximum ("
+ + numBytes + " bytes).");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
new file mode 100644
index 0000000..8a2802c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Base class of all exceptions that indicate a declined checkpoint.
+ */
+public abstract class CheckpointDeclineException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ /** Creates a decline exception with the given message and no cause. */
+ public CheckpointDeclineException(String message) {
+ super(message);
+ }
+ /** Creates a decline exception with the given message and underlying cause. */
+ public CheckpointDeclineException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
new file mode 100644
index 0000000..9ae4096
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because the task received a
+ * cancellation barrier from one of its inputs.
+ */
+public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CheckpointDeclineOnCancellationBarrierException() {
+ super("Task received cancellation from one of its inputs");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
new file mode 100644
index 0000000..5380469
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a newer checkpoint
+ * barrier was received on an input before the pending checkpoint's barrier.
+ */
+public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+ /** @param newCheckpointId The ID of the newer checkpoint whose barrier was received. */
+ public CheckpointDeclineSubsumedException(long newCheckpointId) {
+ super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
new file mode 100644
index 0000000..e5773d1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task does not support
+ * checkpointing.
+ */
+public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+ /** @param taskName The name of the task, quoted in the exception message. */
+ public CheckpointDeclineTaskNotCheckpointingException(String taskName) {
+ super("Task '" + taskName + "' does not support checkpointing");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
new file mode 100644
index 0000000..a1214fe
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because a task was not
+ * ready to perform a checkpoint.
+ */
+public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+ /** @param taskName The name of the task, included in the exception message. */
+ public CheckpointDeclineTaskNotReadyException(String taskName) {
+ super("Task " + taskName + " was not running");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
new file mode 100644
index 0000000..86b29dc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.decline;
+
+/**
+ * Exception indicating that a checkpoint was declined because one of the input
+ * streams reached its end before the alignment was complete.
+ */
+public final class InputEndOfStreamException extends CheckpointDeclineException {
+
+ private static final long serialVersionUID = 1L;
+
+ public InputEndOfStreamException() {
+ super("Checkpoint was declined because one input stream is finished");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 5ad5fe2..e84a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -167,6 +167,15 @@ public interface Environment {
void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
/**
+ * Declines a checkpoint. This tells the checkpoint coordinator that this task will
+ * not be able to successfully complete a certain checkpoint.
+ *
+ * @param checkpointId The ID of the declined checkpoint.
+ * @param cause The reason why the checkpoint was declined; optional and may be null.
+ */
+ void declineCheckpoint(long checkpointId, Throwable cause);
+
+ /**
* Marks task execution failed for an external reason (a reason other than the task code itself
* throwing an exception). If the task is already in a terminal state
* (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 7c581df..874ca4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -66,8 +66,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
* {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
*
* @param checkpointId The ID of the checkpoint to be aborted.
+ * @param cause The reason why the checkpoint was aborted during alignment
*/
- void abortCheckpointOnBarrier(long checkpointId) throws Exception;
+ void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
index f26d2fb..dca212c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java
@@ -19,7 +19,14 @@
package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.util.SerializedThrowable;
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
@@ -31,44 +38,48 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java
private static final long serialVersionUID = 2094094662279578953L;
- /** The timestamp associated with the checkpoint */
- private final long timestamp;
+ /** The reason why the checkpoint was declined: null, a known decline exception, or a SerializedThrowable */
+ private final Throwable reason;
- public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
- super(job, taskExecutionId, checkpointId);
- this.timestamp = timestamp;
+ public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+ this(job, taskExecutionId, checkpointId, null);
}
- // --------------------------------------------------------------------------------------------
+ public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) {
+ super(job, taskExecutionId, checkpointId);
- public long getTimestamp() {
- return timestamp;
+ if (reason == null ||
+ reason.getClass() == AlignmentLimitExceededException.class ||
+ reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class ||
+ reason.getClass() == CheckpointDeclineSubsumedException.class ||
+ reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class ||
+ reason.getClass() == CheckpointDeclineTaskNotReadyException.class ||
+ reason.getClass() == InputEndOfStreamException.class)
+ {
+ // null or known common exceptions that cannot reference any dynamically loaded code
+ this.reason = reason;
+ } else {
+ // any other exception may reference dynamically loaded classes: wrap it in a SerializedThrowable to be on the safe side
+ this.reason = new SerializedThrowable(reason);
+ }
}
// --------------------------------------------------------------------------------------------
- @Override
- public int hashCode() {
- return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32));
+ /**
+ * Gets the reason why the checkpoint was declined.
+ *
+ * @return The reason (unknown exception types are wrapped in a SerializedThrowable), or null if none was given
+ */
+ public Throwable getReason() {
+ return reason;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- else if (o instanceof DeclineCheckpoint) {
- DeclineCheckpoint that = (DeclineCheckpoint) o;
- return this.timestamp == that.timestamp && super.equals(o);
- }
- else {
- return false;
- }
- }
+ // --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return String.format("Declined Checkpoint %d@%d for (%s/%s)",
- getCheckpointId(), getTimestamp(), getJob(), getTaskExecutionId());
+ return String.format("Declined Checkpoint %d for (%s/%s): %s",
+ getCheckpointId(), getJob(), getTaskExecutionId(), reason);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6fdf6f9..47149a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
@@ -268,6 +269,12 @@ public class RuntimeEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ DeclineCheckpoint message = new DeclineCheckpoint(jobId, executionId, checkpointId, cause);
+ jobManager.tell(message);
+ }
+
+ @Override
public void failExternally(Throwable cause) {
this.containingTask.failExternally(cause);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index ed15dbf..3eab7e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -29,6 +29,8 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -969,13 +971,16 @@ public class Task implements Runnable {
try {
boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
if (!success) {
- DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp);
+ // task was not ready to trigger this checkpoint
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, getExecutionId(), checkpointID,
+ new CheckpointDeclineTaskNotReadyException(taskName));
jobManager.tell(decline);
}
}
catch (Throwable t) {
if (getExecutionState() == ExecutionState.RUNNING) {
- failExternally(new RuntimeException(
+ failExternally(new Exception(
"Error while triggering checkpoint for " + taskName,
t));
}
@@ -987,10 +992,21 @@ public class Task implements Runnable {
else {
LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
+ taskNameWithSubtask);
+
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, executionId, checkpointID,
+ new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));
+ jobManager.tell(decline);
}
}
else {
- LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
+ LOG.debug("Declining checkpoint request for non-running task");
+
+ // send back a message that we did not do the checkpoint
+ DeclineCheckpoint decline = new DeclineCheckpoint(
+ jobId, executionId, checkpointID,
+ new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
+ jobManager.tell(decline);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 62af42b..f3f988a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -288,44 +288,19 @@ public class CheckpointCoordinatorTest {
// decline checkpoint from the other task, this should cancel the checkpoint
// and trigger a new one
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
assertTrue(checkpoint.isDiscarded());
- // validate that we have a new pending checkpoint
- assertEquals(1, coord.getNumberOfPendingCheckpoints());
+ // validate that we have no new pending checkpoint
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
- long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew);
-
- assertNotNull(checkpointNew);
- assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
- assertEquals(jid, checkpointNew.getJobId());
- assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
- assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
- assertEquals(0, checkpointNew.getTaskStates().size());
- assertFalse(checkpointNew.isDiscarded());
- assertFalse(checkpointNew.isFullyAcknowledged());
- assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId());
-
- // check that the vertices received the new trigger checkpoint message
- {
- TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
- TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, checkpointNew.getCheckpointTimestamp());
- verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
- verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
- }
-
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId, checkpoint.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpointId));
assertTrue(checkpoint.isDiscarded());
- // should still have the same second checkpoint pending
- long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
- assertEquals(checkpointIdNew2, checkpointIdNew);
-
coord.shutdown();
}
catch (Exception e) {
@@ -422,7 +397,7 @@ public class CheckpointCoordinatorTest {
// decline checkpoint from one of the tasks, this should cancel the checkpoint
// and trigger a new one
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
// validate that we have only one pending checkpoint left
@@ -446,8 +421,8 @@ public class CheckpointCoordinatorTest {
// decline again, nothing should happen
// decline from the other task, nothing should happen
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
- coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpoint1Id));
+ coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id));
assertTrue(checkpoint1.isDiscarded());
coord.shutdown();
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
index b1b384d..dc2b23f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.java
@@ -202,7 +202,7 @@ public class SavepointCoordinatorTest extends TestLogger {
coordinator.receiveDeclineMessage(new DeclineCheckpoint(
jobId, vertices[1].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId, 0));
+ checkpointId));
// The pending checkpoint is completed
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 4dfaf95..2e3bace 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -455,7 +455,7 @@ public class JobManagerHARecoveryTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException("should not be called!");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 5af34fb..ca68683 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -140,6 +140,11 @@ public class DummyEnvironment implements Environment {
public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("DummyEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 9dea324..22dee63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -291,6 +291,11 @@ public class MockEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 5b344eb..3ace0f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -226,7 +226,7 @@ public class TaskAsyncCallTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
throw new UnsupportedOperationException("Should not be called");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 36de717..7a8e7d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,10 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -142,7 +146,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
else {
if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
- processEndOfPartition(next.getChannelIndex());
+ processEndOfPartition();
}
return next;
}
@@ -196,7 +200,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
"Skipping current checkpoint.", barrierId, currentCheckpointId);
// let the task know we are not completing this
- notifyAbort(currentCheckpointId);
+ notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
@@ -241,7 +245,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
return;
}
@@ -258,7 +262,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
releaseBlocksAndResetBarriers();
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
@@ -272,7 +276,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentCheckpointId = barrierId;
startOfAlignmentTimestamp = 0L;
latestAlignmentDurationNanos = 0L;
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
@@ -292,7 +296,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
}
- notifyAbort(barrierId);
+ notifyAbortOnCancellationBarrier(barrierId);
}
// else: trailing barrier from either
@@ -300,12 +304,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// - the current checkpoint if it was already canceled
}
- private void processEndOfPartition(int channel) throws Exception {
+ private void processEndOfPartition() throws Exception {
numClosedChannels++;
if (numBarriersReceived > 0) {
// let the task know we skip a checkpoint
- notifyAbort(currentCheckpointId);
+ notifyAbort(currentCheckpointId, new InputEndOfStreamException());
// no chance to complete this checkpoint
releaseBlocksAndResetBarriers();
@@ -319,9 +323,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
- private void notifyAbort(long checkpointId) throws Exception {
+ private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
+ notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException());
+ }
+
+ private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception {
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 5157336..8b4cc48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
@@ -227,7 +228,8 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private void notifyAbort(long checkpointId) throws Exception {
if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(
+ checkpointId, new CheckpointDeclineOnCancellationBarrierException());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index d55a9c5..4f0839f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -592,13 +594,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+ // notify the coordinator that we decline this checkpoint
+ getEnvironment().declineCheckpoint(checkpointId, cause);
+
+ // notify all downstream operators that they should not wait for a barrier from us
synchronized (lock) {
- if (isRunning) {
- operatorChain.broadcastCheckpointCancelMarker(checkpointId);
- }
+ operatorChain.broadcastCheckpointCancelMarker(checkpointId);
}
}
@@ -669,7 +673,18 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
checkpointThread.start();
}
return true;
- } else {
+ }
+ else {
+ // we cannot perform our checkpoint - let the downstream operators know that they
+ // should not wait for any input from this operator
+
+ // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
+ // yet be created
+ final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointId);
+ for (ResultPartitionWriter output : getEnvironment().getAllWriters()) {
+ output.writeEventToAllChannels(message);
+ }
+
return false;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index cf1f98e..20cb8f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -45,6 +47,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -566,7 +569,7 @@ public class BarrierBufferTest {
check(sequence[12], buffer.getNextNonBlocked());
assertEquals(3L, buffer.getCurrentCheckpointId());
validateAlignmentTime(startTs, buffer);
- verify(toNotify).abortCheckpointOnBarrier(2L);
+ verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class));
check(sequence[16], buffer.getNextNonBlocked());
// checkpoint 3 alignment in progress
@@ -574,7 +577,7 @@ public class BarrierBufferTest {
// checkpoint 3 aborted (end of partition)
check(sequence[20], buffer.getNextNonBlocked());
- verify(toNotify).abortCheckpointOnBarrier(3L);
+ verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
// replay buffered data from checkpoint 3
check(sequence[18], buffer.getNextNonBlocked());
@@ -999,13 +1002,13 @@ public class BarrierBufferTest {
check(sequence[6], buffer.getNextNonBlocked());
assertEquals(5L, buffer.getCurrentCheckpointId());
verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[8], buffer.getNextNonBlocked());
assertEquals(6L, buffer.getCurrentCheckpointId());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
buffer.cleanup();
@@ -1073,7 +1076,7 @@ public class BarrierBufferTest {
// canceled checkpoint on last barrier
startTs = System.nanoTime();
check(sequence[12], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
validateAlignmentTime(startTs, buffer);
check(sequence[13], buffer.getNextNonBlocked());
@@ -1088,7 +1091,7 @@ public class BarrierBufferTest {
// this checkpoint gets immediately canceled
check(sequence[24], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
// some buffers
@@ -1104,7 +1107,7 @@ public class BarrierBufferTest {
check(sequence[33], buffer.getNextNonBlocked());
check(sequence[37], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
// all done
@@ -1167,7 +1170,7 @@ public class BarrierBufferTest {
// re-read the queued cancellation barriers
check(sequence[9], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
assertEquals(0L, buffer.getAlignmentDurationNanos());
check(sequence[10], buffer.getNextNonBlocked());
@@ -1184,7 +1187,7 @@ public class BarrierBufferTest {
// no further checkpoint (abort) notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class));
// all done
assertNull(buffer.getNextNonBlocked());
@@ -1253,7 +1256,7 @@ public class BarrierBufferTest {
// cancelled by cancellation barrier
check(sequence[4], buffer.getNextNonBlocked());
validateAlignmentTime(startTs, buffer);
- verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class));
// the next checkpoint alignment starts now
startTs = System.nanoTime();
@@ -1285,7 +1288,7 @@ public class BarrierBufferTest {
// check overall notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
/**
@@ -1337,7 +1340,7 @@ public class BarrierBufferTest {
// future barrier aborts checkpoint
startTs = System.nanoTime();
check(sequence[3], buffer.getNextNonBlocked());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class));
check(sequence[4], buffer.getNextNonBlocked());
// alignment of next checkpoint
@@ -1366,7 +1369,7 @@ public class BarrierBufferTest {
// check overall notifications
verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
- verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class));
}
// ------------------------------------------------------------------------
@@ -1471,7 +1474,7 @@ public class BarrierBufferTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {}
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 903f585..978c212 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -481,7 +481,7 @@ public class BarrierTrackerTest {
}
@Override
- public void abortCheckpointOnBarrier(long checkpointId) {
+ public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
assertTrue("More checkpoints than expected", i < checkpointIDs.length);
final long expectedId = checkpointIDs[i++];
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 5fcc59e..9003f0e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -270,6 +271,7 @@ public class OneInputStreamTaskTest {
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1));
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 7084208..36ad8ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -302,6 +302,9 @@ public class StreamMockEnvironment implements Environment {
}
@Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {}
+
+ @Override
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("StreamMockEnvironment does not support external task failure.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
new file mode 100644
index 0000000..8b8b659
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
+public class StreamTaskCancellationBarrierTest {
+
+ /**
+ * Checks that a task emits a cancellation barrier ({@link CancelCheckpointMarker}) downstream if a "trigger checkpoint" message
+ * arrives before the task is ready (still blocked in its init phase).
+ */
+ @Test
+ public void testEmitCancellationBarrierWhenNotReady() throws Exception {
+ StreamTask<String, ?> task = new InitBlockingTask();
+ StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO);
+
+ // start the task - InitBlockingTask blocks inside 'init()', so the task never gets past it here
+ testHarness.invoke();
+
+ // trigger checkpoint 41 while the task is not ready; the trigger must be rejected
+ boolean result = task.triggerCheckpoint(41L, System.currentTimeMillis());
+ assertFalse("task triggered checkpoint though not ready", result);
+
+ // a cancellation barrier for checkpoint 41 should have been emitted downstream
+ Object emitted = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", emitted);
+ assertTrue("wrong type emitted", emitted instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 41L, ((CancelCheckpointMarker) emitted).getCheckpointId());
+ }
+
+ /**
+ * This test verifies (for one-input tasks) that the stream tasks react the following way to
+ * receiving a checkpoint cancellation barrier:
+ *
+ * - send a "decline checkpoint" notification out (to the JobManager)
+ * - emit a cancellation barrier downstream
+ */
+ @Test
+ public void testDeclineCallOnCancelBarrierOneInput() throws Exception {
+
+ OneInputStreamTask<String, String> task = new OneInputStreamTask<String, String>();
+ OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ task,
+ 1, 2,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator);
+
+ StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+ // start the task with the spied environment so the decline call can be verified
+ testHarness.invoke(environment);
+ testHarness.waitForTaskRunning();
+
+ // feed cancellation barriers for checkpoint 2 into the inputs
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 1);
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+ testHarness.waitForInputProcessing();
+
+ // exactly one decline call for checkpoint 2 (cause: cancellation barrier) should reach the environment
+ verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+ // a cancellation barrier for checkpoint 2 should have been emitted downstream
+ Object result = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", result);
+ assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+ // end the input and wait for the task to finish
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ /**
+ * This test verifies (for two-input tasks) that the stream tasks react the following way to
+ * receiving a checkpoint cancellation barrier:
+ *
+ * - send a "decline checkpoint" notification out (to the JobManager)
+ * - emit a cancellation barrier downstream
+ */
+ @Test
+ public void testDeclineCallOnCancelBarrierTwoInputs() throws Exception {
+
+ TwoInputStreamTask<String, String, String> task = new TwoInputStreamTask<String, String, String>();
+ TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>(
+ task,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ CoStreamMap<String, String, String> op = new CoStreamMap<>(new UnionCoMap());
+ streamConfig.setStreamOperator(op);
+
+ StreamMockEnvironment environment = spy(testHarness.createEnvironment());
+
+ // start the task with the spied environment so the decline call can be verified
+ testHarness.invoke(environment);
+ testHarness.waitForTaskRunning();
+
+ // feed cancellation barriers for checkpoint 2 into both inputs
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 0, 0);
+ testHarness.processEvent(new CancelCheckpointMarker(2L), 1, 0);
+ testHarness.waitForInputProcessing();
+
+ // exactly one decline call for checkpoint 2 (cause: cancellation barrier) should reach the environment
+ verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class));
+
+ // a cancellation barrier for checkpoint 2 should have been emitted downstream
+ Object result = testHarness.getOutput().poll();
+ assertNotNull("nothing emitted", result);
+ assertTrue("wrong type emitted", result instanceof CancelCheckpointMarker);
+ assertEquals("wrong checkpoint id", 2L, ((CancelCheckpointMarker) result).getCheckpointId());
+
+ // end the input and wait for the task to finish
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ // ------------------------------------------------------------------------
+ // test tasks / functions (InitBlockingTask blocks in init() until cancelTask() is called)
+ // ------------------------------------------------------------------------
+
+ private static class InitBlockingTask extends StreamTask<String, AbstractStreamOperator<String>> {
+
+ private final Object lock = new Object();
+ private volatile boolean running = true;
+
+ @Override
+ protected void init() throws Exception {
+ synchronized (lock) {
+ while (running) {
+ lock.wait();
+ }
+ }
+ }
+
+ @Override
+ protected void run() throws Exception {}
+
+ @Override
+ protected void cleanup() throws Exception {}
+
+ @Override
+ protected void cancelTask() throws Exception {
+ running = false;
+ synchronized (lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+
+ private static class IdentityMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class UnionCoMap implements CoMapFunction<String, String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map1(String value) throws Exception {
+ return value;
+ }
+
+ @Override
+ public String map2(String value) throws Exception {
+ return value;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 00e95b9..0bd8d9a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -150,21 +150,18 @@ public class StreamTaskTestHarness<OUT> {
}
+ public StreamMockEnvironment createEnvironment() {
+ return new StreamMockEnvironment(
+ jobConfig, taskConfig, executionConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+ }
+
/**
* Invoke the Task. This resets the output of any previous invocation. This will start a new
* Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
* Task thread to finish running.
*/
public void invoke() throws Exception {
- mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, executionConfig,
- memorySize, new MockInputSplitProvider(), bufferSize);
- task.setEnvironment(mockEnv);
-
- initializeInputs();
- initializeOutput();
-
- taskThread = new TaskThread(task);
- taskThread.start();
+ invoke(createEnvironment());
}
/**
@@ -205,7 +202,7 @@ public class StreamTaskTestHarness<OUT> {
if (taskThread.task instanceof StreamTask) {
StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
while (!streamTask.isRunning()) {
- Thread.sleep(100);
+ Thread.sleep(10);
if (!taskThread.isAlive()) {
if (taskThread.getError() != null) {
throw new Exception("Task Thread failed due to an error.", taskThread.getError());
@@ -282,15 +279,15 @@ public class StreamTaskTestHarness<OUT> {
/**
* This only returns after all input queues are empty.
*/
- public void waitForInputProcessing() {
-
-
+ public void waitForInputProcessing() throws Exception {
// first wait for all input queues to be empty
- try {
- Thread.sleep(1);
- } catch (InterruptedException ignored) {}
-
+
while (true) {
+ Throwable error = taskThread.getError();
+ if (error != null) {
+ throw new Exception("Exception in the task thread", error);
+ }
+
boolean allEmpty = true;
for (int i = 0; i < numInputGates; i++) {
if (!inputGates[i].allQueuesEmpty()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/1a4fdfff/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index b9211b1..92f8553 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -290,6 +291,7 @@ public class TwoInputStreamTaskTest {
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+ expectedOutput.add(new CancelCheckpointMarker(0));
expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
expectedOutput.add(new CheckpointBarrier(1, 1));
[4/4] flink git commit: [FLINK-4975] [checkpointing] Add a limit for
how much data may be buffered in alignment.
Posted by se...@apache.org.
[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.
If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f
Branch: refs/heads/release-1.1
Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a
Parents: 1a4fdff
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 15 +
.../streaming/runtime/io/BarrierBuffer.java | 48 ++-
.../streaming/runtime/io/BufferSpiller.java | 39 ++-
.../runtime/io/StreamInputProcessor.java | 17 +-
.../runtime/io/StreamTwoInputProcessor.java | 17 +-
.../runtime/tasks/OneInputStreamTask.java | 3 +-
.../runtime/tasks/TwoInputStreamTask.java | 3 +-
.../io/BarrierBufferAlignmentLimitTest.java | 315 +++++++++++++++++++
8 files changed, 441 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1ad1c4..d9ccb35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -277,6 +277,14 @@ public final class ConfigConstants {
@PublicEvolving
public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";
+ /**
+ * The maximum number of bytes that a checkpoint alignment may buffer.
+ * If the checkpoint alignment buffers more than the configured amount of
+ * data, the checkpoint is aborted (skipped). Must be positive, or -1 (the default) for no limit.
+ */
+ @PublicEvolving
+ public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = "task.checkpoint.alignment.max-size";
+
// --------------------------- Runtime Algorithms -------------------------------
/**
@@ -873,6 +881,13 @@ public final class ConfigConstants {
*/
public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated
+ /**
+ * The default for the maximum number of bytes that a checkpoint alignment may buffer.
+ * {@code -1} = infinite.
+ */
+ @PublicEvolving
+ public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = -1L;
+
// ------------------------ Runtime Algorithms ------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7a8e7d3..c4cf98e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
@@ -65,6 +68,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* further data from the input gate. */
private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
+ /** The maximum number of bytes that may be buffered before an alignment is broken. -1 means unlimited */
+ private final long maxBufferedBytes;
+
/** The sequence of buffers/events that has been unblocked and must now be consumed
* before requesting further data from the input gate */
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -82,6 +88,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
/** The number of already closed channels */
private int numClosedChannels;
+ /** The number of bytes in the queued spilled sequences */
+ private long numQueuedBytes;
+
/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
private long startOfAlignmentTimestamp;
@@ -92,14 +101,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private boolean endOfStream;
/**
+ * Creates a new checkpoint stream aligner.
+ *
+ * <p>There is no limit to how much data may be buffered during an alignment.
*
* @param inputGate The input gate to draw the buffers and events from.
* @param ioManager The I/O manager that gives access to the temp directories.
- *
+ *
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+ this (inputGate, ioManager, -1);
+ }
+
+ /**
+ * Creates a new checkpoint stream aligner.
+ *
+ * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
+ * When that number is exceeded, it will stop the alignment and notify the task that the
+ * checkpoint has been cancelled.
+ *
+ * @param inputGate The input gate to draw the buffers and events from.
+ * @param ioManager The I/O manager that gives access to the temp directories.
+ * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
+ *
+ * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+ */
+ public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
+ checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
this.inputGate = inputGate;
+ this.maxBufferedBytes = maxBufferedBytes;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
@@ -131,6 +163,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked we, we just store the BufferOrEvent
bufferSpiller.add(next);
+ checkSizeLimit();
}
else if (next.isBuffer()) {
return next;
@@ -169,6 +202,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered = queuedBuffered.pollFirst();
if (currentBuffered != null) {
currentBuffered.open();
+ numQueuedBytes -= currentBuffered.size();
}
}
@@ -333,6 +367,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
+ private void checkSizeLimit() throws Exception {
+ if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+ // queued spilled sequences + current spill file exceed the limit - release blocks and abort this checkpoint
+ LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
+ currentCheckpointId, maxBufferedBytes);
+
+ releaseBlocksAndResetBarriers();
+ notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
+ }
+ }
@Override
public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
@@ -359,6 +403,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
seq.cleanup();
}
queuedBuffered.clear();
+ numQueuedBytes = 0L;
}
private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
@@ -429,6 +474,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (bufferedNow != null) {
bufferedNow.open();
queuedBuffered.addFirst(currentBuffered);
+ numQueuedBytes += currentBuffered.size();
currentBuffered = bufferedNow;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index dc8d245..8060d02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -84,9 +84,9 @@ public class BufferSpiller {
/** A counter, to created numbered spill files */
private int fileCounter;
-
- /** A flag to check whether the spiller has written since the last roll over */
- private boolean hasWritten;
+
+ /** The number of bytes written since the last roll over */
+ private long bytesWritten;
/**
* Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
* @throws IOException Thrown, if the buffer of event could not be spilled.
*/
public void add(BufferOrEvent boe) throws IOException {
- hasWritten = true;
try {
ByteBuffer contents;
if (boe.isBuffer()) {
@@ -140,7 +139,9 @@ public class BufferSpiller {
headBuffer.putInt(contents.remaining());
headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
headBuffer.flip();
-
+
+ bytesWritten += (headBuffer.remaining() + contents.remaining());
+
sources[1] = contents;
currentChannel.write(sources);
}
@@ -186,7 +187,7 @@ public class BufferSpiller {
}
private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
- if (!hasWritten) {
+ if (bytesWritten == 0) {
return null;
}
@@ -205,8 +206,8 @@ public class BufferSpiller {
// create ourselves a new spill file
createSpillingChannel();
-
- hasWritten = false;
+
+ bytesWritten = 0L;
return seq;
}
@@ -225,6 +226,14 @@ public class BufferSpiller {
}
}
+ /**
+ * Gets the number of bytes (record headers plus contents) written to the current spill file since the last roll-over.
+ * @return the number of bytes written since the last roll-over; 0 directly after a roll-over
+ */
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
// ------------------------------------------------------------------------
// For testing
// ------------------------------------------------------------------------
@@ -268,6 +277,9 @@ public class BufferSpiller {
/** The byte buffer for bulk reading */
private final ByteBuffer buffer;
+ /** We store this size as a constant because it is crucial it never changes */
+ private final long size;
+
/** The page size to instantiate properly sized memory segments */
private final int pageSize;
@@ -282,11 +294,13 @@ public class BufferSpiller {
* @param buffer The buffer used for bulk reading.
* @param pageSize The page size to use for the created memory segments.
*/
- SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+ SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize)
+ throws IOException {
this.file = file;
this.fileChannel = fileChannel;
this.buffer = buffer;
this.pageSize = pageSize;
+ this.size = fileChannel.size();
}
/**
@@ -408,5 +422,12 @@ public class BufferSpiller {
throw new IOException("Cannot remove temp file for stream alignment writer");
}
}
+
+ /**
+ * Gets the size of this spilled sequence in bytes, as reported by the file channel when the sequence was created.
+ */
+ public long size() throws IOException {
+ return size;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 7d9e4d2..bcca2bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> {
StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
+ boolean enableWatermarkMultiplexing,
+ Configuration taskManagerConfig) throws IOException {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ long maxAlign = taskManagerConfig.getLong(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+ ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+ if (!(maxAlign == -1 || maxAlign > 0)) {
+ throw new IllegalConfigurationException(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+ + " must be positive or -1 (infinite)");
+ }
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
}
else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler = new BarrierTracker(inputGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3ae077..f116aff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> {
StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
+ boolean enableWatermarkMultiplexing,
+ Configuration taskManagerConfig) throws IOException {
final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ long maxAlign = taskManagerConfig.getLong(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+ ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+ if (!(maxAlign == -1 || maxAlign > 0)) {
+ throw new IllegalConfigurationException(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+ + " must be positive or -1 (infinite)");
+ }
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
}
else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler = new BarrierTracker(inputGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d18ca16..8470c7c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
- isSerializingTimestamps());
+ isSerializingTimestamps(),
+ getEnvironment().getTaskManagerInfo().getConfiguration());
// make sure that stream tasks report their I/O statistics
AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..8718b88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
- isSerializingTimestamps());
+ isSerializingTimestamps(),
+ getEnvironment().getTaskManagerInfo().getConfiguration());
// make sure that stream tasks report their I/O statistics
AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..529f809
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the {@link BarrierBuffer}'s limit on the number of bytes buffered/spilled during alignment.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+ private static final int PAGE_SIZE = 512;
+
+ private static final Random RND = new Random();
+
+ private static IOManager IO_MANAGER;
+
+ // ------------------------------------------------------------------------
+ // Setup
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() {
+ IO_MANAGER = new IOManagerAsync();
+ }
+
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Tests that a single alignment which buffers more data than the limit is aborted and its buffered data replayed.
+ */
+ @Test
+ public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some initial buffers
+ /* 0 */ createBuffer(1, 100), createBuffer(2, 70),
+ /* 2 */ createBuffer(0, 42), createBuffer(2, 111),
+
+ // starting a checkpoint
+ /* 4 */ createBarrier(7, 1),
+ /* 5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50),
+ /* 9 */ createBarrier(7, 0),
+ /* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200),
+
+ // this buffer makes the alignment spill too large
+ /* 14 */ createBuffer(0, 101),
+
+ // additional data
+ /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+ // checkpoint completes - this should not result in a "completion notification"
+ /* 18 */ createBarrier(7, 2),
+
+ // trailing buffers
+ /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+ };
+
+ // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ // validating the sequence of buffers
+
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[3], buffer.getNextNonBlocked());
+
+ // start of checkpoint
+ long startTs = System.nanoTime();
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // trying to pull the next makes the alignment overflow - so buffered buffers are replayed
+ check(sequence[5], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class));
+
+ // playing back buffered events
+ check(sequence[7], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ // the additional data
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // no call for a completed checkpoint must have happened
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ /**
+ * This tests the following case:
+ * - an alignment starts
+ * - barriers from a second checkpoint queue before the first completes
+ * - together they are larger than the threshold
+ * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes
+ */
+ @Test
+ public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some initial buffers
+ /* 0 */ createBuffer(1, 100), createBuffer(2, 70),
+
+ // starting a checkpoint
+ /* 2 */ createBarrier(3, 2),
+ /* 3 */ createBuffer(1, 100), createBuffer(2, 100),
+ /* 5 */ createBarrier(3, 0),
+ /* 6 */ createBuffer(0, 100), createBuffer(1, 100),
+
+ // queue some data from the next checkpoint
+ /* 8 */ createBarrier(4, 0),
+ /* 9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100),
+
+ // this one makes the alignment overflow
+ /* 12 */ createBuffer(2, 100),
+
+ // last barrier of checkpoint 3 - arrives after that checkpoint was already aborted
+ /* 13 */ createBarrier(3, 1),
+
+ // more for the next checkpoint
+ /* 14 */ createBarrier(4, 1),
+ /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+ // next checkpoint completes
+ /* 18 */ createBarrier(4, 2),
+
+ // trailing data
+ /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+ };
+
+ // the barrier buffer has a limit that only 500 bytes may be spilled in alignment
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ // validating the sequence of buffers
+ long startTs;
+
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+
+ // start of checkpoint
+ startTs = System.nanoTime();
+ check(sequence[3], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+
+ // next checkpoint also in progress
+ check(sequence[11], buffer.getNextNonBlocked());
+
+ // checkpoint alignment aborted due to too much data
+ check(sequence[4], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class));
+
+ // replay buffered data - in the middle, the alignment for checkpoint 4 starts
+ check(sequence[6], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[12], buffer.getNextNonBlocked());
+
+ // only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed - check and validate buffered replay
+ check(sequence[9], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), anyLong());
+
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+
+ // trailing data
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // only checkpoint 4 was successfully completed, not checkpoint 3
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static BufferOrEvent createBuffer(int channel, int size) {
+ byte[] bytes = new byte[size];
+ RND.nextBytes(bytes);
+
+ MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+ memory.put(0, bytes);
+
+ Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+ buf.setSize(size);
+
+ // retain an additional time so it does not get disposed after being read by the input gate
+ buf.retain();
+
+ return new BufferOrEvent(buf, channel);
+ }
+
+ private static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ }
+
+ private static void check(BufferOrEvent expected, BufferOrEvent present) {
+ assertNotNull(expected);
+ assertNotNull(present);
+ assertEquals(expected.isBuffer(), present.isBuffer());
+
+ if (expected.isBuffer()) {
+ assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+ MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+ MemorySegment presentMem = present.getBuffer().getMemorySegment();
+ assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+ }
+ else {
+ assertEquals(expected.getEvent(), present.getEvent());
+ }
+ }
+
+ private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+ final long elapsed = System.nanoTime() - startTimestamp;
+ assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+ }
+
+ private static void checkNoTempFilesRemain() {
+ // validate that all temp files have been removed (NOTE(review): File.list() returns null if 'dir' does not exist)
+ for (File dir : IO_MANAGER.getSpillingDirectories()) {
+ for (String file : dir.list()) {
+ if (file != null && !(file.equals(".") || file.equals(".."))) {
+ fail("barrier buffer did not clean up temp files. remaining file: " + file);
+ }
+ }
+ }
+ }
+}
[2/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation
Barriers as a way to signal aborted checkpoints
Posted by se...@apache.org.
[FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1f028de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1f028de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1f028de
Branch: refs/heads/release-1.1
Commit: a1f028dee49928ada014632bb27216b36e30250e
Parents: 4dd3efe
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Oct 23 18:41:32 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 18:27:47 2016 +0100
----------------------------------------------------------------------
.../io/network/api/CancelCheckpointMarker.java | 77 +++
.../api/serialization/EventSerializer.java | 57 +-
.../runtime/io/network/netty/NettyMessage.java | 2 +-
.../partition/PipelinedSubpartition.java | 3 +-
.../runtime/jobgraph/tasks/StatefulTask.java | 21 +
.../api/serialization/EventSerializerTest.java | 45 +-
.../jobmanager/JobManagerHARecoveryTest.java | 10 +
.../runtime/taskmanager/TaskAsyncCallTest.java | 10 +
.../streaming/runtime/io/BarrierBuffer.java | 265 ++++++--
.../streaming/runtime/io/BarrierTracker.java | 164 +++--
.../streaming/runtime/io/BufferSpiller.java | 2 +-
.../runtime/io/CheckpointBarrierHandler.java | 22 +-
.../runtime/io/StreamInputProcessor.java | 5 +-
.../runtime/io/StreamTwoInputProcessor.java | 5 +-
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../streaming/runtime/tasks/OperatorChain.java | 32 +-
.../streaming/runtime/tasks/StreamTask.java | 48 +-
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../streaming/runtime/io/BarrierBufferTest.java | 617 +++++++++++++++++--
.../runtime/io/BarrierTrackerTest.java | 157 ++++-
20 files changed, 1291 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
new file mode 100644
index 0000000..52a2517
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CancelCheckpointMarker.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * The CancelCheckpointMarker travels through the data streams, similar to the {@link CheckpointBarrier},
+ * but signals that a certain checkpoint should be canceled. Any in-progress alignment for that
+ * checkpoint needs to be canceled and regular processing should be resumed.
+ */
+public class CancelCheckpointMarker extends RuntimeEvent {
+
+ /** The id of the checkpoint to be canceled */
+ private final long checkpointId;
+
+ public CancelCheckpointMarker(long checkpointId) {
+ this.checkpointId = checkpointId;
+ }
+
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ // ------------------------------------------------------------------------
+ // These known and common events go through special code paths (EventSerializer),
+ // rather than through generic serialization, so write/read must never be used
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ throw new UnsupportedOperationException("this method should never be called");
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ throw new UnsupportedOperationException("this method should never be called");
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return (int) (checkpointId ^ (checkpointId >>> 32));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other != null &&
+ other.getClass() == CancelCheckpointMarker.class &&
+ this.checkpointId == ((CancelCheckpointMarker) other).checkpointId;
+ }
+
+ @Override
+ public String toString() {
+ return "CancelCheckpointMarker " + checkpointId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index a34f8cf..0bc3b28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -38,7 +39,7 @@ import java.nio.ByteOrder;
* Utility class to serialize and deserialize task events.
*/
public class EventSerializer {
-
+
private static final int END_OF_PARTITION_EVENT = 0;
private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -46,17 +47,19 @@ public class EventSerializer {
private static final int END_OF_SUPERSTEP_EVENT = 2;
private static final int OTHER_EVENT = 3;
-
+
+ private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
+
// ------------------------------------------------------------------------
-
- public static ByteBuffer toSerializedEvent(AbstractEvent event) {
+
+ public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOException {
final Class<?> eventClass = event.getClass();
if (eventClass == EndOfPartitionEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_PARTITION_EVENT });
}
else if (eventClass == CheckpointBarrier.class) {
CheckpointBarrier barrier = (CheckpointBarrier) event;
-
+
ByteBuffer buf = ByteBuffer.allocate(20);
buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
buf.putLong(4, barrier.getId());
@@ -66,32 +69,39 @@ public class EventSerializer {
else if (eventClass == EndOfSuperstepEvent.class) {
return ByteBuffer.wrap(new byte[] { 0, 0, 0, END_OF_SUPERSTEP_EVENT });
}
+ else if (eventClass == CancelCheckpointMarker.class) {
+ CancelCheckpointMarker marker = (CancelCheckpointMarker) event;
+
+ ByteBuffer buf = ByteBuffer.allocate(12);
+ buf.putInt(0, CANCEL_CHECKPOINT_MARKER_EVENT);
+ buf.putLong(4, marker.getCheckpointId());
+ return buf;
+ }
else {
try {
final DataOutputSerializer serializer = new DataOutputSerializer(128);
serializer.writeInt(OTHER_EVENT);
serializer.writeUTF(event.getClass().getName());
event.write(serializer);
-
return serializer.wrapAsByteBuffer();
}
catch (IOException e) {
- throw new RuntimeException("Error while serializing event.", e);
+ throw new IOException("Error while serializing event.", e);
}
}
}
- public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) {
+ public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader classLoader) throws IOException {
if (buffer.remaining() < 4) {
- throw new RuntimeException("Incomplete event");
+ throw new IOException("Incomplete event");
}
-
+
final ByteOrder bufferOrder = buffer.order();
buffer.order(ByteOrder.BIG_ENDIAN);
-
+
try {
int type = buffer.getInt();
-
+
if (type == END_OF_PARTITION_EVENT) {
return EndOfPartitionEvent.INSTANCE;
}
@@ -103,35 +113,38 @@ public class EventSerializer {
else if (type == END_OF_SUPERSTEP_EVENT) {
return EndOfSuperstepEvent.INSTANCE;
}
+ else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {
+ long id = buffer.getLong();
+ return new CancelCheckpointMarker(id);
+ }
else if (type == OTHER_EVENT) {
try {
final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
-
final String className = deserializer.readUTF();
-
+
final Class<? extends AbstractEvent> clazz;
try {
clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
}
catch (ClassNotFoundException e) {
- throw new RuntimeException("Could not load event class '" + className + "'.", e);
+ throw new IOException("Could not load event class '" + className + "'.", e);
}
catch (ClassCastException e) {
- throw new RuntimeException("The class '" + className + "' is not a valid subclass of '"
+ throw new IOException("The class '" + className + "' is not a valid subclass of '"
+ AbstractEvent.class.getName() + "'.", e);
}
-
+
final AbstractEvent event = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
event.read(deserializer);
-
+
return event;
}
catch (Exception e) {
- throw new RuntimeException("Error while deserializing or instantiating event.", e);
+ throw new IOException("Error while deserializing or instantiating event.", e);
}
}
else {
- throw new RuntimeException("Corrupt byte stream for event");
+ throw new IOException("Corrupt byte stream for event");
}
}
finally {
@@ -143,7 +156,7 @@ public class EventSerializer {
// Buffer helpers
// ------------------------------------------------------------------------
- public static Buffer toBuffer(AbstractEvent event) {
+ public static Buffer toBuffer(AbstractEvent event) throws IOException {
final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
@@ -154,7 +167,7 @@ public class EventSerializer {
return buffer;
}
- public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) {
+ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 3a24181..6f6001b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -463,7 +463,7 @@ abstract class NettyMessage {
}
@Override
- public void readFrom(ByteBuf buffer) {
+ public void readFrom(ByteBuf buffer) throws IOException {
// TODO Directly deserialize fromNetty's buffer
int length = buffer.readInt();
ByteBuffer serializedEvent = ByteBuffer.allocate(length);
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 2d7097d..b703acb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.util.event.NotificationListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayDeque;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +89,7 @@ class PipelinedSubpartition extends ResultSubpartition {
}
@Override
- public void finish() {
+ public void finish() throws IOException {
final NotificationListener listener;
synchronized (buffers) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..7c581df 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -47,6 +47,27 @@ public interface StatefulTask<T extends StateHandle<?>> {
*/
boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
+ /**
+ * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+ * barriers on all input streams.
+ *
+ * @param checkpointId The ID of the checkpoint, incrementing.
+ * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+ *
+ * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+ */
+ void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception;
+
+ /**
+ * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,
+ * but at least one {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker}.
+ *
+ * <p>This requires implementing tasks to forward a
+ * {@link org.apache.flink.runtime.io.network.api.CancelCheckpointMarker} to their outputs.
+ *
+ * @param checkpointId The ID of the checkpoint to be aborted.
+ */
+ void abortCheckpointOnBarrier(long checkpointId) throws Exception;
/**
* Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index ddfd758..d47a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
@@ -28,34 +29,30 @@ import org.junit.Test;
import java.nio.ByteBuffer;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class EventSerializerTest {
@Test
- public void testSerializeDeserializeEvent() {
- try {
- AbstractEvent[] events = {
- EndOfPartitionEvent.INSTANCE,
- EndOfSuperstepEvent.INSTANCE,
- new CheckpointBarrier(1678L, 4623784L),
- new TestTaskEvent(Math.random(), 12361231273L)
- };
-
- for (AbstractEvent evt : events) {
- ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
- assertTrue(serializedEvent.hasRemaining());
-
- AbstractEvent deserialized =
- EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
- assertNotNull(deserialized);
- assertEquals(evt, deserialized);
- }
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ public void testSerializeDeserializeEvent() throws Exception {
+ AbstractEvent[] events = {
+ EndOfPartitionEvent.INSTANCE,
+ EndOfSuperstepEvent.INSTANCE,
+ new CheckpointBarrier(1678L, 4623784L),
+ new TestTaskEvent(Math.random(), 12361231273L),
+ new CancelCheckpointMarker(287087987329842L)
+ };
+
+ for (AbstractEvent evt : events) {
+ ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
+ assertTrue(serializedEvent.hasRemaining());
+
+ AbstractEvent deserialized =
+ EventSerializer.fromSerializedEvent(serializedEvent, getClass().getClassLoader());
+ assertNotNull(deserialized);
+ assertEquals(evt, deserialized);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index f050e29..4dfaf95 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -450,6 +450,16 @@ public class JobManagerHARecoveryTest {
}
@Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ throw new UnsupportedOperationException("should not be called!");
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {
+ throw new UnsupportedOperationException("should not be called!");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
completedCheckpointsLatch.countDown();
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 0c0d064..5b344eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -221,6 +221,16 @@ public class TaskAsyncCallTest {
}
@Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ // Barrier-triggered checkpoints are not exercised by this test task; any call is an error.
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ // Barrier-triggered checkpoint aborts are not exercised by this test task; any call is an error.
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
public void notifyCheckpointComplete(long checkpointId) {
if (checkpointId != lastCheckpointId && this.error == null) {
this.error = new Exception("calls out of order");
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..36de717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -17,42 +17,43 @@
package org.apache.flink.streaming.runtime.io;
-import java.io.IOException;
-import java.util.ArrayDeque;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayDeque;
+
/**
* The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
*
* <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
* BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.</p>
+ * the blocks are released.
*/
@Internal
public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-
+
/** The gate that the buffer draws its input from */
private final InputGate inputGate;
/** Flags that indicate whether a channel is currently blocked/buffered */
private final boolean[] blockedChannels;
-
+
/** The total number of channels that this buffer handles data from */
private final int totalNumberOfInputChannels;
-
+
/** To utility to write blocked data to a file channel */
private final BufferSpiller bufferSpiller;
@@ -65,17 +66,24 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
/** Handler that receives the checkpoint notifications */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask<?> toNotifyOnCheckpoint;
/** The ID of the checkpoint for which we expect barriers */
private long currentCheckpointId = -1L;
- /** The number of received barriers (= number of blocked/buffered channels) */
+ /** The number of received barriers (= number of blocked/buffered channels)
+ * IMPORTANT: A canceled checkpoint must always have 0 barriers */
private int numBarriersReceived;
-
+
/** The number of already closed channels */
private int numClosedChannels;
-
+
+ /** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+ private long startOfAlignmentTimestamp;
+
+ /** The time (in nanoseconds) that the latest alignment took */
+ private long latestAlignmentDurationNanos;
+
/** Flag to indicate whether we have drawn all available input */
private boolean endOfStream;
@@ -90,7 +98,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
this.inputGate = inputGate;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-
+
this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
}
@@ -100,7 +108,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// ------------------------------------------------------------------------
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
BufferOrEvent next;
@@ -114,7 +122,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
return getNextNonBlocked();
}
}
-
+
if (next != null) {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked we, we just store the BufferOrEvent
@@ -129,27 +137,29 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
}
}
+ else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+ processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
+ }
else {
if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
- numClosedChannels++;
- // no chance to complete this checkpoint
- releaseBlocks();
+ processEndOfPartition(next.getChannelIndex());
}
return next;
}
}
else if (!endOfStream) {
- // end of stream. we feed the data that is still buffered
+ // end of input stream. stream continues with the buffered data
endOfStream = true;
- releaseBlocks();
+ releaseBlocksAndResetBarriers();
return getNextNonBlocked();
}
else {
+ // final end of both input and buffered data
return null;
}
}
}
-
+
private void completeBufferedSequence() throws IOException {
currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
@@ -157,66 +167,175 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered.open();
}
}
-
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+
+ private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
+ // fast path for single channel cases
+ if (totalNumberOfInputChannels == 1) {
+ if (barrierId > currentCheckpointId) {
+ // new checkpoint
+ currentCheckpointId = barrierId;
+ notifyCheckpoint(receivedBarrier);
+ }
+ return;
+ }
+
+ // -- general code path for multiple input channels --
+
if (numBarriersReceived > 0) {
- // subsequent barrier of a checkpoint.
+ // this is only true if some alignment is already in progress and was not canceled
+
if (barrierId == currentCheckpointId) {
// regular case
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
- // we did not complete the current checkpoint
+ // we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
- releaseBlocks();
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
+ // let the task know we are not completing this
+ notifyAbort(currentCheckpointId);
+
+ // abort the current checkpoint
+ releaseBlocksAndResetBarriers();
+
+ // begin the new checkpoint
+ beginNewAlignment(barrierId, channelIndex);
}
else {
- // ignore trailing barrier from aborted checkpoint
+ // ignore trailing barrier from an earlier checkpoint (obsolete now)
return;
}
-
}
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
- currentCheckpointId = barrierId;
- onBarrier(channelIndex);
+ beginNewAlignment(barrierId, channelIndex);
}
else {
- // trailing barrier from previous (skipped) checkpoint
+ // either the current checkpoint was canceled (numBarriers == 0) or
+ // this barrier is from an old subsumed checkpoint
return;
}
- // check if we have all barriers
+ // check if we have all barriers - since canceled checkpoints always have zero barriers
+ // this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
+ // actually trigger checkpoint
if (LOG.isDebugEnabled()) {
- LOG.debug("Received all barrier, triggering checkpoint {} at {}",
+ LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ releaseBlocksAndResetBarriers();
+ notifyCheckpoint(receivedBarrier);
+ }
+ }
+
+ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+ final long barrierId = cancelBarrier.getCheckpointId();
+
+ // fast path for single channel cases
+ if (totalNumberOfInputChannels == 1) {
+ if (barrierId > currentCheckpointId) {
+ // new checkpoint
+ currentCheckpointId = barrierId;
+ notifyAbort(barrierId);
}
-
- releaseBlocks();
+ return;
+ }
+
+ // -- general code path for multiple input channels --
+
+ if (numBarriersReceived > 0) {
+ // this is only true if some alignment is in progress and nothing was canceled
+
+ if (barrierId == currentCheckpointId) {
+ // cancel this alignment
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint {} canceled, aborting alignment", barrierId);
+ }
+
+ releaseBlocksAndResetBarriers();
+ notifyAbort(barrierId);
+ }
+ else if (barrierId > currentCheckpointId) {
+ // we canceled the next which also cancels the current
+ LOG.warn("Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
+ "Skipping current checkpoint.", barrierId, currentCheckpointId);
+
+ // this stops the current alignment
+ releaseBlocksAndResetBarriers();
+
+ // the next checkpoint starts as canceled
+ currentCheckpointId = barrierId;
+ startOfAlignmentTimestamp = 0L;
+ latestAlignmentDurationNanos = 0L;
+ notifyAbort(barrierId);
+ }
+
+ // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
+
+ }
+ else if (barrierId > currentCheckpointId) {
+ // first barrier of a new checkpoint is directly a cancellation
+
+ // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers
+ // at zero means that no checkpoint barrier can start a new alignment
+ currentCheckpointId = barrierId;
+
+ startOfAlignmentTimestamp = 0L;
+ latestAlignmentDurationNanos = 0L;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Checkpoint {} canceled, skipping alignment", barrierId);
+ }
+
+ notifyAbort(barrierId);
}
+
+ // else: trailing barrier from either
+ // - a previous (subsumed) checkpoint
+ // - the current checkpoint if it was already canceled
}
-
+
+ /**
+ * Handles an {@link EndOfPartitionEvent} received from one input channel.
+ *
+ * <p>The channel is counted as closed. If an alignment is in progress (at least one
+ * barrier has been received), the registered task is told that the current checkpoint
+ * is aborted, and all blocked channels are released so their buffered data is consumed next.
+ *
+ * @param channel index of the channel that reached its end (NOTE(review): currently unused)
+ * @throws Exception if notifying the task or releasing the buffered data fails
+ */
+ private void processEndOfPartition(int channel) throws Exception {
+ numClosedChannels++;
+
+ if (numBarriersReceived > 0) {
+ // let the task know we skip a checkpoint
+ notifyAbort(currentCheckpointId);
+
+ // no chance to complete this checkpoint
+ releaseBlocksAndResetBarriers();
+ }
+ }
+
+ /**
+ * Triggers the checkpoint described by the given barrier on the registered task, passing
+ * the barrier's ID and timestamp to {@code triggerCheckpointOnBarrier}.
+ * Does nothing if no task has been registered.
+ *
+ * @param checkpointBarrier the barrier whose checkpoint should be triggered
+ * @throws Exception whatever the task's trigger call throws
+ */
+ private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+ checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+ }
+ }
+
+ /**
+ * Tells the registered task, via {@code abortCheckpointOnBarrier}, that the checkpoint with
+ * the given ID was aborted on this input. Does nothing if no task has been registered.
+ *
+ * @param checkpointId ID of the aborted checkpoint
+ * @throws Exception whatever the task's abort call throws
+ */
+ private void notifyAbort(long checkpointId) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ }
+ }
+
+
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
}
}
-
+
@Override
public boolean isEmpty() {
return currentBuffered == null;
@@ -231,8 +350,20 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
for (BufferSpiller.SpilledBufferOrEventSequence seq : queuedBuffered) {
seq.cleanup();
}
+ queuedBuffered.clear();
}
-
+
+ /**
+ * Starts aligning the inputs for a new checkpoint.
+ *
+ * <p>Records the checkpoint ID, blocks the channel that delivered the first barrier
+ * (counting that barrier), and records the alignment start time from {@link System#nanoTime()}.
+ *
+ * @param checkpointId ID of the checkpoint whose first barrier arrived
+ * @param channelIndex index of the channel on which that barrier arrived
+ * @throws IOException if that channel is already blocked (thrown by {@code onBarrier})
+ */
+ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
+ currentCheckpointId = checkpointId;
+ onBarrier(channelIndex);
+
+ startOfAlignmentTimestamp = System.nanoTime();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting stream alignment for checkpoint " + checkpointId);
+ }
+ }
+
/**
* Checks whether the channel with the given index is blocked.
*
@@ -242,7 +373,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private boolean isBlocked(int channelIndex) {
return blockedChannels[channelIndex];
}
-
+
/**
* Blocks the given channel index, from which a barrier has been received.
*
@@ -251,30 +382,28 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private void onBarrier(int channelIndex) throws IOException {
if (!blockedChannels[channelIndex]) {
blockedChannels[channelIndex] = true;
+
numBarriersReceived++;
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier from channel " + channelIndex);
}
}
else {
- throw new IOException("Stream corrupt: Repeated barrier for same checkpoint and input stream");
+ throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
}
}
/**
- * Releases the blocks on all channels. Makes sure the just written data
- * is the next to be consumed.
+ * Releases the blocks on all channels and resets the barrier count.
+ * Makes sure the just written data is the next to be consumed.
*/
- private void releaseBlocks() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing blocks");
- }
+ private void releaseBlocksAndResetBarriers() throws IOException {
+ LOG.debug("End of stream alignment, feeding buffered data back");
for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
}
- numBarriersReceived = 0;
if (currentBuffered == null) {
// common case: no more buffered data
@@ -295,10 +424,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered = bufferedNow;
}
}
+
+ // the next barrier that comes must assume it is the first
+ numBarriersReceived = 0;
+
+ if (startOfAlignmentTimestamp > 0) {
+ latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
+ startOfAlignmentTimestamp = 0;
+ }
}
// ------------------------------------------------------------------------
- // For Testing
+ // Properties
// ------------------------------------------------------------------------
/**
@@ -309,7 +446,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
public long getCurrentCheckpointId() {
return this.currentCheckpointId;
}
-
+
+ /**
+ * Returns the time spent so far in the current alignment if one is in progress (start
+ * timestamp is set). Otherwise returns the duration recorded when the last alignment ended;
+ * that value is reset to 0 when a checkpoint is canceled by a cancellation marker.
+ */
+ @Override
+ public long getAlignmentDurationNanos() {
+ // read the field once so the check and the subtraction use the same value
+ long start = this.startOfAlignmentTimestamp;
+ if (start <= 0) {
+ return latestAlignmentDurationNanos;
+ } else {
+ return System.nanoTime() - start;
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..5157336 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -19,12 +19,12 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import java.io.IOException;
import java.util.ArrayDeque;
/**
@@ -34,9 +34,9 @@ import java.util.ArrayDeque;
*
* <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input
* channels that have sent barriers, so it cannot be used to gain "exactly-once" processing
- * guarantees. It can, however, be used to gain "at least once" processing guarantees.</p>
+ * guarantees. It can, however, be used to gain "at least once" processing guarantees.
*
- * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.</p>
+ * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs.
*/
@Internal
public class BarrierTracker implements CheckpointBarrierHandler {
@@ -57,11 +57,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
/** The listener to be notified on complete checkpoints */
- private EventListener<CheckpointBarrier> checkpointHandler;
+ private StatefulTask<?> toNotifyOnCheckpoint;
/** The highest checkpoint ID encountered so far */
private long latestPendingCheckpointID = -1;
-
+
+ // ------------------------------------------------------------------------
public BarrierTracker(InputGate inputGate) {
this.inputGate = inputGate;
@@ -70,28 +71,33 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
@Override
- public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+ public BufferOrEvent getNextNonBlocked() throws Exception {
while (true) {
BufferOrEvent next = inputGate.getNextBufferOrEvent();
- if (next == null) {
- return null;
- }
- else if (next.isBuffer() || next.getEvent().getClass() != CheckpointBarrier.class) {
+ if (next == null || next.isBuffer()) {
+ // buffer or input exhausted
return next;
}
- else {
+ else if (next.getEvent().getClass() == CheckpointBarrier.class) {
processBarrier((CheckpointBarrier) next.getEvent());
}
+ else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
+ processCheckpointAbortBarrier((CancelCheckpointMarker) next.getEvent());
+ }
+ else {
+ // some other event
+ return next;
+ }
}
}
@Override
- public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
- if (this.checkpointHandler == null) {
- this.checkpointHandler = checkpointHandler;
+ public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
+ if (this.toNotifyOnCheckpoint == null) {
+ this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
}
else {
- throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+ throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
}
}
@@ -105,22 +111,27 @@ public class BarrierTracker implements CheckpointBarrierHandler {
return pendingCheckpoints.isEmpty();
}
- private void processBarrier(CheckpointBarrier receivedBarrier) {
+ /**
+ * The tracker never blocks its input channels, so there is no alignment time to report.
+ *
+ * @return always 0
+ */
+ @Override
+ public long getAlignmentDurationNanos() {
+ // this one does not do alignment at all
+ return 0L;
+ }
+
+ private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
+ final long barrierId = receivedBarrier.getId();
+
// fast path for single channel trackers
if (totalNumberOfInputChannels == 1) {
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
- }
+ notifyCheckpoint(barrierId, receivedBarrier.getTimestamp());
return;
}
-
+
// general path for multiple input channels
- final long barrierId = receivedBarrier.getId();
// find the checkpoint barrier in the queue of bending barriers
CheckpointBarrierCount cbc = null;
int pos = 0;
-
+
for (CheckpointBarrierCount next : pendingCheckpoints) {
if (next.checkpointId == barrierId) {
cbc = next;
@@ -128,21 +139,21 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
pos++;
}
-
+
if (cbc != null) {
// add one to the count to that barrier and check for completion
int numBarriersNew = cbc.incrementBarrierCount();
if (numBarriersNew == totalNumberOfInputChannels) {
- // checkpoint can be triggered
+ // checkpoint can be triggered (or is aborted and all barriers have been seen)
// first, remove this checkpoint and all all prior pending
// checkpoints (which are now subsumed)
for (int i = 0; i <= pos; i++) {
pendingCheckpoints.pollFirst();
}
-
+
// notify the listener
- if (checkpointHandler != null) {
- checkpointHandler.onEvent(receivedBarrier);
+ if (!cbc.isAborted()) {
+ notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
}
}
@@ -163,45 +174,104 @@ public class BarrierTracker implements CheckpointBarrierHandler {
}
}
+ /**
+ * Handles a {@link CancelCheckpointMarker} received on one of the input channels.
+ *
+ * <p>With a single input channel, the registered task is told about the abort directly.
+ * With several channels, pending entries for older checkpoints are dropped first, without
+ * an abort notification. Then one of two things happens:
+ * <ul>
+ * <li>If an entry for this checkpoint exists, it is marked aborted (the task is notified
+ * only the first time) and the marker is counted like a barrier. The entry is removed
+ * once every channel has reported.</li>
+ * <li>Otherwise, the task is notified and an aborted placeholder is put at the head of the
+ * queue, provided fewer than {@code MAX_CHECKPOINTS_TO_TRACK} entries are tracked. Later
+ * barriers for this checkpoint are then counted but never trigger it.</li>
+ * </ul>
+ *
+ * @param barrier the cancellation marker
+ * @throws Exception if notifying the task fails
+ */
+ private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier) throws Exception {
+ final long checkpointId = barrier.getCheckpointId();
+
+ // fast path for single channel trackers
+ if (totalNumberOfInputChannels == 1) {
+ notifyAbort(checkpointId);
+ return;
+ }
+
+ // -- general path for multiple input channels --
+
+ // find the checkpoint barrier in the queue of pending barriers
+ // while doing this we "abort" all checkpoints before that one
+ CheckpointBarrierCount cbc;
+ while ((cbc = pendingCheckpoints.peekFirst()) != null && cbc.checkpointId() < checkpointId) {
+ pendingCheckpoints.removeFirst();
+ }
+
+ if (cbc != null && cbc.checkpointId() == checkpointId) {
+ // make sure the checkpoint is remembered as aborted
+ if (cbc.markAborted()) {
+ // this was the first time the checkpoint was aborted - notify
+ notifyAbort(checkpointId);
+ }
+
+ // we still count the barriers to be able to remove the entry once all barriers have been seen
+ if (cbc.incrementBarrierCount() == totalNumberOfInputChannels) {
+ // we can remove this entry
+ pendingCheckpoints.removeFirst();
+ }
+ }
+ else {
+ notifyAbort(checkpointId);
+
+ // first barrier for this checkpoint - remember it as aborted
+ // since we polled away all entries with lower checkpoint IDs
+ // this entry will become the new first entry
+ // (the new entry already counts this marker: its constructor starts the count at 1)
+ if (pendingCheckpoints.size() < MAX_CHECKPOINTS_TO_TRACK) {
+ CheckpointBarrierCount abortedMarker = new CheckpointBarrierCount(checkpointId);
+ abortedMarker.markAborted();
+ pendingCheckpoints.addFirst(abortedMarker);
+ }
+ }
+ }
+
+ /**
+ * Triggers the given checkpoint on the registered task via {@code triggerCheckpointOnBarrier}.
+ * Does nothing if no task has been registered.
+ *
+ * @param checkpointId ID of the checkpoint to trigger
+ * @param timestamp timestamp carried by the checkpoint barrier
+ * @throws Exception whatever the task's trigger call throws
+ */
+ private void notifyCheckpoint(long checkpointId, long timestamp) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointId, timestamp);
+ }
+ }
+
+ /**
+ * Tells the registered task, via {@code abortCheckpointOnBarrier}, that the checkpoint with
+ * the given ID was aborted on this input. Does nothing if no task has been registered.
+ *
+ * @param checkpointId ID of the aborted checkpoint
+ * @throws Exception whatever the task's abort call throws
+ */
+ private void notifyAbort(long checkpointId) throws Exception {
+ if (toNotifyOnCheckpoint != null) {
+ toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId);
+ }
+ }
+
// ------------------------------------------------------------------------
/**
* Simple class for a checkpoint ID with a barrier counter.
*/
private static final class CheckpointBarrierCount {
-
+
private final long checkpointId;
-
+
private int barrierCount;
-
- private CheckpointBarrierCount(long checkpointId) {
+
+ private boolean aborted;
+
+ CheckpointBarrierCount(long checkpointId) {
this.checkpointId = checkpointId;
this.barrierCount = 1;
}
+ public long checkpointId() {
+ return checkpointId;
+ }
+
public int incrementBarrierCount() {
return ++barrierCount;
}
-
- @Override
- public int hashCode() {
- return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount;
+
+ public boolean isAborted() {
+ return aborted;
}
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof CheckpointBarrierCount) {
- CheckpointBarrierCount that = (CheckpointBarrierCount) obj;
- return this.checkpointId == that.checkpointId && this.barrierCount == that.barrierCount;
- }
- else {
- return false;
- }
+ public boolean markAborted() {
+ boolean firstAbort = !this.aborted;
+ this.aborted = true;
+ return firstAbort;
}
@Override
public String toString() {
- return String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
+ return isAborted() ?
+ String.format("checkpointID=%d - ABORTED", checkpointId) :
+ String.format("checkpointID=%d, count=%d", checkpointId, barrierCount);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..dc8d245 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -134,7 +134,7 @@ public class BufferSpiller {
else {
contents = EventSerializer.toSerializedEvent(boe.getEvent());
}
-
+
headBuffer.clear();
headBuffer.putInt(boe.getChannelIndex());
headBuffer.putInt(contents.remaining());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..ca23491 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import java.io.IOException;
@@ -43,14 +42,14 @@ public interface CheckpointBarrierHandler {
* @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
* waiting for the next BufferOrEvent to become available.
*/
- BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+ BufferOrEvent getNextNonBlocked() throws Exception;
/**
- * Registers the given event handler to be notified on successful checkpoints.
- *
- * @param checkpointHandler The handler to register.
+ * Registers the task to be notified once all checkpoint barriers have been received for a checkpoint.
+ *
+ * @param task The task to notify
*/
- void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+ void registerCheckpointEventHandler(StatefulTask<?> task);
/**
* Cleans up all internally held resources.
@@ -64,4 +63,13 @@ public interface CheckpointBarrierHandler {
* @return {@code True}, if no data is buffered internally, {@code false} otherwise.
*/
boolean isEmpty();
+
+ /**
+ * Gets the time that the latest alignment took, in nanoseconds.
+ * If there is currently an alignment in progress, it will return the time spent in the
+ * current alignment so far.
+ * Implementations that never block their input channels may always report 0.
+ *
+ * @return The duration in nanoseconds
+ */
+ long getAlignmentDurationNanos();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..7d9e4d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
/**
* Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,7 +84,7 @@ public class StreamInputProcessor<IN> {
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
- EventListener<CheckpointBarrier> checkpointListener,
+ StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..a3ae077 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import java.io.IOException;
import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
Collection<InputGate> inputGates2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
- EventListener<CheckpointBarrier> checkpointListener,
+ StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d18ca16 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 0e24516..351acaa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -126,15 +127,32 @@ public class OperatorChain<OUT> {
}
}
-
-
- public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
- CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
- for (RecordWriterOutput<?> streamOutput : streamOutputs) {
- streamOutput.broadcastEvent(barrier);
+
+ /** Broadcasts a CheckpointBarrier(id, timestamp) to all stream outputs; an interrupt is reported as an IOException. */
+ public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException {
+ try {
+ CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(barrier);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // the InterruptedException is not rethrown, so keep the interrupt flag set
+ throw new IOException("Interrupted while broadcasting checkpoint barrier", e);
}
}
-
+
+ public void broadcastCheckpointCancelMarker(long id) throws IOException {
+ try {
+ CancelCheckpointMarker marker = new CancelCheckpointMarker(id); // tells every output that checkpoint 'id' was aborted
+ for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+ streamOutput.broadcastEvent(marker);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // the InterruptedException is not rethrown, so keep the interrupt flag set
+ throw new IOException("Interrupted while broadcasting checkpoint cancellation", e);
+ }
+ }
+
public RecordWriterOutput<?>[] getStreamOutputs() {
return streamOutputs;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f28cef..d55a9c5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -37,7 +36,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
@@ -580,9 +578,34 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
- protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+ /** Performs the checkpoint requested by a received checkpoint barrier. */
+ @Override
+ public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
+ // Task cancellation must surface unchanged; every other failure is wrapped with checkpoint context.
+ try {
+ performCheckpoint(checkpointId, timestamp);
+ } catch (CancelTaskException cancellation) {
+ throw cancellation;
+ } catch (Exception failure) {
+ throw new Exception("Error while performing a checkpoint", failure);
+ }
+ }
+
+ @Override
+ public void abortCheckpointOnBarrier(long checkpointId) throws Exception {
+ LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, getName());
+ synchronized (lock) {
+ if (!isRunning) {
+ return; // task is no longer running: skip forwarding the cancel marker
+ }
+ operatorChain.broadcastCheckpointCancelMarker(checkpointId);
+ }
+ }
+
+ private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+
LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
+
synchronized (lock) {
if (isRunning) {
@@ -759,23 +782,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
return getName();
}
- protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
- return new EventListener<CheckpointBarrier>() {
- @Override
- public void onEvent(CheckpointBarrier barrier) {
- try {
- performCheckpoint(barrier.getId(), barrier.getTimestamp());
- }
- catch (CancelTaskException e) {
- throw e;
- }
- catch (Exception e) {
- throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
- }
- }
- };
- }
-
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
inputDeserializer1, inputDeserializer2,
- getCheckpointBarrierListener(),
+ this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
isSerializingTimestamps());