You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/22 23:14:33 UTC
[flink] branch release-1.16 updated: [FLINK-29397][runtime] Check if changelog provider is null
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new a8979b29e08 [FLINK-29397][runtime] Check if changelog provider is null
a8979b29e08 is described below
commit a8979b29e084641a5160768f48400682a5d79bbb
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 21 18:10:08 2022 +0200
[FLINK-29397][runtime] Check if changelog provider is null
---
.../flink/runtime/state/TestTaskStateManager.java | 4 +-
.../runtime/state/TestTaskStateManagerBuilder.java | 7 ++-
.../flink/streaming/runtime/tasks/StreamTask.java | 5 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 57 ++++++++++++++++++++++
4 files changed, 69 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index a98d0fd7b06..6e25d4336e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -90,7 +90,7 @@ public class TestTaskStateManager implements TaskStateManager {
ExecutionAttemptID executionAttemptID,
CheckpointResponder checkpointResponder,
LocalRecoveryConfig localRecoveryConfig,
- StateChangelogStorage<?> changelogStorage,
+ @Nullable StateChangelogStorage<?> changelogStorage,
Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId,
long reportedCheckpointId,
OneShotLatch waitForReportLatch) {
@@ -98,7 +98,7 @@ public class TestTaskStateManager implements TaskStateManager {
this.executionAttemptID = checkNotNull(executionAttemptID);
this.checkpointResponder = checkNotNull(checkpointResponder);
this.localRecoveryDirectoryProvider = checkNotNull(localRecoveryConfig);
- this.stateChangelogStorage = checkNotNull(changelogStorage);
+ this.stateChangelogStorage = changelogStorage;
this.jobManagerTaskStateSnapshotsByCheckpointId =
checkNotNull(jobManagerTaskStateSnapshotsByCheckpointId);
this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
index 57e05371a09..8bfb5d1abb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
+
import java.util.HashMap;
import java.util.Map;
@@ -40,7 +42,10 @@ public class TestTaskStateManagerBuilder {
private ExecutionAttemptID executionAttemptID = createExecutionAttemptId();
private CheckpointResponder checkpointResponder = new TestCheckpointResponder();
private LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
+
+ @Nullable
private StateChangelogStorage<?> stateChangelogStorage = new InMemoryStateChangelogStorage();
+
private final Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId =
new HashMap<>();
private long reportedCheckpointId = -1L;
@@ -75,7 +80,7 @@ public class TestTaskStateManagerBuilder {
this.stateChangelogStorage == null
|| this.stateChangelogStorage instanceof InMemoryStateChangelogStorage,
"StateChangelogStorage was already initialized to " + this.stateChangelogStorage);
- this.stateChangelogStorage = checkNotNull(stateChangelogStorage);
+ this.stateChangelogStorage = stateChangelogStorage;
return this;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6e1080fba55..ece19531c47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -577,11 +577,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
} else if (!inputProcessor.isAvailable()) {
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
- } else {
+ } else if (changelogWriterAvailabilityProvider != null) {
// currently, waiting for changelog availability is reported as busy
// todo: add new metric (FLINK-24402)
timer = null;
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
+ } else {
+ // data availability has changed in the meantime; retry immediately
+ return;
}
assertNoException(
resumeFuture.thenRun(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 8777de72fcc..9f5bbf227e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.api.StopMode;
@@ -90,6 +91,7 @@ import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
@@ -1193,6 +1195,29 @@ public class StreamTaskTest extends TestLogger {
}
}
+ @Test
+ public void testProcessWithRaceInDataAvailability() throws Exception {
+ try (final MockEnvironment environment =
+ MockEnvironment.builder()
+ .setTaskStateManager(
+ TestTaskStateManager.builder()
+ // replicate NPE of FLINK-29397
+ .setStateChangelogStorage(null)
+ .build())
+ .build()) {
+ environment.addOutputs(
+ Collections.singletonList(new AvailabilityTestResultPartitionWriter(true)));
+
+ final StreamInputProcessor inputProcessor = new RacyTestInputProcessor();
+ final StreamTask<?, ?> task =
+ new MockStreamTaskBuilder(environment)
+ .setStreamInputProcessor(inputProcessor)
+ .build();
+
+ task.invoke();
+ }
+ }
+
/**
* In this weird construct, we are:
*
@@ -1954,6 +1979,38 @@ public class StreamTaskTest extends TestLogger {
}
}
+ /**
+ * A stream input processor implementation that replicates a race condition where processInput
+ * reports that nothing is available, but isAvailable (called later) returns true.
+ */
+ private static class RacyTestInputProcessor implements StreamInputProcessor {
+
+ private boolean firstCall = true;
+
+ @Override
+ public DataInputStatus processInput() {
+ try {
+ return firstCall ? DataInputStatus.NOTHING_AVAILABLE : DataInputStatus.END_OF_INPUT;
+ } finally {
+ firstCall = false;
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> prepareSnapshot(
+ ChannelStateWriter channelStateWriter, final long checkpointId) {
+ return FutureUtils.completedVoidFuture();
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ return AvailabilityProvider.AVAILABLE;
+ }
+ }
+
public static Task createTask(
Class<? extends TaskInvokable> invokable,
ShuffleEnvironment shuffleEnvironment,