You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:10 UTC

[4/4] flink git commit: [FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.

If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f

Branch: refs/heads/release-1.1
Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a
Parents: 1a4fdff
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  15 +
 .../streaming/runtime/io/BarrierBuffer.java     |  48 ++-
 .../streaming/runtime/io/BufferSpiller.java     |  39 ++-
 .../runtime/io/StreamInputProcessor.java        |  17 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  17 +-
 .../runtime/tasks/OneInputStreamTask.java       |   3 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   3 +-
 .../io/BarrierBufferAlignmentLimitTest.java     | 315 +++++++++++++++++++
 8 files changed, 441 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1ad1c4..d9ccb35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -277,6 +277,14 @@ public final class ConfigConstants {
 	@PublicEvolving
 	public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";
 
+	/**
+	 * The maximum number of bytes that a checkpoint alignment may buffer. If the alignment
+	 * buffers more than the configured amount of data, the checkpoint is aborted (skipped).
+	 * The value must be positive, or {@code -1} to disable the limit.
+	 */
+	@PublicEvolving
+	public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = "task.checkpoint.alignment.max-size";
+
 	// --------------------------- Runtime Algorithms -------------------------------
 	
 	/**
@@ -873,6 +881,13 @@ public final class ConfigConstants {
 	 */
 	public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated
 
+	/**
+	 * The default for the maximum number of bytes that a checkpoint alignment may buffer.
+	 * {@code -1} means no limit (alignments may buffer an unbounded amount of data).
+	 */
+	@PublicEvolving
+	public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = -1L;
+
 	// ------------------------ Runtime Algorithms ------------------------
 	
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7a8e7d3..c4cf98e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
  * all inputs have received the barrier for a given checkpoint.
@@ -65,6 +68,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * further data from the input gate. */
 	private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
 
+	/** The maximum number of bytes that may be buffered before an alignment is aborted; -1 means unlimited */
+	private final long maxBufferedBytes;
+
 	/** The sequence of buffers/events that has been unblocked and must now be consumed
 	 * before requesting further data from the input gate */
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -82,6 +88,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	/** The number of already closed channels */
 	private int numClosedChannels;
 
+	/** The number of bytes in the queued spilled sequences (excluding the sequence currently being consumed) */
+	private long numQueuedBytes;
+
 	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
 	private long startOfAlignmentTimestamp;
 
@@ -92,14 +101,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean endOfStream;
 
 	/**
+	 * Creates a new checkpoint stream aligner.
+	 *
+	 * <p>There is no limit to how much data may be buffered during an alignment (limit {@code -1}).
 	 * 
 	 * @param inputGate The input gate to draw the buffers and events from.
 	 * @param ioManager The I/O manager that gives access to the temp directories.
-	 * 
+	 *
 	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
 	 */
 	public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+		this(inputGate, ioManager, -1);
+	}
+
+	/**
+	 * Creates a new checkpoint stream aligner.
+	 * 
+	 * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
+	 * When that number is exceeded, it will stop the alignment and notify the task that the
+	 * checkpoint has been cancelled.
+	 * 
+	 * @param inputGate The input gate to draw the buffers and events from.
+	 * @param ioManager The I/O manager that gives access to the temp directories.
+	 * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
+	 * 
+	 * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+	 */
+	public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
+		checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
 		this.inputGate = inputGate;
+		this.maxBufferedBytes = maxBufferedBytes;
 		this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 
@@ -131,6 +163,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
 					bufferSpiller.add(next);
+					checkSizeLimit();
 				}
 				else if (next.isBuffer()) {
 					return next;
@@ -169,6 +202,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		currentBuffered = queuedBuffered.pollFirst();
 		if (currentBuffered != null) {
 			currentBuffered.open();
+			numQueuedBytes -= currentBuffered.size();
 		}
 	}
 
@@ -333,6 +367,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 
+	private void checkSizeLimit() throws Exception {
+		if (maxBufferedBytes > 0 && (numQueuedBytes + bufferSpiller.getBytesWritten()) > maxBufferedBytes) {
+			// queued sequences plus the current spill file exceed the limit - abort this checkpoint
+			LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
+					currentCheckpointId, maxBufferedBytes);
+			// release the blocked channels (spilled data gets replayed), then notify the task of the abort
+			releaseBlocksAndResetBarriers();
+			notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
+		}
+	}
 
 	@Override
 	public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
@@ -359,6 +403,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			seq.cleanup();
 		}
 		queuedBuffered.clear();
+		numQueuedBytes = 0L;
 	}
 
 	private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
