You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/09/22 23:14:33 UTC

[flink] branch release-1.16 updated: [FLINK-29397][runtime] Check if changelog provider is null

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new a8979b29e08 [FLINK-29397][runtime] Check if changelog provider is null
a8979b29e08 is described below

commit a8979b29e084641a5160768f48400682a5d79bbb
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Sep 21 18:10:08 2022 +0200

    [FLINK-29397][runtime] Check if changelog provider is null
---
 .../flink/runtime/state/TestTaskStateManager.java  |  4 +-
 .../runtime/state/TestTaskStateManagerBuilder.java |  7 ++-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  5 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    | 57 ++++++++++++++++++++++
 4 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index a98d0fd7b06..6e25d4336e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -90,7 +90,7 @@ public class TestTaskStateManager implements TaskStateManager {
             ExecutionAttemptID executionAttemptID,
             CheckpointResponder checkpointResponder,
             LocalRecoveryConfig localRecoveryConfig,
-            StateChangelogStorage<?> changelogStorage,
+            @Nullable StateChangelogStorage<?> changelogStorage,
             Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId,
             long reportedCheckpointId,
             OneShotLatch waitForReportLatch) {
@@ -98,7 +98,7 @@ public class TestTaskStateManager implements TaskStateManager {
         this.executionAttemptID = checkNotNull(executionAttemptID);
         this.checkpointResponder = checkNotNull(checkpointResponder);
         this.localRecoveryDirectoryProvider = checkNotNull(localRecoveryConfig);
-        this.stateChangelogStorage = checkNotNull(changelogStorage);
+        this.stateChangelogStorage = changelogStorage;
         this.jobManagerTaskStateSnapshotsByCheckpointId =
                 checkNotNull(jobManagerTaskStateSnapshotsByCheckpointId);
         this.taskManagerTaskStateSnapshotsByCheckpointId = new HashMap<>();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
index 57e05371a09..8bfb5d1abb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManagerBuilder.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,7 +42,10 @@ public class TestTaskStateManagerBuilder {
     private ExecutionAttemptID executionAttemptID = createExecutionAttemptId();
     private CheckpointResponder checkpointResponder = new TestCheckpointResponder();
     private LocalRecoveryConfig localRecoveryConfig = TestLocalRecoveryConfig.disabled();
+
+    @Nullable
     private StateChangelogStorage<?> stateChangelogStorage = new InMemoryStateChangelogStorage();
+
     private final Map<Long, TaskStateSnapshot> jobManagerTaskStateSnapshotsByCheckpointId =
             new HashMap<>();
     private long reportedCheckpointId = -1L;
@@ -75,7 +80,7 @@ public class TestTaskStateManagerBuilder {
                 this.stateChangelogStorage == null
                         || this.stateChangelogStorage instanceof InMemoryStateChangelogStorage,
                 "StateChangelogStorage was already initialized to " + this.stateChangelogStorage);
-        this.stateChangelogStorage = checkNotNull(stateChangelogStorage);
+        this.stateChangelogStorage = stateChangelogStorage;
         return this;
     }
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6e1080fba55..ece19531c47 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -577,11 +577,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
         } else if (!inputProcessor.isAvailable()) {
             timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
             resumeFuture = inputProcessor.getAvailableFuture();
-        } else {
+        } else if (changelogWriterAvailabilityProvider != null) {
             // currently, waiting for changelog availability is reported as busy
             // todo: add new metric (FLINK-24402)
             timer = null;
             resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
+        } else {
+            // data availability has changed in the meantime; retry immediately
+            return;
         }
         assertNoException(
                 resumeFuture.thenRun(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 8777de72fcc..9f5bbf227e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.StopMode;
@@ -90,6 +91,7 @@ import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
@@ -1193,6 +1195,29 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    @Test
+    public void testProcessWithRaceInDataAvailability() throws Exception {
+        try (final MockEnvironment environment =
+                MockEnvironment.builder()
+                        .setTaskStateManager(
+                                TestTaskStateManager.builder()
+                                        // replicate NPE of FLINK-29397
+                                        .setStateChangelogStorage(null)
+                                        .build())
+                        .build()) {
+            environment.addOutputs(
+                    Collections.singletonList(new AvailabilityTestResultPartitionWriter(true)));
+
+            final StreamInputProcessor inputProcessor = new RacyTestInputProcessor();
+            final StreamTask<?, ?> task =
+                    new MockStreamTaskBuilder(environment)
+                            .setStreamInputProcessor(inputProcessor)
+                            .build();
+
+            task.invoke();
+        }
+    }
+
     /**
      * In this weird construct, we are:
      *
@@ -1954,6 +1979,38 @@ public class StreamTaskTest extends TestLogger {
         }
     }
 
+    /**
+     * A stream input processor implementation that replicates a race condition where processInput
+     * reports that nothing is available, but isAvailable (called later) returns true.
+     */
+    private static class RacyTestInputProcessor implements StreamInputProcessor {
+
+        private boolean firstCall = true;
+
+        @Override
+        public DataInputStatus processInput() {
+            try {
+                return firstCall ? DataInputStatus.NOTHING_AVAILABLE : DataInputStatus.END_OF_INPUT;
+            } finally {
+                firstCall = false;
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> prepareSnapshot(
+                ChannelStateWriter channelStateWriter, final long checkpointId) {
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public void close() throws IOException {}
+
+        @Override
+        public CompletableFuture<?> getAvailableFuture() {
+            return AvailabilityProvider.AVAILABLE;
+        }
+    }
+
     public static Task createTask(
             Class<? extends TaskInvokable> invokable,
             ShuffleEnvironment shuffleEnvironment,