You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/01 14:41:16 UTC
[flink] 07/16: [FLINK-12777][network] Extract CheckpointBarrierAligner from BarrierBuffer
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa5f0618ef88161b23097b45df4055a58ca11685
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jun 13 14:42:14 2019 +0200
[FLINK-12777][network] Extract CheckpointBarrierAligner from BarrierBuffer
---
.../runtime/io/AbstractBufferStorage.java | 5 -
.../flink/streaming/runtime/io/BarrierBuffer.java | 369 ++------------------
.../flink/streaming/runtime/io/BufferStorage.java | 2 -
...erBuffer.java => CheckpointBarrierAligner.java} | 379 ++++++---------------
4 files changed, 144 insertions(+), 611 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
index f7e4dd7..5eb30cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractBufferStorage.java
@@ -164,11 +164,6 @@ public abstract class AbstractBufferStorage implements BufferStorage {
}
@Override
- public long currentBufferedSize() {
- return currentBuffered != null ? currentBuffered.size() : 0L;
- }
-
- @Override
public long getMaxBufferedBytes() {
return maxBufferedBytes;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0f8fa40..1594389 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -54,45 +50,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+ private final CheckpointBarrierAligner barrierAligner;
+
/** The gate that the buffer draws its input from. */
private final InputGate inputGate;
- /** Flags that indicate whether a channel is currently blocked/buffered. */
- private final boolean[] blockedChannels;
-
- /** The total number of channels that this buffer handles data from. */
- private final int totalNumberOfInputChannels;
-
- /** To utility to write blocked data to a file channel. */
private final BufferStorage bufferStorage;
- private final String taskName;
-
- @Nullable
- private final AbstractInvokable toNotifyOnCheckpoint;
-
- /** The ID of the checkpoint for which we expect barriers. */
- private long currentCheckpointId = -1L;
-
- /**
- * The number of received barriers (= number of blocked/buffered channels) IMPORTANT: A canceled
- * checkpoint must always have 0 barriers.
- */
- private int numBarriersReceived;
-
- /** The number of already closed channels. */
- private int numClosedChannels;
-
- /** The timestamp as in {@link System#nanoTime()} at which the last alignment started. */
- private long startOfAlignmentTimestamp;
-
- /** The time (in nanoseconds) that the latest alignment took. */
- private long latestAlignmentDurationNanos;
-
/** Flag to indicate whether we have drawn all available input. */
- private boolean endOfStream;
+ private boolean endOfInputGate;
- /** Indicate end of the input. Set to true after encountering {@link #endOfStream} and depleting
+ /** Indicate end of the input. Set to true after encountering {@link #endOfInputGate} and depleting
* {@link #bufferStorage}. */
private boolean isFinished;
@@ -122,19 +90,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
*/
BarrierBuffer(
- InputGate inputGate,
- BufferStorage bufferStorage,
- String taskName,
- @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-
+ InputGate inputGate,
+ BufferStorage bufferStorage,
+ String taskName,
+ @Nullable AbstractInvokable toNotifyOnCheckpoint) {
this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-
this.bufferStorage = checkNotNull(bufferStorage);
-
- this.taskName = taskName;
- this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
+ this.barrierAligner = new CheckpointBarrierAligner(
+ inputGate.getNumberOfInputChannels(),
+ taskName,
+ toNotifyOnCheckpoint);
}
@Override
@@ -145,10 +110,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
return AVAILABLE;
}
- // ------------------------------------------------------------------------
- // Buffer and barrier handling
- // ------------------------------------------------------------------------
-
@Override
public Optional<BufferOrEvent> pollNext() throws Exception {
while (true) {
@@ -170,28 +131,36 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
BufferOrEvent bufferOrEvent = next.get();
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
+ if (barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
bufferStorage.add(bufferOrEvent);
if (bufferStorage.isFull()) {
- sizeLimitExceeded();
+ barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+ bufferStorage.rollOver();
}
}
else if (bufferOrEvent.isBuffer()) {
return next;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- if (!endOfStream) {
+ CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
+ if (!endOfInputGate) {
// process barriers only if there is a chance of the checkpoint completing
- processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
+ if (barrierAligner.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+ bufferStorage.rollOver();
+ }
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
+ if (barrierAligner.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
+ bufferStorage.rollOver();
+ }
}
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
- processEndOfPartition();
+ if (barrierAligner.processEndOfPartition()) {
+ bufferStorage.rollOver();
+ }
}
return next;
}
@@ -203,216 +172,18 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
return Optional.empty();
}
- if (endOfStream) {
+ if (endOfInputGate) {
isFinished = true;
return Optional.empty();
} else {
// end of input stream. stream continues with the buffered data
- endOfStream = true;
- releaseBlocksAndResetBarriers();
+ endOfInputGate = true;
+ barrierAligner.releaseBlocksAndResetBarriers();
+ bufferStorage.rollOver();
return pollNext();
}
}
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
- final long barrierId = receivedBarrier.getId();
-
- // fast path for single channel cases
- if (totalNumberOfInputChannels == 1) {
- if (barrierId > currentCheckpointId) {
- // new checkpoint
- currentCheckpointId = barrierId;
- notifyCheckpoint(receivedBarrier);
- }
- return;
- }
-
- // -- general code path for multiple input channels --
-
- if (numBarriersReceived > 0) {
- // this is only true if some alignment is already progress and was not canceled
-
- if (barrierId == currentCheckpointId) {
- // regular case
- onBarrier(channelIndex);
- }
- else if (barrierId > currentCheckpointId) {
- // we did not complete the current checkpoint, another started before
- LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
- taskName,
- barrierId,
- currentCheckpointId);
-
- // let the task know we are not completing this
- notifyAbort(currentCheckpointId,
- new CheckpointException(
- "Barrier id: " + barrierId,
- CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
-
- // abort the current checkpoint
- releaseBlocksAndResetBarriers();
-
- // begin a the new checkpoint
- beginNewAlignment(barrierId, channelIndex);
- }
- else {
- // ignore trailing barrier from an earlier checkpoint (obsolete now)
- return;
- }
- }
- else if (barrierId > currentCheckpointId) {
- // first barrier of a new checkpoint
- beginNewAlignment(barrierId, channelIndex);
- }
- else {
- // either the current checkpoint was canceled (numBarriers == 0) or
- // this barrier is from an old subsumed checkpoint
- return;
- }
-
- // check if we have all barriers - since canceled checkpoints always have zero barriers
- // this can only happen on a non canceled checkpoint
- if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
- // actually trigger checkpoint
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
- taskName,
- receivedBarrier.getId(),
- receivedBarrier.getTimestamp());
- }
-
- releaseBlocksAndResetBarriers();
- notifyCheckpoint(receivedBarrier);
- }
- }
-
- private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
- final long barrierId = cancelBarrier.getCheckpointId();
-
- // fast path for single channel cases
- if (totalNumberOfInputChannels == 1) {
- if (barrierId > currentCheckpointId) {
- // new checkpoint
- currentCheckpointId = barrierId;
- notifyAbortOnCancellationBarrier(barrierId);
- }
- return;
- }
-
- // -- general code path for multiple input channels --
-
- if (numBarriersReceived > 0) {
- // this is only true if some alignment is in progress and nothing was canceled
-
- if (barrierId == currentCheckpointId) {
- // cancel this alignment
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
- }
-
- releaseBlocksAndResetBarriers();
- notifyAbortOnCancellationBarrier(barrierId);
- }
- else if (barrierId > currentCheckpointId) {
- // we canceled the next which also cancels the current
- LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
- taskName,
- barrierId,
- currentCheckpointId);
-
- // this stops the current alignment
- releaseBlocksAndResetBarriers();
-
- // the next checkpoint starts as canceled
- currentCheckpointId = barrierId;
- startOfAlignmentTimestamp = 0L;
- latestAlignmentDurationNanos = 0L;
-
- notifyAbortOnCancellationBarrier(barrierId);
- }
-
- // else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
-
- }
- else if (barrierId > currentCheckpointId) {
- // first barrier of a new checkpoint is directly a cancellation
-
- // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers
- // at zero means that no checkpoint barrier can start a new alignment
- currentCheckpointId = barrierId;
-
- startOfAlignmentTimestamp = 0L;
- latestAlignmentDurationNanos = 0L;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", taskName, barrierId);
- }
-
- notifyAbortOnCancellationBarrier(barrierId);
- }
-
- // else: trailing barrier from either
- // - a previous (subsumed) checkpoint
- // - the current checkpoint if it was already canceled
- }
-
- private void processEndOfPartition() throws Exception {
- numClosedChannels++;
-
- if (numBarriersReceived > 0) {
- // let the task know we skip a checkpoint
- notifyAbort(currentCheckpointId,
- new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
-
- // no chance to complete this checkpoint
- releaseBlocksAndResetBarriers();
- }
- }
-
- private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- CheckpointMetaData checkpointMetaData =
- new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
-
- CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
- .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
- .setAlignmentDurationNanos(latestAlignmentDurationNanos);
-
- toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
- checkpointMetaData,
- checkpointBarrier.getCheckpointOptions(),
- checkpointMetrics);
- }
- }
-
- private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception {
- notifyAbort(checkpointId,
- new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
- }
-
- private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception {
- if (toNotifyOnCheckpoint != null) {
- toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
- }
- }
-
- private void sizeLimitExceeded() throws Exception {
- long maxBufferedBytes = bufferStorage.getMaxBufferedBytes();
- // exceeded our limit - abort this checkpoint
- LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
- taskName,
- currentCheckpointId,
- maxBufferedBytes);
-
- releaseBlocksAndResetBarriers();
- notifyAbort(currentCheckpointId,
- new CheckpointException(
- "Max buffered bytes: " + maxBufferedBytes,
- CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
- }
-
@Override
public boolean isEmpty() {
return bufferStorage.isEmpty();
@@ -428,69 +199,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
bufferStorage.close();
}
- private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
- currentCheckpointId = checkpointId;
- onBarrier(channelIndex);
-
- startOfAlignmentTimestamp = System.nanoTime();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId);
- }
- }
-
- /**
- * Checks whether the channel with the given index is blocked.
- *
- * @param channelIndex The channel index to check.
- * @return True if the channel is blocked, false if not.
- */
- private boolean isBlocked(int channelIndex) {
- return blockedChannels[channelIndex];
- }
-
- /**
- * Blocks the given channel index, from which a barrier has been received.
- *
- * @param channelIndex The channel index to block.
- */
- private void onBarrier(int channelIndex) throws IOException {
- if (!blockedChannels[channelIndex]) {
- blockedChannels[channelIndex] = true;
-
- numBarriersReceived++;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex);
- }
- }
- else {
- throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
- }
- }
-
- /**
- * Releases the blocks on all channels and resets the barrier count.
- * Makes sure the just written data is the next to be consumed.
- */
- private void releaseBlocksAndResetBarriers() throws IOException {
- LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
-
- for (int i = 0; i < blockedChannels.length; i++) {
- blockedChannels[i] = false;
- }
-
- bufferStorage.rollOver();
-
- // the next barrier that comes must assume it is the first
- numBarriersReceived = 0;
-
- if (startOfAlignmentTimestamp > 0) {
- latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
- startOfAlignmentTimestamp = 0;
- }
- }
-
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@@ -501,22 +209,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
* @return The ID of the pending of completed checkpoint.
*/
public long getCurrentCheckpointId() {
- return this.currentCheckpointId;
+ return barrierAligner.getCurrentCheckpointId();
}
@Override
public long getAlignmentDurationNanos() {
- long start = this.startOfAlignmentTimestamp;
- if (start <= 0) {
- return latestAlignmentDurationNanos;
- } else {
- return System.nanoTime() - start;
- }
+ return barrierAligner.getAlignmentDurationNanos();
}
@Override
public int getNumberOfInputChannels() {
- return totalNumberOfInputChannels;
+ return inputGate.getNumberOfInputChannels();
}
// ------------------------------------------------------------------------
@@ -525,10 +228,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
@Override
public String toString() {
- return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
- taskName,
- currentCheckpointId,
- numBarriersReceived,
- numClosedChannels);
+ return barrierAligner.toString();
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 8e6194d..4ad7eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -67,8 +67,6 @@ public interface BufferStorage extends AutoCloseable {
Optional<BufferOrEvent> pollNext() throws IOException;
- long currentBufferedSize();
-
long getMaxBufferedBytes();
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
similarity index 55%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index 0f8fa40..30e05c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +19,12 @@
package org.apache.flink.streaming.runtime.io;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
@@ -36,26 +33,16 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until
- * all inputs have received the barrier for a given checkpoint.
- *
- * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
- * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
- * the blocks are released.
+ * {@link CheckpointBarrierAligner} keeps track of received {@link CheckpointBarrier}s on the given
+ * channels and controls the alignment by deciding which channels should be blocked and when to
+ * release blocked channels.
*/
@Internal
-public class BarrierBuffer implements CheckpointBarrierHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
+public class CheckpointBarrierAligner {
- /** The gate that the buffer draws its input from. */
- private final InputGate inputGate;
+ private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierAligner.class);
/** Flags that indicate whether a channel is currently blocked/buffered. */
private final boolean[] blockedChannels;
@@ -63,9 +50,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
/** The total number of channels that this buffer handles data from. */
private final int totalNumberOfInputChannels;
- /** To utility to write blocked data to a file channel. */
- private final BufferStorage bufferStorage;
-
private final String taskName;
@Nullable
@@ -89,132 +73,47 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
/** The time (in nanoseconds) that the latest alignment took. */
private long latestAlignmentDurationNanos;
- /** Flag to indicate whether we have drawn all available input. */
- private boolean endOfStream;
-
- /** Indicate end of the input. Set to true after encountering {@link #endOfStream} and depleting
- * {@link #bufferStorage}. */
- private boolean isFinished;
-
- /**
- * Creates a new checkpoint stream aligner.
- *
- * <p>There is no limit to how much data may be buffered during an alignment.
- *
- * @param inputGate The input gate to draw the buffers and events from.
- * @param bufferStorage The storage to hold the buffers and events for blocked channels.
- */
- @VisibleForTesting
- BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
- this (inputGate, bufferStorage, "Testing: No task associated", null);
- }
-
- /**
- * Creates a new checkpoint stream aligner.
- *
- * <p>The aligner will allow only alignments that buffer up to the given number of bytes.
- * When that number is exceeded, it will stop the alignment and notify the task that the
- * checkpoint has been cancelled.
- *
- * @param inputGate The input gate to draw the buffers and events from.
- * @param bufferStorage The storage to hold the buffers and events for blocked channels.
- * @param taskName The task name for logging.
- * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications.
- */
- BarrierBuffer(
- InputGate inputGate,
- BufferStorage bufferStorage,
- String taskName,
- @Nullable AbstractInvokable toNotifyOnCheckpoint) {
-
- this.inputGate = inputGate;
- this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
- this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
-
- this.bufferStorage = checkNotNull(bufferStorage);
-
+ CheckpointBarrierAligner(
+ int totalNumberOfInputChannels,
+ String taskName,
+ @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+ this.totalNumberOfInputChannels = totalNumberOfInputChannels;
this.taskName = taskName;
this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
- }
- @Override
- public CompletableFuture<?> isAvailable() {
- if (bufferStorage.isEmpty()) {
- return inputGate.isAvailable();
- }
- return AVAILABLE;
+ this.blockedChannels = new boolean[totalNumberOfInputChannels];
}
- // ------------------------------------------------------------------------
- // Buffer and barrier handling
- // ------------------------------------------------------------------------
+ public void releaseBlocksAndResetBarriers() throws IOException {
+ LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
- @Override
- public Optional<BufferOrEvent> pollNext() throws Exception {
- while (true) {
- // process buffered BufferOrEvents before grabbing new ones
- Optional<BufferOrEvent> next;
- if (bufferStorage.isEmpty()) {
- next = inputGate.pollNext();
- }
- else {
- // TODO: FLINK-12536 for non credit-based flow control, getNext method is blocking
- next = bufferStorage.pollNext();
- if (!next.isPresent()) {
- return pollNext();
- }
- }
+ for (int i = 0; i < blockedChannels.length; i++) {
+ blockedChannels[i] = false;
+ }
- if (!next.isPresent()) {
- return handleEmptyBuffer();
- }
+ // the next barrier that comes must assume it is the first
+ numBarriersReceived = 0;
- BufferOrEvent bufferOrEvent = next.get();
- if (isBlocked(bufferOrEvent.getChannelIndex())) {
- // if the channel is blocked, we just store the BufferOrEvent
- bufferStorage.add(bufferOrEvent);
- if (bufferStorage.isFull()) {
- sizeLimitExceeded();
- }
- }
- else if (bufferOrEvent.isBuffer()) {
- return next;
- }
- else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
- if (!endOfStream) {
- // process barriers only if there is a chance of the checkpoint completing
- processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
- }
- }
- else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
- processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
- }
- else {
- if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
- processEndOfPartition();
- }
- return next;
- }
+ if (startOfAlignmentTimestamp > 0) {
+ latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
+ startOfAlignmentTimestamp = 0;
}
}
- private Optional<BufferOrEvent> handleEmptyBuffer() throws Exception {
- if (!inputGate.isFinished()) {
- return Optional.empty();
- }
-
- if (endOfStream) {
- isFinished = true;
- return Optional.empty();
- } else {
- // end of input stream. stream continues with the buffered data
- endOfStream = true;
- releaseBlocksAndResetBarriers();
- return pollNext();
- }
+ /**
+ * Checks whether the channel with the given index is blocked.
+ *
+ * @param channelIndex The channel index to check.
+ * @return True if the channel is blocked, false if not.
+ */
+ public boolean isBlocked(int channelIndex) {
+ return blockedChannels[channelIndex];
}
- private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
+ /**
+ * @return true if some blocked data should be unblocked/rolled over.
+ */
+ public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
@@ -222,11 +121,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
- notifyCheckpoint(receivedBarrier);
+ notifyCheckpoint(receivedBarrier, bufferedBytes);
}
- return;
+ return false;
}
+ boolean checkpointAborted = false;
+
// -- general code path for multiple input channels --
if (numBarriersReceived > 0) {
@@ -252,13 +153,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
// abort the current checkpoint
releaseBlocksAndResetBarriers();
+ checkpointAborted = true;
// begin a the new checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
- return;
+ return false;
}
}
else if (barrierId > currentCheckpointId) {
@@ -268,7 +170,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
- return;
+ return false;
}
// check if we have all barriers - since canceled checkpoints always have zero barriers
@@ -283,11 +185,47 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
releaseBlocksAndResetBarriers();
- notifyCheckpoint(receivedBarrier);
+ notifyCheckpoint(receivedBarrier, bufferedBytes);
+ return true;
+ }
+ return checkpointAborted;
+ }
+
+ protected void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
+ currentCheckpointId = checkpointId;
+ onBarrier(channelIndex);
+
+ startOfAlignmentTimestamp = System.nanoTime();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId);
}
}
- private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
+ /**
+ * Blocks the given channel index, from which a barrier has been received.
+ *
+ * @param channelIndex The channel index to block.
+ */
+ protected void onBarrier(int channelIndex) throws IOException {
+ if (!blockedChannels[channelIndex]) {
+ blockedChannels[channelIndex] = true;
+
+ numBarriersReceived++;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex);
+ }
+ }
+ else {
+ throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
+ }
+ }
+
+ /**
+ * @return true if some blocked data should be unblocked/rolled over.
+ */
+ public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
final long barrierId = cancelBarrier.getCheckpointId();
// fast path for single channel cases
@@ -297,7 +235,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
currentCheckpointId = barrierId;
notifyAbortOnCancellationBarrier(barrierId);
}
- return;
+ return false;
}
// -- general code path for multiple input channels --
@@ -313,6 +251,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
releaseBlocksAndResetBarriers();
notifyAbortOnCancellationBarrier(barrierId);
+ return true;
}
else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
@@ -331,6 +270,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
latestAlignmentDurationNanos = 0L;
notifyAbortOnCancellationBarrier(barrierId);
+ return true;
}
// else: ignore trailing (cancellation) barrier from an earlier checkpoint (obsolete now)
@@ -351,33 +291,39 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
notifyAbortOnCancellationBarrier(barrierId);
+ return false;
}
// else: trailing barrier from either
// - a previous (subsumed) checkpoint
// - the current checkpoint if it was already canceled
+ return false;
}
- private void processEndOfPartition() throws Exception {
+ /**
+ * Handles an end-of-partition event received on one of the input channels.
+ *
+ * <p>Increments the number of closed channels. If an alignment is in progress (at least one
+ * barrier has already been received), the current checkpoint can no longer complete: the task
+ * is notified of the abort with {@code CHECKPOINT_DECLINED_INPUT_END_OF_STREAM} and
+ * {@code releaseBlocksAndResetBarriers()} is invoked.
+ *
+ * @return true if some blocked data should be unblocked/rolled over.
+ */
+ public boolean processEndOfPartition() throws Exception {
numClosedChannels++;
if (numBarriersReceived > 0) {
// let the task know we skip a checkpoint
notifyAbort(currentCheckpointId,
new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
-
// no chance to complete this checkpoint
releaseBlocksAndResetBarriers();
+ return true;
}
+ return false;
}
- private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
+ private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes) throws Exception {
if (toNotifyOnCheckpoint != null) {
CheckpointMetaData checkpointMetaData =
new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
- .setBytesBufferedInAlignment(bufferStorage.currentBufferedSize())
+ .setBytesBufferedInAlignment(bufferedBytes)
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
@@ -398,132 +344,19 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
}
}
- private void sizeLimitExceeded() throws Exception {
- long maxBufferedBytes = bufferStorage.getMaxBufferedBytes();
- // exceeded our limit - abort this checkpoint
- LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
- taskName,
- currentCheckpointId,
- maxBufferedBytes);
-
- releaseBlocksAndResetBarriers();
- notifyAbort(currentCheckpointId,
- new CheckpointException(
- "Max buffered bytes: " + maxBufferedBytes,
- CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
- }
-
- @Override
- public boolean isEmpty() {
- return bufferStorage.isEmpty();
- }
-
- @Override
- public boolean isFinished() {
- return isFinished;
- }
-
- @Override
- public void cleanup() throws IOException {
- bufferStorage.close();
- }
-
- private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
- currentCheckpointId = checkpointId;
- onBarrier(channelIndex);
-
- startOfAlignmentTimestamp = System.nanoTime();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId);
- }
- }
-
- /**
- * Checks whether the channel with the given index is blocked.
- *
- * @param channelIndex The channel index to check.
- * @return True if the channel is blocked, false if not.
- */
- private boolean isBlocked(int channelIndex) {
- return blockedChannels[channelIndex];
- }
-
- /**
- * Blocks the given channel index, from which a barrier has been received.
- *
- * @param channelIndex The channel index to block.
- */
- private void onBarrier(int channelIndex) throws IOException {
- if (!blockedChannels[channelIndex]) {
- blockedChannels[channelIndex] = true;
-
- numBarriersReceived++;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex);
- }
- }
- else {
- throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + channelIndex);
- }
- }
-
- /**
- * Releases the blocks on all channels and resets the barrier count.
- * Makes sure the just written data is the next to be consumed.
- */
- private void releaseBlocksAndResetBarriers() throws IOException {
- LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);
-
- for (int i = 0; i < blockedChannels.length; i++) {
- blockedChannels[i] = false;
- }
-
- bufferStorage.rollOver();
-
- // the next barrier that comes must assume it is the first
- numBarriersReceived = 0;
-
- if (startOfAlignmentTimestamp > 0) {
- latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
- startOfAlignmentTimestamp = 0;
- }
- }
-
- // ------------------------------------------------------------------------
- // Properties
- // ------------------------------------------------------------------------
-
- /**
- * Gets the ID defining the current pending, or just completed, checkpoint.
- *
- * @return The ID of the pending of completed checkpoint.
- */
+ /**
+ * Gets the ID defining the current pending, or just completed, checkpoint.
+ *
+ * @return The ID of the pending or completed checkpoint.
+ */
public long getCurrentCheckpointId() {
- return this.currentCheckpointId;
+ return currentCheckpointId;
}
- @Override
+ /**
+ * Returns the alignment duration in nanoseconds.
+ *
+ * <p>While an alignment is in progress ({@code startOfAlignmentTimestamp > 0}) this is the time
+ * elapsed since it started; otherwise it is {@code latestAlignmentDurationNanos}, the most
+ * recently recorded alignment duration.
+ *
+ * @return the current or most recent alignment duration, in nanoseconds
+ */
public long getAlignmentDurationNanos() {
+ // Read the timestamp exactly once. Checking the field and then re-reading it could observe a
+ // reset to 0 in between and return System.nanoTime() - 0. NOTE(review): this matters if the
+ // getter is polled from another thread (e.g. a metric gauge) - confirm against callers.
long start = this.startOfAlignmentTimestamp;
if (start <= 0) {
return latestAlignmentDurationNanos;
} else {
return System.nanoTime() - start;
}
}
@Override
- public int getNumberOfInputChannels() {
- return totalNumberOfInputChannels;
- }
-
- // ------------------------------------------------------------------------
- // Utilities
- // ------------------------------------------------------------------------
-
- @Override
public String toString() {
return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
taskName,
@@ -531,4 +364,12 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
numBarriersReceived,
numClosedChannels);
}
+
+ /**
+ * Aborts the current checkpoint because the data buffered during alignment exceeded the limit.
+ *
+ * <p>Logs the abort at INFO level, calls {@code releaseBlocksAndResetBarriers()}, and then
+ * notifies the task of the abort with {@code CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED}.
+ *
+ * @param maxBufferedBytes the alignment buffer limit, in bytes, that was exceeded
+ */
+ public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception {
+ // Keep this abort visible without debug logging; the pre-refactoring code logged it as well.
+ LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
+ taskName,
+ currentCheckpointId,
+ maxBufferedBytes);
+
+ releaseBlocksAndResetBarriers();
+ notifyAbort(currentCheckpointId,
+ new CheckpointException(
+ "Max buffered bytes: " + maxBufferedBytes,
+ CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
+ }
}