You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/01 08:39:22 UTC
[flink] 01/02: [hotfix][checkpointing] Fix the formatting of
CheckpointBarrierUnaligner
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e25b950f2c351c24515e769a175ec8d52e29f835
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 28 23:34:27 2020 +0800
[hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner
---
.../runtime/io/CheckpointBarrierUnaligner.java | 81 +++++++++-------------
1 file changed, 34 insertions(+), 47 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index d39accf..01b1219 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -130,9 +130,6 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
/**
* For unaligned checkpoint, it never blocks processing from the task aspect.
- *
- * <p>For PoC, we do not consider the possibility that the unaligned checkpoint would
- * not perform due to the max configured unaligned checkpoint size.
*/
@Override
public boolean isBlocked(int channelIndex) {
@@ -140,16 +137,13 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
}
/**
- * We still need to trigger checkpoint while reading the first barrier from one channel, because this might happen
- * earlier than the previous async trigger via mailbox by netty thread. And the {@link AbstractInvokable} has the
- * deduplication logic to guarantee trigger checkpoint only once finally.
+ * We still need to trigger checkpoint while reading the first barrier from one channel, because
+ * this might happen earlier than the previous async trigger via mailbox by netty thread.
*
* <p>Note this is also suitable for the trigger case of local input channel.
*/
@Override
- public void processBarrier(
- CheckpointBarrier receivedBarrier,
- int channelIndex) throws Exception {
+ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
long barrierId = receivedBarrier.getId();
if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
@@ -164,33 +158,32 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
hasInflightBuffers[channelIndex] = false;
numBarrierConsumed++;
}
- // processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
- // buffer queues
- // to avoid replicating any logic, we simply call notifyBarrierReceived here as well
threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]);
}
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
- final long barrierId = cancelBarrier.getCheckpointId();
+ long cancelledId = cancelBarrier.getCheckpointId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
+ }
- if (currentConsumedCheckpointId >= barrierId && !isCheckpointPending()) {
+ if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
return;
}
if (isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
- "Skipping current checkpoint.",
- taskName,
- barrierId,
- currentConsumedCheckpointId);
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
+ "Skipping current checkpoint.",
+ taskName,
+ cancelledId,
+ currentConsumedCheckpointId);
}
+
releaseBlocksAndResetBarriers();
- currentConsumedCheckpointId = barrierId;
+ currentConsumedCheckpointId = cancelledId;
threadSafeUnaligner.setCurrentReceivedCheckpointId(currentConsumedCheckpointId);
- notifyAbortOnCancellationBarrier(barrierId);
+ notifyAbortOnCancellationBarrier(cancelledId);
}
@Override
@@ -258,6 +251,11 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
return gateChannelOffsets[channelInfo.getGateIdx()] + channelInfo.getInputChannelIdx();
}
+ @VisibleForTesting
+ int getNumOpenChannels() {
+ return threadSafeUnaligner.getNumOpenChannels();
+ }
+
@ThreadSafe
private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {
@@ -267,9 +265,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
*/
private final boolean[] storeNewBuffers;
- /**
- * The number of input channels which has read the barrier by task.
- */
+ /** The number of input channels which have received or processed the barrier. */
private int numBarriersReceived;
/** A future indicating that all barriers of a given checkpoint have been read. */
@@ -284,21 +280,18 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
*/
private long currentReceivedCheckpointId = -1L;
- /** The number of opened channels. */
+ /** The number of open channels. */
private int numOpenChannels;
private final ChannelStateWriter channelStateWriter;
private final CheckpointBarrierUnaligner handler;
- public ThreadSafeUnaligner(
- int totalNumChannels,
- ChannelStateWriter channelStateWriter,
- CheckpointBarrierUnaligner handler) {
- storeNewBuffers = new boolean[totalNumChannels];
+ ThreadSafeUnaligner(int totalNumChannels, ChannelStateWriter channelStateWriter, CheckpointBarrierUnaligner handler) {
+ this.numOpenChannels = totalNumChannels;
+ this.storeNewBuffers = new boolean[totalNumChannels];
this.channelStateWriter = channelStateWriter;
this.handler = handler;
- numOpenChannels = totalNumChannels;
}
@Override
@@ -359,11 +352,10 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
// let the task know we are not completing this
long currentCheckpointId = currentReceivedCheckpointId;
handler.executeInTaskThread(() ->
- handler.notifyAbort(currentCheckpointId,
- new CheckpointException(
- "Barrier id: " + barrierId,
- CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
- "notifyAbort");
+ handler.notifyAbort(
+ currentCheckpointId,
+ new CheckpointException("Barrier id: " + barrierId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
+ "notifyAbort");
}
currentReceivedCheckpointId = barrierId;
@@ -373,7 +365,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
}
- public synchronized void resetReceivedBarriers(long checkpointId) {
+ synchronized void resetReceivedBarriers(long checkpointId) {
if (checkpointId >= currentReceivedCheckpointId && numBarriersReceived > 0) {
// avoid more data being serialized after abortion
Arrays.fill(storeNewBuffers, false);
@@ -382,7 +374,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
}
}
- public synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
+ synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
if (checkpointId < currentReceivedCheckpointId) {
return FutureUtils.completedVoidFuture();
}
@@ -392,22 +384,17 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
return allBarriersReceivedFuture;
}
- public synchronized void onChannelClosed() {
+ synchronized void onChannelClosed() {
numOpenChannels--;
}
- public synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
+ synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
}
@VisibleForTesting
- public synchronized int getNumOpenChannels() {
+ synchronized int getNumOpenChannels() {
return numOpenChannels;
}
}
-
- @VisibleForTesting
- public int getNumOpenChannels() {
- return threadSafeUnaligner.getNumOpenChannels();
- }
}