You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/01/22 11:17:40 UTC

[flink] branch release-1.14 updated: [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 11a406e  [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
11a406e is described below

commit 11a406e67057ca9260c16c08054c209e3452a291
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jan 19 15:22:51 2022 +0100

    [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
    
    The method is called from the shutdown hook and must be thread-safe.
---
 .../TaskExecutorStateChangelogStoragesManager.java | 100 ++++++++++++---------
 1 file changed, 57 insertions(+), 43 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 1af6e35..934eff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -29,16 +29,16 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-/**
- * This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). No
- * thread-safe.
- */
+/** This class holds all the {@link StateChangelogStorage} objects for a task executor (manager). */
+@ThreadSafe
 public class TaskExecutorStateChangelogStoragesManager {
 
     /** Logger for this class. */
@@ -50,10 +50,14 @@ public class TaskExecutorStateChangelogStoragesManager {
      * that own the instance of this. Maps from job id to all the subtask's state changelog
      * storages. Value type Optional is for containing the null value.
      */
+    @GuardedBy("lock")
     private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId;
 
+    @GuardedBy("lock")
     private boolean closed;
 
+    private final Object lock = new Object();
+
     /** shutdown hook for this manager. */
     private final Thread shutdownHook;
 
@@ -69,63 +73,73 @@ public class TaskExecutorStateChangelogStoragesManager {
     @Nullable
     public StateChangelogStorage<?> stateChangelogStorageForJob(
             @Nonnull JobID jobId, Configuration configuration) throws IOException {
-        if (closed) {
-            throw new IllegalStateException(
-                    "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
-                            + "register a new StateChangelogStorage.");
-        }
-
-        Optional<StateChangelogStorage<?>> stateChangelogStorage =
-                changelogStoragesByJobId.get(jobId);
-
-        if (stateChangelogStorage == null) {
-            StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
-            stateChangelogStorage = Optional.ofNullable(loaded);
-            changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+        synchronized (lock) {
+            if (closed) {
+                throw new IllegalStateException(
+                        "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
+                                + "register a new StateChangelogStorage.");
+            }
 
-            if (loaded != null) {
-                LOG.debug("Registered new state changelog storage for job {} : {}.", jobId, loaded);
+            Optional<StateChangelogStorage<?>> stateChangelogStorage =
+                    changelogStoragesByJobId.get(jobId);
+
+            if (stateChangelogStorage == null) {
+                StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
+                stateChangelogStorage = Optional.ofNullable(loaded);
+                changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+
+                if (loaded != null) {
+                    LOG.debug(
+                            "Registered new state changelog storage for job {} : {}.",
+                            jobId,
+                            loaded);
+                } else {
+                    LOG.info(
+                            "Try to registered new state changelog storage for job {},"
+                                    + " but result is null.",
+                            jobId);
+                }
+            } else if (stateChangelogStorage.isPresent()) {
+                LOG.debug(
+                        "Found existing state changelog storage for job {}: {}.",
+                        jobId,
+                        stateChangelogStorage.get());
             } else {
-                LOG.info(
-                        "Try to registered new state changelog storage for job {},"
-                                + " but result is null.",
+                LOG.debug(
+                        "Found a previously loaded NULL state changelog storage for job {}.",
                         jobId);
             }
-        } else if (stateChangelogStorage.isPresent()) {
-            LOG.debug(
-                    "Found existing state changelog storage for job {}: {}.",
-                    jobId,
-                    stateChangelogStorage.get());
-        } else {
-            LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobId);
-        }
 
-        return stateChangelogStorage.orElse(null);
+            return stateChangelogStorage.orElse(null);
+        }
     }
 
     public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
         LOG.debug("Releasing state changelog storage under job id {}.", jobId);
-        if (closed) {
-            return;
+        Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            cleanupChangelogStorage = changelogStoragesByJobId.remove(jobId);
         }
 
-        Optional<StateChangelogStorage<?>> cleanupChangelogStorage =
-                changelogStoragesByJobId.remove(jobId);
-
         if (cleanupChangelogStorage != null) {
             cleanupChangelogStorage.ifPresent(this::doRelease);
         }
     }
 
     public void shutdown() {
-        if (closed) {
-            return;
-        }
-        closed = true;
+        HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            closed = true;
 
-        HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease =
-                new HashMap<>(changelogStoragesByJobId);
-        changelogStoragesByJobId.clear();
+            toRelease = new HashMap<>(changelogStoragesByJobId);
+            changelogStoragesByJobId.clear();
+        }
 
         ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);