You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/01 08:39:22 UTC

[flink] 01/02: [hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e25b950f2c351c24515e769a175ec8d52e29f835
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 28 23:34:27 2020 +0800

    [hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner
---
 .../runtime/io/CheckpointBarrierUnaligner.java     | 81 +++++++++-------------
 1 file changed, 34 insertions(+), 47 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index d39accf..01b1219 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -130,9 +130,6 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 
 	/**
 	 * For unaligned checkpoints, processing is never blocked from the task's perspective.
-	 *
-	 * <p>For PoC, we do not consider the possibility that the unaligned checkpoint would
-	 * not perform due to the max configured unaligned checkpoint size.
 	 */
 	@Override
 	public boolean isBlocked(int channelIndex) {
@@ -140,16 +137,13 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 	}
 
 	/**
-	 * We still need to trigger checkpoint while reading the first barrier from one channel, because this might happen
-	 * earlier than the previous async trigger via mailbox by netty thread. And the {@link AbstractInvokable} has the
-	 * deduplication logic to guarantee trigger checkpoint only once finally.
+	 * We still need to trigger checkpoint while reading the first barrier from one channel, because
+	 * this might happen earlier than the previous async trigger via mailbox by netty thread.
 	 *
 	 * <p>Note this is also suitable for the trigger case of local input channel.
 	 */
 	@Override
-	public void processBarrier(
-			CheckpointBarrier receivedBarrier,
-			int channelIndex) throws Exception {
+	public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		long barrierId = receivedBarrier.getId();
 		if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
 			// ignore old and cancelled barriers
@@ -164,33 +158,32 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 			hasInflightBuffers[channelIndex] = false;
 			numBarrierConsumed++;
 		}
-		// processBarrier is called from task thread and can actually happen before notifyBarrierReceived on empty
-		// buffer queues
-		// to avoid replicating any logic, we simply call notifyBarrierReceived here as well
 		threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, channelInfos[channelIndex]);
 	}
 
 	@Override
 	public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
-		final long barrierId = cancelBarrier.getCheckpointId();
+		long cancelledId = cancelBarrier.getCheckpointId();
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
+		}
 
-		if (currentConsumedCheckpointId >= barrierId && !isCheckpointPending()) {
+		if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
 			return;
 		}
 
 		if (isCheckpointPending()) {
 			LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
-							"Skipping current checkpoint.",
-					taskName,
-					barrierId,
-					currentConsumedCheckpointId);
-		} else if (LOG.isDebugEnabled()) {
-			LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
+					"Skipping current checkpoint.",
+				taskName,
+				cancelledId,
+				currentConsumedCheckpointId);
 		}
+
 		releaseBlocksAndResetBarriers();
-		currentConsumedCheckpointId = barrierId;
+		currentConsumedCheckpointId = cancelledId;
 		threadSafeUnaligner.setCurrentReceivedCheckpointId(currentConsumedCheckpointId);
-		notifyAbortOnCancellationBarrier(barrierId);
+		notifyAbortOnCancellationBarrier(cancelledId);
 	}
 
 	@Override
@@ -258,6 +251,11 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 		return gateChannelOffsets[channelInfo.getGateIdx()] + channelInfo.getInputChannelIdx();
 	}
 
+	@VisibleForTesting
+	int getNumOpenChannels() {
+		return threadSafeUnaligner.getNumOpenChannels();
+	}
+
 	@ThreadSafe
 	private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {
 
@@ -267,9 +265,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 		 */
 		private final boolean[] storeNewBuffers;
 
-		/**
-		 * The number of input channels which has read the barrier by task.
-		 */
+		/** The number of input channels that have received or processed the barrier. */
 		private int numBarriersReceived;
 
 		/** A future indicating that all barriers of a given checkpoint have been read. */
@@ -284,21 +280,18 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 		 */
 		private long currentReceivedCheckpointId = -1L;
 
-		/** The number of opened channels. */
+		/** The number of open channels. */
 		private int numOpenChannels;
 
 		private final ChannelStateWriter channelStateWriter;
 
 		private final CheckpointBarrierUnaligner handler;
 
-		public ThreadSafeUnaligner(
-				int totalNumChannels,
-				ChannelStateWriter channelStateWriter,
-				CheckpointBarrierUnaligner handler) {
-			storeNewBuffers = new boolean[totalNumChannels];
+		ThreadSafeUnaligner(int totalNumChannels, ChannelStateWriter channelStateWriter, CheckpointBarrierUnaligner handler) {
+			this.numOpenChannels = totalNumChannels;
+			this.storeNewBuffers = new boolean[totalNumChannels];
 			this.channelStateWriter = channelStateWriter;
 			this.handler = handler;
-			numOpenChannels = totalNumChannels;
 		}
 
 		@Override
@@ -359,11 +352,10 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 				// let the task know we are not completing this
 				long currentCheckpointId = currentReceivedCheckpointId;
 				handler.executeInTaskThread(() ->
-					handler.notifyAbort(currentCheckpointId,
-						new CheckpointException(
-							"Barrier id: " + barrierId,
-							CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
-					"notifyAbort");
+					handler.notifyAbort(
+						currentCheckpointId,
+						new CheckpointException("Barrier id: " + barrierId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)),
+						"notifyAbort");
 			}
 
 			currentReceivedCheckpointId = barrierId;
@@ -373,7 +365,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 			channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
 		}
 
-		public synchronized void resetReceivedBarriers(long checkpointId) {
+		synchronized void resetReceivedBarriers(long checkpointId) {
 			if (checkpointId >= currentReceivedCheckpointId && numBarriersReceived > 0) {
 				// avoid more data being serialized after abortion
 				Arrays.fill(storeNewBuffers, false);
@@ -382,7 +374,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 			}
 		}
 
-		public synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
+		synchronized CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
 			if (checkpointId < currentReceivedCheckpointId) {
 				return FutureUtils.completedVoidFuture();
 			}
@@ -392,22 +384,17 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
 			return allBarriersReceivedFuture;
 		}
 
-		public synchronized void onChannelClosed() {
+		synchronized void onChannelClosed() {
 			numOpenChannels--;
 		}
 
-		public synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
+		synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointId) {
 			this.currentReceivedCheckpointId = Math.max(currentReceivedCheckpointId, this.currentReceivedCheckpointId);
 		}
 
 		@VisibleForTesting
-		public synchronized int getNumOpenChannels() {
+		synchronized int getNumOpenChannels() {
 			return numOpenChannels;
 		}
 	}
-
-	@VisibleForTesting
-	public int getNumOpenChannels() {
-		return threadSafeUnaligner.getNumOpenChannels();
-	}
 }