@@ -429,6 +474,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			if (bufferedNow != null) {
 				bufferedNow.open();
 				queuedBuffered.addFirst(currentBuffered);
+				numQueuedBytes += currentBuffered.size();
 				currentBuffered = bufferedNow;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index dc8d245..8060d02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -84,9 +84,9 @@ public class BufferSpiller {
 	
 	/** A counter, to created numbered spill files */
 	private int fileCounter;
-	
-	/** A flag to check whether the spiller has written since the last roll over */
-	private boolean hasWritten;
+
+	/** The number of bytes (headers plus contents) written to the current spill file since the last roll over */
+	private long bytesWritten;
 	
 	/**
 	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
 	 * @throws IOException Thrown, if the buffer of event could not be spilled.
 	 */
 	public void add(BufferOrEvent boe) throws IOException {
-		hasWritten = true;
 		try {
 			ByteBuffer contents;
 			if (boe.isBuffer()) {
@@ -140,7 +139,9 @@ public class BufferSpiller {
 			headBuffer.putInt(contents.remaining());
 			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
 			headBuffer.flip();
-			
+
+			bytesWritten += (headBuffer.remaining() + contents.remaining());
+
 			sources[1] = contents;
 			currentChannel.write(sources);
 		}
@@ -186,7 +187,7 @@ public class BufferSpiller {
 	}
 	
 	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
-		if (!hasWritten) {
+		if (bytesWritten == 0) {
 			return null;
 		}
 		
@@ -205,8 +206,8 @@ public class BufferSpiller {
 		
 		// create ourselves a new spill file
 		createSpillingChannel();
-		
-		hasWritten = false;
+
+		bytesWritten = 0L;
 		return seq;
 	}
 
@@ -225,6 +226,14 @@ public class BufferSpiller {
 		}
 	}
 
+	/**
+	 * Returns how many bytes (headers and contents) went into the current spill file.
+	 * @return the byte count accumulated since the last roll over
+	 */
+	public long getBytesWritten() {
+		return this.bytesWritten;
+	}
+
 	// ------------------------------------------------------------------------
 	//  For testing
 	// ------------------------------------------------------------------------
@@ -268,6 +277,9 @@ public class BufferSpiller {
 		/** The byte buffer for bulk reading */
 		private final ByteBuffer buffer;
 
+		/** The file size in bytes, captured once at construction: it is added and later subtracted, so it must not change */
+		private final long size;
+
 		/** The page size to instantiate properly sized memory segments */
 		private final int pageSize;
 
@@ -282,11 +294,13 @@ public class BufferSpiller {
 		 * @param buffer The buffer used for bulk reading.
 		 * @param pageSize The page size to use for the created memory segments.
 		 */
-		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+		SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize)
+				throws IOException {
 			this.file = file;
 			this.fileChannel = fileChannel;
 			this.buffer = buffer;
 			this.pageSize = pageSize;
+			this.size = fileChannel.size(); // FileChannel#size() may throw IOException, hence the throws clause
 		}
 
 		/**
@@ -408,5 +422,12 @@ public class BufferSpiller {
 				throw new IOException("Cannot remove temp file for stream alignment writer");
 			}
 		}
+
+		/**
+		 * Gets the size of this spilled sequence in bytes (the file size captured at construction).
+		 */
+		public long size() {
+			return size;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 7d9e4d2..bcca2bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> {
 								StatefulTask<?> checkpointListener,
 								CheckpointingMode checkpointMode,
 								IOManager ioManager,
-								boolean enableWatermarkMultiplexing) throws IOException {
+								boolean enableWatermarkMultiplexing,
+								Configuration taskManagerConfig) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+			long maxAlign = taskManagerConfig.getLong(
+					ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+					ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+			if (!(maxAlign == -1 || maxAlign > 0)) {
+				throw new IllegalConfigurationException(
+						ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+						+ " must be positive or -1 (infinite)");
+			}
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
 		}
 		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
 			this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3ae077..f116aff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			StatefulTask<?> checkpointListener,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
-			boolean enableWatermarkMultiplexing) throws IOException {
+			boolean enableWatermarkMultiplexing,
+			Configuration taskManagerConfig) throws IOException {
 		
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
 		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
-			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+			long maxAlign = taskManagerConfig.getLong(
+					ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+					ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+			if (!(maxAlign == -1 || maxAlign > 0)) {
+				throw new IllegalConfigurationException(
+						ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+								+ " must be positive or -1 (infinite)");
+			}
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
 		}
 		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
 			this.barrierHandler = new BarrierTracker(inputGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d18ca16..8470c7c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					this,
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
-					isSerializingTimestamps());
+					isSerializingTimestamps(),
+					getEnvironment().getTaskManagerInfo().getConfiguration());
 
 			// make sure that stream tasks report their I/O statistics
 			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..8718b88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
-				isSerializingTimestamps());
+				isSerializingTimestamps(),
+				getEnvironment().getTaskManagerInfo().getConfiguration());
 
 		// make sure that stream tasks report their I/O statistics
 		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..529f809
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+	private static final int PAGE_SIZE = 512;
+
+	private static final Random RND = new Random();
+
+	private static IOManager IO_MANAGER;
+
+	// ------------------------------------------------------------------------
+	//  Setup
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void setup() {
+		IO_MANAGER = new IOManagerAsync();
+	}
+
+	@AfterClass
+	public static void shutdownIOManager() {
+		IO_MANAGER.shutdown();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Tests
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Tests that a single alignment that buffers too much data is cancelled.
+	 */
+	@Test
+	public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some initial buffers
+				/*  0 */ createBuffer(1, 100), createBuffer(2, 70),
+				/*  2 */ createBuffer(0, 42), createBuffer(2, 111),
+
+				// starting a checkpoint
+				/*  4 */ createBarrier(7, 1),
+				/*  5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50),
+				/*  9 */ createBarrier(7, 0),
+				/* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200),
+
+				// this buffer makes the alignment spill too large
+				/* 14 */ createBuffer(0, 101),
+
+				// additional data
+				/* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+				// checkpoint completes - this should not result in a "completion notification"
+				/* 18 */ createBarrier(7, 2),
+
+				// trailing buffers
+				/* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+		};
+
+		// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		// validating the sequence of buffers
+
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		check(sequence[3], buffer.getNextNonBlocked());
+
+		// start of checkpoint
+		long startTs = System.nanoTime();
+		check(sequence[6], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[10], buffer.getNextNonBlocked());
+
+		// trying to pull the next makes the alignment overflow - so buffered buffers are replayed
+		check(sequence[5], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class));
+
+		// playing back buffered events
+		check(sequence[7], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// the additional data
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// no call for a completed checkpoint must have happened
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * This tests the following case:
+	 *   - an alignment starts
+	 *   - barriers from a second checkpoint queue before the first completes
+	 *   - together they are larger than the threshold
+	 *   - after the first checkpoint (with second checkpoint data queued) aborts, the second completes
+	 */
+	@Test
+	public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some initial buffers
+				/*  0 */ createBuffer(1, 100), createBuffer(2, 70),
+
+				// starting a checkpoint
+				/*  2 */ createBarrier(3, 2),
+				/*  3 */ createBuffer(1, 100), createBuffer(2, 100),
+				/*  5 */ createBarrier(3, 0),
+				/*  6 */ createBuffer(0, 100), createBuffer(1, 100),
+
+				// queue some data from the next checkpoint
+				/*  8 */ createBarrier(4, 0),
+				/*  9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100),
+
+				// this one makes the alignment overflow
+				/* 12 */ createBuffer(2, 100),
+
+				// checkpoint completed
+				/* 13 */ createBarrier(3, 1),
+
+				// more for the next checkpoint
+				/* 14 */ createBarrier(4, 1),
+				/* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+				// next checkpoint completes
+				/* 18 */ createBarrier(4, 2),
+
+				// trailing data
+				/* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+		};
+
+		// the barrier buffer has a limit that only 500 bytes may be spilled in alignment
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		// validating the sequence of buffers
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+
+		// start of checkpoint
+		startTs = System.nanoTime();
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[7], buffer.getNextNonBlocked());
+
+		// next checkpoint also in progress
+		check(sequence[11], buffer.getNextNonBlocked());
+
+		// checkpoint alignment aborted due to too much data
+		check(sequence[4], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class));
+
+		// replay buffered data - in the middle, the alignment for checkpoint 4 starts
+		check(sequence[6], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[12], buffer.getNextNonBlocked());
+
+		// only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success
+		check(sequence[17], buffer.getNextNonBlocked());
+
+		// checkpoint 4 completed - check and validate buffered replay
+		check(sequence[9], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), anyLong());
+
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+
+		// trailing data
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// only checkpoint 4 was successfully completed, not checkpoint 3
+		verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	private static BufferOrEvent createBuffer(int channel, int size) {
+		byte[] bytes = new byte[size];
+		RND.nextBytes(bytes);
+
+		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+		memory.put(0, bytes);
+
+		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+
+		// retain an additional time so it does not get disposed after being read by the input gate
+		buf.retain();
+
+		return new BufferOrEvent(buf, channel);
+	}
+
+	private static BufferOrEvent createBarrier(long id, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	}
+
+	private static void check(BufferOrEvent expected, BufferOrEvent present) {
+		assertNotNull(expected);
+		assertNotNull(present);
+		assertEquals(expected.isBuffer(), present.isBuffer());
+
+		if (expected.isBuffer()) {
+			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+			MemorySegment presentMem = present.getBuffer().getMemorySegment();
+			assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+		}
+		else {
+			assertEquals(expected.getEvent(), present.getEvent());
+		}
+	}
+
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
+	private static void checkNoTempFilesRemain() {
+		// validate that all temp files have been removed
+		for (File dir : IO_MANAGER.getSpillingDirectories()) {
+			for (String file : dir.list()) {
+				if (file != null && !(file.equals(".") || file.equals(".."))) {
+					fail("barrier buffer did not clean up temp files. remaining file: " + file);
+				}
+			}
+		}
+	}
+}