You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/11/08 20:25:10 UTC
[4/4] flink git commit: [FLINK-4975] [checkpointing] Add a limit for
how much data may be buffered in alignment.
[FLINK-4975] [checkpointing] Add a limit for how much data may be buffered in alignment.
If more data than the defined amount is buffered, the alignment is aborted and the checkpoint canceled.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0962cb6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0962cb6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0962cb6f
Branch: refs/heads/release-1.1
Commit: 0962cb6f45607fb21d50030e325e99fc2c37164a
Parents: 1a4fdff
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 3 15:28:15 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 8 19:07:16 2016 +0100
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 15 +
.../streaming/runtime/io/BarrierBuffer.java | 48 ++-
.../streaming/runtime/io/BufferSpiller.java | 39 ++-
.../runtime/io/StreamInputProcessor.java | 17 +-
.../runtime/io/StreamTwoInputProcessor.java | 17 +-
.../runtime/tasks/OneInputStreamTask.java | 3 +-
.../runtime/tasks/TwoInputStreamTask.java | 3 +-
.../io/BarrierBufferAlignmentLimitTest.java | 315 +++++++++++++++++++
8 files changed, 441 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index d1ad1c4..d9ccb35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -277,6 +277,14 @@ public final class ConfigConstants {
@PublicEvolving
public static final String TASK_CANCELLATION_TIMEOUT_MILLIS = "task.cancellation.timeout";
+ /**
+ * The maximum number of bytes that a checkpoint alignment may buffer (spill).
+ * If the checkpoint alignment buffers more than the configured amount of data,
+ * the checkpoint is aborted (skipped). Must be a positive value, or -1 for no limit.
+ */
+ @PublicEvolving
+ public static final String TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = "task.checkpoint.alignment.max-size";
+
// --------------------------- Runtime Algorithms -------------------------------
/**
@@ -873,6 +881,13 @@ public final class ConfigConstants {
*/
public static final long DEFAULT_TASK_CANCELLATION_TIMEOUT_MILLIS = 0; // deactivated
+ /**
+ * The default for the maximum number of bytes that a checkpoint alignment may buffer.
+ * {@code -1} means no limit, i.e. alignments are never aborted because of their size.
+ */
+ @PublicEvolving
+ public static final long DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT = -1L;
+
// ------------------------ Runtime Algorithms ------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 7a8e7d3..c4cf98e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
* all inputs have received the barrier for a given checkpoint.
@@ -65,6 +68,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* further data from the input gate. */
private final ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence> queuedBuffered;
+ /** The maximum number of bytes that may be buffered before an alignment is aborted. -1 means unlimited */
+ private final long maxBufferedBytes;
+
/** The sequence of buffers/events that has been unblocked and must now be consumed
* before requesting further data from the input gate */
private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
@@ -82,6 +88,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
/** The number of already closed channels */
private int numClosedChannels;
+ /** The number of bytes in the queued spilled sequences; excludes the sequence being replayed and the spill file being written */
+ private long numQueuedBytes;
+
/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
private long startOfAlignmentTimestamp;
@@ -92,14 +101,37 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private boolean endOfStream;
/**
+ * Creates a new checkpoint stream aligner without a bound on the buffered data.
+ *
+ * <p>Alignments are never aborted because of the amount of data they buffer.
*
* @param inputGate The input gate to draw the buffers and events from.
* @param ioManager The I/O manager that gives access to the temp directories.
- *
+ *
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, IOManager ioManager) throws IOException {
+ this(inputGate, ioManager, -1L);
+ }
+
+ /**
+ * Creates a new checkpoint stream aligner.
+ *
+ * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
+ * When that number is exceeded, it will stop the alignment and notify the task that the
+ * checkpoint has been cancelled.
+ *
+ * @param inputGate The input gate to draw the buffers and events from.
+ * @param ioManager The I/O manager that gives access to the temp directories.
+ * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
+ *
+ * @throws IOException Thrown, when the spilling to temp files cannot be initialized.
+ */
+ public BarrierBuffer(InputGate inputGate, IOManager ioManager, long maxBufferedBytes) throws IOException {
+ checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
+
this.inputGate = inputGate;
+ this.maxBufferedBytes = maxBufferedBytes;
this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
@@ -131,6 +163,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked we, we just store the BufferOrEvent
bufferSpiller.add(next);
+ checkSizeLimit();
}
else if (next.isBuffer()) {
return next;
@@ -169,6 +202,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentBuffered = queuedBuffered.pollFirst();
if (currentBuffered != null) {
currentBuffered.open();
+ numQueuedBytes -= currentBuffered.size();
}
}
@@ -333,6 +367,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
+ private void checkSizeLimit() throws Exception {
+ final long alignedBytes = numQueuedBytes + bufferSpiller.getBytesWritten();
+ if (maxBufferedBytes <= 0 || alignedBytes <= maxBufferedBytes) {
+ return; // no limit configured (-1), or the alignment is still within its budget
+ }
+ LOG.info("Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded",
+ currentCheckpointId, maxBufferedBytes);
+ releaseBlocksAndResetBarriers();
+ notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes));
+ }
@Override
public void registerCheckpointEventHandler(StatefulTask<?> toNotifyOnCheckpoint) {
@@ -359,6 +403,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
seq.cleanup();
}
queuedBuffered.clear();
+ numQueuedBytes = 0L;
}
private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
@@ -429,6 +474,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (bufferedNow != null) {
bufferedNow.open();
queuedBuffered.addFirst(currentBuffered);
+ numQueuedBytes += currentBuffered.size();
currentBuffered = bufferedNow;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index dc8d245..8060d02 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -84,9 +84,9 @@ public class BufferSpiller {
/** A counter, to created numbered spill files */
private int fileCounter;
-
- /** A flag to check whether the spiller has written since the last roll over */
- private boolean hasWritten;
+
+ /** The number of bytes (entry headers plus contents) written to the current spill file since the last roll over */
+ private long bytesWritten;
/**
* Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
* @throws IOException Thrown, if the buffer of event could not be spilled.
*/
public void add(BufferOrEvent boe) throws IOException {
- hasWritten = true;
try {
ByteBuffer contents;
if (boe.isBuffer()) {
@@ -140,7 +139,9 @@ public class BufferSpiller {
headBuffer.putInt(contents.remaining());
headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
headBuffer.flip();
-
+
+ bytesWritten += (headBuffer.remaining() + contents.remaining());
+
sources[1] = contents;
currentChannel.write(sources);
}
@@ -186,7 +187,7 @@ public class BufferSpiller {
}
private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
- if (!hasWritten) {
+ if (bytesWritten == 0) {
return null;
}
@@ -205,8 +206,8 @@ public class BufferSpiller {
// create ourselves a new spill file
createSpillingChannel();
-
- hasWritten = false;
+
+ bytesWritten = 0L;
return seq;
}
@@ -225,6 +226,14 @@ public class BufferSpiller {
}
}
+ /**
+ * Returns the number of bytes (entry headers plus contents) spilled since the last roll-over.
+ * @return the number of bytes written to the current spill file
+ */
+ public long getBytesWritten() {
+ return this.bytesWritten;
+ }
+
// ------------------------------------------------------------------------
// For testing
// ------------------------------------------------------------------------
@@ -268,6 +277,9 @@ public class BufferSpiller {
/** The byte buffer for bulk reading */
private final ByteBuffer buffer;
+ /** The spill file size in bytes, fixed at construction. It must never change, since BarrierBuffer adds it to and later subtracts it from its queued-byte count */
+ private final long size;
+
/** The page size to instantiate properly sized memory segments */
private final int pageSize;
@@ -282,11 +294,13 @@ public class BufferSpiller {
* @param buffer The buffer used for bulk reading.
* @param pageSize The page size to use for the created memory segments.
*/
- SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize) {
+ SpilledBufferOrEventSequence(File file, FileChannel fileChannel, ByteBuffer buffer, int pageSize)
+ throws IOException { // FileChannel#size() below may throw
this.file = file;
this.fileChannel = fileChannel;
this.buffer = buffer;
this.pageSize = pageSize;
+ this.size = fileChannel.size(); // snapshot the spill file length once; it must stay constant afterwards
}
/**
@@ -408,5 +422,12 @@ public class BufferSpiller {
throw new IOException("Cannot remove temp file for stream alignment writer");
}
}
+
+ /**
+ * Gets the size of this spilled sequence in bytes, as captured when the sequence was created.
+ */
+ public long size() {
+ return size;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 7d9e4d2..bcca2bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
@@ -87,12 +90,22 @@ public class StreamInputProcessor<IN> {
StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
+ boolean enableWatermarkMultiplexing,
+ Configuration taskManagerConfig) throws IOException {
InputGate inputGate = InputGateUtil.createInputGate(inputGates);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ long maxAlign = taskManagerConfig.getLong(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+ ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+ if (!(maxAlign == -1 || maxAlign > 0)) {
+ throw new IllegalConfigurationException(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+ + " must be positive or -1 (infinite)");
+ }
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
}
else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler = new BarrierTracker(inputGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index a3ae077..f116aff 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
@@ -97,12 +100,22 @@ public class StreamTwoInputProcessor<IN1, IN2> {
StatefulTask<?> checkpointListener,
CheckpointingMode checkpointMode,
IOManager ioManager,
- boolean enableWatermarkMultiplexing) throws IOException {
+ boolean enableWatermarkMultiplexing,
+ Configuration taskManagerConfig) throws IOException {
final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ long maxAlign = taskManagerConfig.getLong(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT,
+ ConfigConstants.DEFAULT_TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
+
+ if (!(maxAlign == -1 || maxAlign > 0)) {
+ throw new IllegalConfigurationException(
+ ConfigConstants.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT
+ + " must be positive or -1 (infinite)");
+ }
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager, maxAlign);
}
else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
this.barrierHandler = new BarrierTracker(inputGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d18ca16..8470c7c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
- isSerializingTimestamps());
+ isSerializingTimestamps(),
+ getEnvironment().getTaskManagerInfo().getConfiguration());
// make sure that stream tasks report their I/O statistics
AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 9252063..8718b88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this,
configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
- isSerializingTimestamps());
+ isSerializingTimestamps(),
+ getEnvironment().getTaskManagerInfo().getConfiguration());
// make sure that stream tasks report their I/O statistics
AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
http://git-wip-us.apache.org/repos/asf/flink/blob/0962cb6f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
new file mode 100644
index 0000000..529f809
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the barrier buffer's maximum limit of buffered/spilled bytes.
+ */
+public class BarrierBufferAlignmentLimitTest {
+
+ private static final int PAGE_SIZE = 512;
+
+ private static final Random RND = new Random();
+
+ private static IOManager IO_MANAGER;
+
+ // ------------------------------------------------------------------------
+ // Setup
+ // ------------------------------------------------------------------------
+
+ @BeforeClass
+ public static void setup() {
+ IO_MANAGER = new IOManagerAsync();
+ }
+
+ @AfterClass
+ public static void shutdownIOManager() {
+ IO_MANAGER.shutdown();
+ }
+
+ // ------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Tests that a single alignment which buffers more data than the limit is aborted.
+ */
+ @Test
+ public void testBreakCheckpointAtAlignmentLimit() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some initial buffers
+ /* 0 */ createBuffer(1, 100), createBuffer(2, 70),
+ /* 2 */ createBuffer(0, 42), createBuffer(2, 111),
+
+ // starting a checkpoint
+ /* 4 */ createBarrier(7, 1),
+ /* 5 */ createBuffer(1, 100), createBuffer(2, 200), createBuffer(1, 300), createBuffer(0, 50),
+ /* 9 */ createBarrier(7, 0),
+ /* 10 */ createBuffer(2, 100), createBuffer(0, 100), createBuffer(1, 200), createBuffer(0, 200),
+
+ // this buffer makes the alignment spill too large
+ /* 14 */ createBuffer(0, 101),
+
+ // additional data
+ /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+ // checkpoint completes - this should not result in a "completion notification"
+ /* 18 */ createBarrier(7, 2),
+
+ // trailing buffers
+ /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+ };
+
+ // the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 1000);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ // validating the sequence of buffers
+
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+ check(sequence[2], buffer.getNextNonBlocked());
+ check(sequence[3], buffer.getNextNonBlocked());
+
+ // start of checkpoint
+ long startTs = System.nanoTime();
+ check(sequence[6], buffer.getNextNonBlocked());
+ check(sequence[8], buffer.getNextNonBlocked());
+ check(sequence[10], buffer.getNextNonBlocked());
+
+ // trying to pull the next makes the alignment overflow - so buffered buffers are replayed
+ check(sequence[5], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class));
+
+ // playing back buffered events
+ check(sequence[7], buffer.getNextNonBlocked());
+ check(sequence[11], buffer.getNextNonBlocked());
+ check(sequence[12], buffer.getNextNonBlocked());
+ check(sequence[13], buffer.getNextNonBlocked());
+ check(sequence[14], buffer.getNextNonBlocked());
+
+ // the additional data
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // no call for a completed checkpoint must have happened
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ /**
+ * This tests the following case:
+ * - an alignment starts
+ * - barriers from a second checkpoint queue before the first completes
+ * - together they are larger than the threshold
+ * - after the first checkpoint (with second checkpoint data queued) aborts, the second completes
+ */
+ @Test
+ public void testAlignmentLimitWithQueuedAlignments() throws Exception {
+ BufferOrEvent[] sequence = {
+ // some initial buffers
+ /* 0 */ createBuffer(1, 100), createBuffer(2, 70),
+
+ // starting a checkpoint
+ /* 2 */ createBarrier(3, 2),
+ /* 3 */ createBuffer(1, 100), createBuffer(2, 100),
+ /* 5 */ createBarrier(3, 0),
+ /* 6 */ createBuffer(0, 100), createBuffer(1, 100),
+
+ // queue some data from the next checkpoint
+ /* 8 */ createBarrier(4, 0),
+ /* 9 */ createBuffer(0, 100), createBuffer(0, 120), createBuffer(1, 100),
+
+ // this one makes the alignment overflow
+ /* 12 */ createBuffer(2, 100),
+
+ // checkpoint completed
+ /* 13 */ createBarrier(3, 1),
+
+ // more for the next checkpoint
+ /* 14 */ createBarrier(4, 1),
+ /* 15 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100),
+
+ // next checkpoint completes
+ /* 18 */ createBarrier(4, 2),
+
+ // trailing data
+ /* 19 */ createBuffer(0, 100), createBuffer(1, 100), createBuffer(2, 100)
+ };
+
+ // the barrier buffer has a limit that only 500 bytes may be spilled in alignment
+ MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+ BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER, 500);
+
+ StatefulTask<?> toNotify = mock(StatefulTask.class);
+ buffer.registerCheckpointEventHandler(toNotify);
+
+ // validating the sequence of buffers
+ long startTs;
+
+ check(sequence[0], buffer.getNextNonBlocked());
+ check(sequence[1], buffer.getNextNonBlocked());
+
+ // start of checkpoint
+ startTs = System.nanoTime();
+ check(sequence[3], buffer.getNextNonBlocked());
+ check(sequence[7], buffer.getNextNonBlocked());
+
+ // next checkpoint also in progress
+ check(sequence[11], buffer.getNextNonBlocked());
+
+ // checkpoint alignment aborted due to too much data
+ check(sequence[4], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class));
+
+ // replay buffered data - in the middle, the alignment for checkpoint 4 starts
+ check(sequence[6], buffer.getNextNonBlocked());
+ startTs = System.nanoTime();
+ check(sequence[12], buffer.getNextNonBlocked());
+
+ // only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success
+ check(sequence[17], buffer.getNextNonBlocked());
+
+ // checkpoint 4 completed - check and validate buffered replay
+ check(sequence[9], buffer.getNextNonBlocked());
+ validateAlignmentTime(startTs, buffer);
+ verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(4L), anyLong());
+
+ check(sequence[10], buffer.getNextNonBlocked());
+ check(sequence[15], buffer.getNextNonBlocked());
+ check(sequence[16], buffer.getNextNonBlocked());
+
+ // trailing data
+ check(sequence[19], buffer.getNextNonBlocked());
+ check(sequence[20], buffer.getNextNonBlocked());
+ check(sequence[21], buffer.getNextNonBlocked());
+
+ // only checkpoint 4 was successfully completed, not checkpoint 3
+ verify(toNotify, times(0)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+
+ assertNull(buffer.getNextNonBlocked());
+ assertNull(buffer.getNextNonBlocked());
+
+ buffer.cleanup();
+ checkNoTempFilesRemain();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private static BufferOrEvent createBuffer(int channel, int size) {
+ byte[] bytes = new byte[size];
+ RND.nextBytes(bytes);
+
+ MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+ memory.put(0, bytes);
+
+ Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+ buf.setSize(size);
+
+ // retain an additional time so it does not get disposed after being read by the input gate
+ buf.retain();
+
+ return new BufferOrEvent(buf, channel);
+ }
+
+ private static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+ }
+
+ private static void check(BufferOrEvent expected, BufferOrEvent present) {
+ assertNotNull(expected);
+ assertNotNull(present);
+ assertEquals(expected.isBuffer(), present.isBuffer());
+
+ if (expected.isBuffer()) {
+ assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+ MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+ MemorySegment presentMem = present.getBuffer().getMemorySegment();
+ assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE) == 0);
+ }
+ else {
+ assertEquals(expected.getEvent(), present.getEvent());
+ }
+ }
+
+ private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+ final long elapsed = System.nanoTime() - startTimestamp;
+ assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+ }
+
+ private static void checkNoTempFilesRemain() {
+ // validate that all temp files have been removed (File#list() never returns "." or "..")
+ for (File dir : IO_MANAGER.getSpillingDirectories()) {
+ String[] files = dir.list();
+ assertNotNull("cannot list spilling directory " + dir, files);
+ if (files.length > 0) {
+ fail("barrier buffer did not clean up temp files. remaining files: " + Arrays.toString(files));
+ }
+ }
+ }
+}