You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/10 16:10:06 UTC

[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

    [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
    
    OperatorSnapshotFinalizer already waits for and holds this future, so
    ChannelStateWriter.getWriteResult() can be made non-idempotent (it is
    renamed to getAndRemoveWriteResult()), and ChannelStateWriter.stop()
    can be removed.
---
 .../checkpoint/channel/ChannelStateWriter.java     | 21 ++++++----------
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++------
 .../channel/ChannelStateWriterImplTest.java        | 28 ++++++++++------------
 .../checkpoint/channel/MockChannelStateWriter.java |  6 +----
 .../channel/RecordingChannelStateWriter.java       |  5 ----
 .../runtime/state/ChannelPersistenceITCase.java    |  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  6 -----
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 +-----------
 .../runtime/tasks/LocalStateForwardingTest.java    |  2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
 	 * Finalize write of channel state data for the given checkpoint id.
 	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
 	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
-	 * using {@link #getWriteResult}
+	 * using {@link #getAndRemoveWriteResult}
 	 */
 	void finishInput(long checkpointId);
 
@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
 	 * Finalize write of channel state data for the given checkpoint id.
 	 * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added.
 	 * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
-	 * using {@link #getWriteResult}
+	 * using {@link #getAndRemoveWriteResult}
 	 */
 	void finishOutput(long checkpointId);
 
 	/**
 	 * Aborts the checkpoint and fails pending result for this checkpoint.
+	 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
 	 */
 	void abort(long checkpointId, Throwable cause);
 
 	/**
-	 * Must be called after {@link #start(long, CheckpointOptions)}.
+	 * Must be called after {@link #start(long, CheckpointOptions)} once.
+	 * @throws IllegalArgumentException if the passed checkpointId is not known.
 	 */
-	ChannelStateWriteResult getWriteResult(long checkpointId);
-
-	/**
-	 * Cleans up the internal state for the given checkpoint.
-	 */
-	void stop(long checkpointId);
+	ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException;
 
 	ChannelStateWriter NO_OP = new NoOpChannelStateWriter();
 
@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
 		}
 
 		@Override
-		public ChannelStateWriteResult getWriteResult(long checkpointId) {
+		public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
 			return ChannelStateWriteResult.EMPTY;
 		}
 
 		@Override
 		public void close() {
 		}
-
-		@Override
-		public void stop(long checkpointId) {
-		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
 	}
 
 	@Override
-	public ChannelStateWriteResult getWriteResult(long checkpointId) {
+	public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
 		LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
-		ChannelStateWriteResult result = results.get(checkpointId);
+		ChannelStateWriteResult result = results.remove(checkpointId);
 		Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId);
 		return result;
 	}
 
-	@Override
-	public void stop(long checkpointId) {
-		LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
-		results.remove(checkpointId);
-	}
-
 	public void open() {
 		executor.start();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 8c7d7f2..9d7a7ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -38,7 +38,6 @@ import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamF
 import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -69,9 +68,7 @@ public class ChannelStateWriterImplTest {
 		ChannelStateWriteResult result;
 		try (ChannelStateWriterImpl writer = openWriter()) {
 			callStart(writer);
-			result = writer.getWriteResult(CHECKPOINT_ID);
-			ChannelStateWriteResult result2 = writer.getWriteResult(CHECKPOINT_ID);
-			assertSame(result, result2);
+			result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
 			assertFalse(result.resultSubpartitionStateHandles.isDone());
 			assertFalse(result.inputChannelStateHandles.isDone());
 		}
@@ -79,22 +76,12 @@ public class ChannelStateWriterImplTest {
 		assertTrue(result.resultSubpartitionStateHandles.isDone());
 	}
 
-	@Test(expected = IllegalArgumentException.class)
-	public void testResultCleanup() throws IOException {
-		try (ChannelStateWriterImpl writer = openWriter()) {
-			callStart(writer);
-			writer.getWriteResult(CHECKPOINT_ID);
-			writer.stop(CHECKPOINT_ID);
-			writer.getWriteResult(CHECKPOINT_ID);
-		}
-	}
-
 	@Test
 	public void testAbort() throws Exception {
 		NetworkBuffer buffer = getBuffer();
 		runWithSyncWorker((writer, worker) -> {
 			callStart(writer);
-			ChannelStateWriteResult result = writer.getWriteResult(CHECKPOINT_ID);
+			ChannelStateWriteResult result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
 			callAddInputData(writer, buffer);
 			callAbort(writer);
 			worker.processAllRequests();
@@ -108,9 +95,18 @@ public class ChannelStateWriterImplTest {
 		NetworkBuffer buffer = getBuffer();
 		runWithSyncWorker((writer, worker) -> {
 			callStart(writer);
+			writer.abort(CHECKPOINT_ID, new TestException(), true);
+			writer.getAndRemoveWriteResult(CHECKPOINT_ID);
+		});
+	}
+
+	@Test
+	public void testAbortDoesNotClearsResults() throws Exception {
+		runWithSyncWorker((writer, worker) -> {
+			callStart(writer);
 			callAbort(writer);
 			worker.processAllRequests();
-			writer.getWriteResult(CHECKPOINT_ID);
+			writer.getAndRemoveWriteResult(CHECKPOINT_ID);
 		});
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 0a61066d..88bd334 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -101,7 +101,7 @@ public class MockChannelStateWriter implements ChannelStateWriter {
 	}
 
 	@Override
-	public ChannelStateWriteResult getWriteResult(long checkpointId) {
+	public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
 		return channelStateWriteResult;
 	}
 
@@ -117,8 +117,4 @@ public class MockChannelStateWriter implements ChannelStateWriter {
 		channelStateWriteResult.getInputChannelStateHandles().cancel(false);
 		channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false);
 	}
-
-	@Override
-	public void stop(long checkpointId) {
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
index d0cfe3f..151e38e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
@@ -81,11 +81,6 @@ public class RecordingChannelStateWriter extends MockChannelStateWriter {
 		return lastFinishedCheckpointId;
 	}
 
-	@Override
-	public void stop(long checkpointId) {
-		lastFinishedCheckpointId = checkpointId;
-	}
-
 	public ListMultimap<InputChannelInfo, Buffer> getAddedInput() {
 		return addedInput;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index b696c77..18caac0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -110,7 +110,7 @@ public class ChannelPersistenceITCase {
 				writer.addOutputData(checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, e.getValue());
 			}
 			writer.finishOutput(checkpointId);
-			ChannelStateWriteResult result = writer.getWriteResult(checkpointId);
+			ChannelStateWriteResult result = writer.getAndRemoveWriteResult(checkpointId);
 			result.getResultSubpartitionStateHandles().join(); // prevent abnormal complete in close
 			return result;
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index e89e962..af10411 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -60,7 +59,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 	private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
 	private final CheckpointMetaData checkpointMetaData;
 	private final CheckpointMetrics checkpointMetrics;
-	private final Future<?> channelWrittenFuture;
 	private final long asyncStartNanos;
 	private final AtomicReference<AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING);
 
@@ -68,7 +66,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 			Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
 			CheckpointMetaData checkpointMetaData,
 			CheckpointMetrics checkpointMetrics,
-			Future<?> channelWrittenFuture,
 			long asyncStartNanos,
 			String taskName,
 			Consumer<AsyncCheckpointRunnable> register,
@@ -79,7 +76,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 		this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress);
 		this.checkpointMetaData = checkNotNull(checkpointMetaData);
 		this.checkpointMetrics = checkNotNull(checkpointMetrics);
-		this.channelWrittenFuture = checkNotNull(channelWrittenFuture);
 		this.asyncStartNanos = asyncStartNanos;
 		this.taskName = checkNotNull(taskName);
 		this.registerConsumer = register;
@@ -120,8 +116,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
 
 			checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
 
-			channelWrittenFuture.get();
-
 			if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
 
 				reportCompletedSnapshotStates(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d14594f..adfaa44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -65,7 +64,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -432,22 +430,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 	}
 
 	private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
-		final Future<?> channelWrittenFuture;
-		if (includeChannelState(options)) {
-			ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
-			channelWrittenFuture = CompletableFuture.allOf(
-					writeResult.getInputChannelStateHandles(),
-					writeResult.getResultSubpartitionStateHandles())
-				.whenComplete((dummy, ex) -> channelStateWriter.stop(metadata.getCheckpointId()));
-		} else {
-			channelWrittenFuture = FutureUtils.completedVoidFuture();
-		}
 		// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
 		executorService.execute(new AsyncCheckpointRunnable(
 			snapshotFutures,
 			metadata,
 			metrics,
-			channelWrittenFuture,
 			System.nanoTime(),
 			taskName,
 			registerConsumer(),
@@ -490,7 +477,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 		long started = System.nanoTime();
 
 		ChannelStateWriteResult channelStateWriteResult = includeChannelState(checkpointOptions) ?
-								channelStateWriter.getWriteResult(checkpointId) :
+								channelStateWriter.getAndRemoveWriteResult(checkpointId) :
 								ChannelStateWriteResult.EMPTY;
 
 		CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 3e0703f..2eabca4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -59,7 +59,6 @@ import javax.annotation.Nullable;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
@@ -115,7 +114,6 @@ public class LocalStateForwardingTest extends TestLogger {
 			snapshots,
 			checkpointMetaData,
 			checkpointMetrics,
-			CompletableFuture.completedFuture(null),
 			0L,
 			testStreamTask.getName(),
 			asyncCheckpointRunnable -> {},