You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:11 UTC

[flink] 02/16: [hotfix][network] Make toNotifyOnCheckpoint field final in CheckpointBarrierHandlers

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3adc3eb2a6ece13f3befddd41f508d01bde067d7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 15:07:01 2019 +0200

    [hotfix][network] Make toNotifyOnCheckpoint field final in CheckpointBarrierHandlers
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 27 ++++----
 .../streaming/runtime/io/BarrierDiscarder.java     | 15 -----
 .../flink/streaming/runtime/io/BarrierTracker.java | 19 +++---
 .../runtime/io/CheckpointBarrierHandler.java       |  9 ---
 .../streaming/runtime/io/InputProcessorUtil.java   | 10 ++-
 .../io/BarrierBufferAlignmentLimitTest.java        | 18 ++++--
 .../runtime/io/BarrierBufferTestBase.java          | 73 ++++++++++------------
 .../streaming/runtime/io/BarrierTrackerTest.java   | 59 ++++++++---------
 .../runtime/io/CreditBasedBarrierBufferTest.java   |  7 ++-
 .../runtime/io/SpillingBarrierBufferTest.java      |  7 ++-
 10 files changed, 105 insertions(+), 139 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index ad62360..b2e6ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Optional;
@@ -86,8 +88,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 */
 	private BufferOrEventSequence currentBuffered;
 
-	/** Handler that receives the checkpoint notifications. */
-	private AbstractInvokable toNotifyOnCheckpoint;
+	@Nullable
+	private final AbstractInvokable toNotifyOnCheckpoint;
 
 	/** The ID of the checkpoint for which we expect barriers. */
 	private long currentCheckpointId = -1L;
@@ -127,7 +129,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 */
 	@VisibleForTesting
 	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
-		this (inputGate, bufferStorage, -1, "Testing: No task associated");
+		this (inputGate, bufferStorage, -1, "Testing: No task associated", null);
 	}
 
 	/**
@@ -141,8 +143,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * @param bufferStorage The storage to hold the buffers and events for blocked channels.
 	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
 	 * @param taskName The task name for logging.
+	 * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
 	 */
