You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/10 16:10:06 UTC
[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jun 3 23:03:52 2020 +0200
[FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
OperatorSnapshotFinalizer already waits for and holds this future.
Therefore ChannelStateWriter.getWriteResult() no longer needs to be idempotent (it becomes getAndRemoveWriteResult()),
and ChannelStateWriter.stop() can be removed.
---
.../checkpoint/channel/ChannelStateWriter.java | 21 ++++++----------
.../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++------
.../channel/ChannelStateWriterImplTest.java | 28 ++++++++++------------
.../checkpoint/channel/MockChannelStateWriter.java | 6 +----
.../channel/RecordingChannelStateWriter.java | 5 ----
.../runtime/state/ChannelPersistenceITCase.java | 2 +-
.../runtime/tasks/AsyncCheckpointRunnable.java | 6 -----
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 15 +-----------
.../runtime/tasks/LocalStateForwardingTest.java | 2 --
9 files changed, 24 insertions(+), 71 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
* Finalize write of channel state data for the given checkpoint id.
* Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added.
* When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
- * using {@link #getWriteResult}
+ * using {@link #getAndRemoveWriteResult}
*/
void finishInput(long checkpointId);
@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
* Finalize write of channel state data for the given checkpoint id.
* Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added.
* When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained
- * using {@link #getWriteResult}
+ * using {@link #getAndRemoveWriteResult}
*/
void finishOutput(long checkpointId);
/**
 * Aborts the checkpoint and fails pending result for this checkpoint.
+ * @param checkpointId id of the checkpoint to abort.
+ * @param cause the reason for aborting; used to fail the pending result.
+ * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards.
 */
void abort(long checkpointId, Throwable cause, boolean cleanup);
/**
- * Must be called after {@link #start(long, CheckpointOptions)}.
+ * Must be called once, after {@link #start(long, CheckpointOptions)}.
+ * @throws IllegalArgumentException if the passed checkpointId is not known.
*/
- ChannelStateWriteResult getWriteResult(long checkpointId);
-
- /**
- * Cleans up the internal state for the given checkpoint.
- */
- void stop(long checkpointId);
+ ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException;
ChannelStateWriter NO_OP = new NoOpChannelStateWriter();
@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
}
@Override
- public ChannelStateWriteResult getWriteResult(long checkpointId) {
+ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
return ChannelStateWriteResult.EMPTY;
}
@Override
public void close() {
}
-
- @Override
- public void stop(long checkpointId) {
- }
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter {
}
@Override
- public ChannelStateWriteResult getWriteResult(long checkpointId) {
+ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId);
- ChannelStateWriteResult result = results.get(checkpointId);
+ ChannelStateWriteResult result = results.remove(checkpointId);
Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId);
return result;
}
- @Override
- public void stop(long checkpointId) {
- LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
- results.remove(checkpointId);
- }
-
public void open() {
executor.start();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 8c7d7f2..9d7a7ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -38,7 +38,6 @@ import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamF
import static org.apache.flink.util.CloseableIterator.ofElements;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
/**
@@ -69,9 +68,7 @@ public class ChannelStateWriterImplTest {
ChannelStateWriteResult result;
try (ChannelStateWriterImpl writer = openWriter()) {
callStart(writer);
- result = writer.getWriteResult(CHECKPOINT_ID);
- ChannelStateWriteResult result2 = writer.getWriteResult(CHECKPOINT_ID);
- assertSame(result, result2);
+ result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
assertFalse(result.resultSubpartitionStateHandles.isDone());
assertFalse(result.inputChannelStateHandles.isDone());
}
@@ -79,22 +76,12 @@ public class ChannelStateWriterImplTest {
assertTrue(result.resultSubpartitionStateHandles.isDone());
}
- @Test(expected = IllegalArgumentException.class)
- public void testResultCleanup() throws IOException {
- try (ChannelStateWriterImpl writer = openWriter()) {
- callStart(writer);
- writer.getWriteResult(CHECKPOINT_ID);
- writer.stop(CHECKPOINT_ID);
- writer.getWriteResult(CHECKPOINT_ID);
- }
- }
-
@Test
public void testAbort() throws Exception {
NetworkBuffer buffer = getBuffer();
runWithSyncWorker((writer, worker) -> {
callStart(writer);
- ChannelStateWriteResult result = writer.getWriteResult(CHECKPOINT_ID);
+ ChannelStateWriteResult result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
callAddInputData(writer, buffer);
callAbort(writer);
worker.processAllRequests();
@@ -108,9 +95,18 @@ public class ChannelStateWriterImplTest {
NetworkBuffer buffer = getBuffer();
runWithSyncWorker((writer, worker) -> {
callStart(writer);
+ writer.abort(CHECKPOINT_ID, new TestException(), true);
+ writer.getAndRemoveWriteResult(CHECKPOINT_ID);
+ });
+ }
+
+ @Test
+ public void testAbortDoesNotClearsResults() throws Exception {
+ runWithSyncWorker((writer, worker) -> {
+ callStart(writer);
callAbort(writer);
worker.processAllRequests();
- writer.getWriteResult(CHECKPOINT_ID);
+ writer.getAndRemoveWriteResult(CHECKPOINT_ID);
});
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 0a61066d..88bd334 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -101,7 +101,7 @@ public class MockChannelStateWriter implements ChannelStateWriter {
}
@Override
- public ChannelStateWriteResult getWriteResult(long checkpointId) {
+ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
return channelStateWriteResult;
}
@@ -117,8 +117,4 @@ public class MockChannelStateWriter implements ChannelStateWriter {
channelStateWriteResult.getInputChannelStateHandles().cancel(false);
channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false);
}
-
- @Override
- public void stop(long checkpointId) {
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
index d0cfe3f..151e38e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
@@ -81,11 +81,6 @@ public class RecordingChannelStateWriter extends MockChannelStateWriter {
return lastFinishedCheckpointId;
}
- @Override
- public void stop(long checkpointId) {
- lastFinishedCheckpointId = checkpointId;
- }
-
public ListMultimap<InputChannelInfo, Buffer> getAddedInput() {
return addedInput;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index b696c77..18caac0f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -110,7 +110,7 @@ public class ChannelPersistenceITCase {
writer.addOutputData(checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, e.getValue());
}
writer.finishOutput(checkpointId);
- ChannelStateWriteResult result = writer.getWriteResult(checkpointId);
+ ChannelStateWriteResult result = writer.getAndRemoveWriteResult(checkpointId);
result.getResultSubpartitionStateHandles().join(); // prevent abnormal complete in close
return result;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index e89e962..af10411 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Map;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -60,7 +59,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
private final CheckpointMetaData checkpointMetaData;
private final CheckpointMetrics checkpointMetrics;
- private final Future<?> channelWrittenFuture;
private final long asyncStartNanos;
private final AtomicReference<AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING);
@@ -68,7 +66,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetrics checkpointMetrics,
- Future<?> channelWrittenFuture,
long asyncStartNanos,
String taskName,
Consumer<AsyncCheckpointRunnable> register,
@@ -79,7 +76,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress);
this.checkpointMetaData = checkNotNull(checkpointMetaData);
this.checkpointMetrics = checkNotNull(checkpointMetrics);
- this.channelWrittenFuture = checkNotNull(channelWrittenFuture);
this.asyncStartNanos = asyncStartNanos;
this.taskName = checkNotNull(taskName);
this.registerConsumer = register;
@@ -120,8 +116,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
- channelWrittenFuture.get();
-
if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
reportCompletedSnapshotStates(
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d14594f..adfaa44 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
-import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -65,7 +64,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -432,22 +430,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
}
private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) {
- final Future<?> channelWrittenFuture;
- if (includeChannelState(options)) {
- ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId());
- channelWrittenFuture = CompletableFuture.allOf(
- writeResult.getInputChannelStateHandles(),
- writeResult.getResultSubpartitionStateHandles())
- .whenComplete((dummy, ex) -> channelStateWriter.stop(metadata.getCheckpointId()));
- } else {
- channelWrittenFuture = FutureUtils.completedVoidFuture();
- }
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
executorService.execute(new AsyncCheckpointRunnable(
snapshotFutures,
metadata,
metrics,
- channelWrittenFuture,
System.nanoTime(),
taskName,
registerConsumer(),
@@ -490,7 +477,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
long started = System.nanoTime();
ChannelStateWriteResult channelStateWriteResult = includeChannelState(checkpointOptions) ?
- channelStateWriter.getWriteResult(checkpointId) :
+ channelStateWriter.getAndRemoveWriteResult(checkpointId) :
ChannelStateWriteResult.EMPTY;
CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 3e0703f..2eabca4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -59,7 +59,6 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
@@ -115,7 +114,6 @@ public class LocalStateForwardingTest extends TestLogger {
snapshots,
checkpointMetaData,
checkpointMetrics,
- CompletableFuture.completedFuture(null),
0L,
testStreamTask.getName(),
asyncCheckpointRunnable -> {},