-	BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) {
+	BarrierBuffer(
+			InputGate inputGate,
+			BufferStorage bufferStorage,
+			long maxBufferedBytes,
+			String taskName,
+			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
 		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
 		this.inputGate = inputGate;
@@ -154,6 +162,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
 
 		this.taskName = taskName;
+		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 	}
 
 	@Override
@@ -452,16 +461,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
-		if (this.toNotifyOnCheckpoint == null) {
-			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-		}
-		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
-		}
-	}
-
-	@Override
 	public boolean isEmpty() {
 		return currentBuffered == null;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
index e8d9f34..c33c940 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -45,10 +44,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler {
 	 */
 	private final int totalNumberOfInputChannels;
 
-
-	/** The listener to be notified on complete checkpoints. */
-	private AbstractInvokable toNotifyOnCheckpoint;
-
 	// ------------------------------------------------------------------------
 
 	public BarrierDiscarder(InputGate inputGate) {
@@ -88,16 +83,6 @@ public class BarrierDiscarder implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
-		if (this.toNotifyOnCheckpoint == null) {
-			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-		}
-		else {
-			throw new IllegalStateException("BarrierDiscarder already has a registered checkpoint notifyee");
-		}
-	}
-
-	@Override
 	public void cleanup() {
 
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 49d2991..f7629bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayDeque;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -77,7 +79,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 
 	/** The listener to be notified on complete checkpoints. */
-	private AbstractInvokable toNotifyOnCheckpoint;
+	private final AbstractInvokable toNotifyOnCheckpoint;
 
 	/** The highest checkpoint ID encountered so far. */
 	private long latestPendingCheckpointID = -1;
@@ -85,9 +87,14 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	// ------------------------------------------------------------------------
 
 	public BarrierTracker(InputGate inputGate) {
+		this(inputGate, null);
+	}
+
+	public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
 		this.inputGate = inputGate;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.pendingCheckpoints = new ArrayDeque<>();
+		this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 	}
 
 	@Override
@@ -127,16 +134,6 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(AbstractInvokable toNotifyOnCheckpoint) {
-		if (this.toNotifyOnCheckpoint == null) {
-			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
-		}
-		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
-		}
-	}
-
-	@Override
 	public void cleanup() {
 		pendingCheckpoints.clear();
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index faffd44..2ee1a97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.AsyncDataInput;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import java.io.IOException;
 
@@ -32,14 +31,6 @@ import java.io.IOException;
  */
 @Internal
 public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
-
-	/**
-	 * Registers the task be notified once all checkpoint barriers have been received for a checkpoint.
-	 *
-	 * @param task The task to notify
-	 */
-	void registerCheckpointEventHandler(AbstractInvokable task);
-
 	/**
 	 * Cleans up all internally held resources.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index 289dd1a..ebef48f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -58,13 +58,15 @@ public class InputProcessorUtil {
 					inputGate,
 					new CachedBufferStorage(inputGate.getPageSize()),
 					maxAlign,
-					taskName);
+					taskName,
+					checkpointedTask);
 			} else {
 				barrierHandler = new BarrierBuffer(
 					inputGate,
 					new BufferSpiller(ioManager, inputGate.getPageSize()),
 					maxAlign,
-					taskName);
+					taskName,
+					checkpointedTask);
 			}
 		} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
 			barrierHandler = new BarrierTracker(inputGate);
@@ -72,10 +74,6 @@ public class InputProcessorUtil {
 			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
 
-		if (checkpointedTask != null) {
-			barrierHandler.registerCheckpointEventHandler(checkpointedTask);
-		}
-
 		return barrierHandler;
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 0a284e1..8c97938 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -115,10 +115,13 @@ public class BarrierBufferAlignmentLimitTest {
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing");
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		BarrierBuffer buffer = new BarrierBuffer(
+			gate,
+			new BufferSpiller(ioManager, gate.getPageSize()),
+			1000,
+			"Testing",
+			toNotify);
 
 		// validating the sequence of buffers
 
@@ -210,10 +213,13 @@ public class BarrierBufferAlignmentLimitTest {
 
 		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-		BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing");
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		BarrierBuffer buffer = new BarrierBuffer(
+			gate,
+			new BufferSpiller(ioManager, gate.getPageSize()),
+			500,
+			"Testing",
+			toNotify);
 
 		// validating the sequence of buffers
 		long startTs;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index 4bc05ff..908a199 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -41,6 +41,8 @@ import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Random;
@@ -70,12 +72,23 @@ public abstract class BarrierBufferTestBase {
 
 	BarrierBuffer buffer;
 
-	protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
+	protected BarrierBuffer createBarrierBuffer(
+		int numberOfChannels,
+		BufferOrEvent[] sequence,
+		@Nullable AbstractInvokable toNotify) throws IOException {
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
-		return createBarrierBuffer(gate);
+		return createBarrierBuffer(gate, toNotify);
+	}
+
+	protected BarrierBuffer createBarrierBuffer(int numberOfChannels, BufferOrEvent[] sequence) throws IOException {
+		return createBarrierBuffer(numberOfChannels, sequence, null);
 	}
 
-	abstract BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException;
+	protected BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
+		return createBarrierBuffer(gate, null);
+	}
+
+	abstract BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException;
 
 	abstract void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence);
 
@@ -147,10 +160,9 @@ public abstract class BarrierBufferTestBase {
 			createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
 			createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(1, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(1, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		for (BufferOrEvent boe : sequence) {
@@ -198,10 +210,9 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(0, PAGE_SIZE),
 			createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(3, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre checkpoint 1
@@ -292,10 +303,9 @@ public abstract class BarrierBufferTestBase {
 			createBarrier(2, 2),
 			createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(3, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre-checkpoint 1
@@ -368,10 +378,9 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
 			createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(3, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		// around checkpoint 1
@@ -461,10 +470,8 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
 			createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
@@ -548,10 +555,9 @@ public abstract class BarrierBufferTestBase {
 			createBuffer(2, PAGE_SIZE), createEndOfPartition(2),
 			createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(3, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		// checkpoint 1
@@ -694,10 +700,9 @@ public abstract class BarrierBufferTestBase {
 			createBarrier(2, 2),
 			createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-		buffer.registerCheckpointEventHandler(handler);
+		buffer = createBarrierBuffer(3, sequence, handler);
+
 		handler.setNextExpectedCheckpointId(1L);
 
 		// pre-checkpoint 1
@@ -841,10 +846,8 @@ public abstract class BarrierBufferTestBase {
 			createCancellationBarrier(6, 0),
 			createBuffer(0, PAGE_SIZE)
 		};
-		buffer = createBarrierBuffer(1, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(1, sequence, toNotify);
 
 		check(sequence[0], buffer.pollNext().get(), PAGE_SIZE);
 		check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
@@ -902,10 +905,8 @@ public abstract class BarrierBufferTestBase {
 
 			/* 37 */ createBuffer(0, PAGE_SIZE)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
@@ -990,10 +991,8 @@ public abstract class BarrierBufferTestBase {
 				// some more buffers
 			/* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
@@ -1074,10 +1073,8 @@ public abstract class BarrierBufferTestBase {
 				// some more buffers
 			/* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
@@ -1151,10 +1148,8 @@ public abstract class BarrierBufferTestBase {
 				// some more buffers
 			/* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE)
 		};
-		buffer = createBarrierBuffer(3, sequence);
-
 		AbstractInvokable toNotify = mock(AbstractInvokable.class);
-		buffer.registerCheckpointEventHandler(toNotify);
+		buffer = createBarrierBuffer(3, sequence, toNotify);
 
 		long startTs;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 398a95a..cb58837 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -33,6 +33,8 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 
 import static org.junit.Assert.assertEquals;
@@ -95,11 +97,9 @@ public class BarrierTrackerTest {
 				createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(1, sequence);
-
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
+		tracker = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -119,11 +119,9 @@ public class BarrierTrackerTest {
 				createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(1, sequence);
-
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
+		tracker = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -152,11 +150,9 @@ public class BarrierTrackerTest {
 
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(3, sequence);
-
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, 2, 3, 4);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, 2, 3, 4);
+		tracker = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -189,11 +185,9 @@ public class BarrierTrackerTest {
 
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(3, sequence);
-
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, 2, 4);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, 2, 4);
+		tracker = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -265,11 +259,9 @@ public class BarrierTrackerTest {
 				// complete checkpoint 10
 				createBarrier(10, 0), createBarrier(10, 1),
 		};
-		tracker = createBarrierTracker(3, sequence);
-
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9, 10);
+		tracker = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
@@ -291,12 +283,10 @@ public class BarrierTrackerTest {
 				createCancellationBarrier(6, 0),
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(1, sequence);
-
 		// negative values mean an expected cancellation call!
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, 2, -4, 5, -6);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+		tracker = createBarrierTracker(1, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer()) {
@@ -342,12 +332,10 @@ public class BarrierTrackerTest {
 
 				createBuffer(0)
 		};
-		tracker = createBarrierTracker(3, sequence);
-
 		// negative values mean an expected cancellation call!
 		CheckpointSequenceValidator validator =
-				new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
-		tracker.registerCheckpointEventHandler(validator);
+			new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+		tracker = createBarrierTracker(3, sequence, validator);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer()) {
@@ -371,11 +359,8 @@ public class BarrierTrackerTest {
 			createCancellationBarrier(2L, 2),
 			createBuffer(0)
 		};
-		tracker = createBarrierTracker(3, sequence);
-
 		AbstractInvokable statefulTask = mock(AbstractInvokable.class);
-
-		tracker.registerCheckpointEventHandler(statefulTask);
+		tracker = createBarrierTracker(3, sequence, statefulTask);
 
 		for (BufferOrEvent boe : sequence) {
 			if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) {
@@ -390,10 +375,16 @@ public class BarrierTrackerTest {
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
-
 	private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
+		return createBarrierTracker(numberOfChannels, sequence, null);
+	}
+
+	private static BarrierTracker createBarrierTracker(
+			int numberOfChannels,
+			BufferOrEvent[] sequence,
+			@Nullable AbstractInvokable toNotifyOnCheckpoint) {
 		MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
-		return new BarrierTracker(gate);
+		return new BarrierTracker(gate, toNotifyOnCheckpoint);
 	}
 
 	private static BufferOrEvent createBarrier(long id, int channel) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index da88ffb..bbfe8b6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -20,8 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
-import java.io.IOException;
+import javax.annotation.Nullable;
 
 import static org.junit.Assert.assertEquals;
 
@@ -31,8 +32,8 @@ import static org.junit.Assert.assertEquals;
 public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
 
 	@Override
-	public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
-		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE));
+	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) {
+		return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE), -1, "Testing", toNotify);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
index 546fb62..2101f40 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBarrierBufferTest.java
@@ -22,10 +22,13 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 
@@ -64,8 +67,8 @@ public class SpillingBarrierBufferTest extends BarrierBufferTestBase {
 	}
 
 	@Override
-	public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException{
-		return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE));
+	BarrierBuffer createBarrierBuffer(InputGate gate, @Nullable AbstractInvokable toNotify) throws IOException {
+		return new BarrierBuffer(gate, new BufferSpiller(ioManager, PAGE_SIZE), -1, "Testing", toNotify);
 	}
 
 	@Override