You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/01/22 11:17:40 UTC
[flink] branch release-1.14 updated: [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 11a406e [FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
11a406e is described below
commit 11a406e67057ca9260c16c08054c209e3452a291
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Jan 19 15:22:51 2022 +0100
[FLINK-25678][runtime] Make TaskExecutorStateChangelogStoragesManager.shutdown thread-safe
The method is called from the shutdown hook and must be thread-safe.
---
.../TaskExecutorStateChangelogStoragesManager.java | 100 ++++++++++++---------
1 file changed, 57 insertions(+), 43 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 1af6e35..934eff8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -29,16 +29,16 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-/**
- * This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). No
- * thread-safe.
- */
+/** This class holds the all {@link StateChangelogStorage} objects for a task executor (manager). */
+@ThreadSafe
public class TaskExecutorStateChangelogStoragesManager {
/** Logger for this class. */
@@ -50,10 +50,14 @@ public class TaskExecutorStateChangelogStoragesManager {
* that own the instance of this. Maps from job id to all the subtask's state changelog
* storages. Value type Optional is for containing the null value.
*/
+ @GuardedBy("lock")
private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId;
+ @GuardedBy("lock")
private boolean closed;
+ private final Object lock = new Object();
+
/** shutdown hook for this manager. */
private final Thread shutdownHook;
@@ -69,63 +73,73 @@ public class TaskExecutorStateChangelogStoragesManager {
@Nullable
public StateChangelogStorage<?> stateChangelogStorageForJob(
@Nonnull JobID jobId, Configuration configuration) throws IOException {
- if (closed) {
- throw new IllegalStateException(
- "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
- + "register a new StateChangelogStorage.");
- }
-
- Optional<StateChangelogStorage<?>> stateChangelogStorage =
- changelogStoragesByJobId.get(jobId);
-
- if (stateChangelogStorage == null) {
- StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
- stateChangelogStorage = Optional.ofNullable(loaded);
- changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+ synchronized (lock) {
+ if (closed) {
+ throw new IllegalStateException(
+ "TaskExecutorStateChangelogStoragesManager is already closed and cannot "
+ + "register a new StateChangelogStorage.");
+ }
- if (loaded != null) {
- LOG.debug("Registered new state changelog storage for job {} : {}.", jobId, loaded);
+ Optional<StateChangelogStorage<?>> stateChangelogStorage =
+ changelogStoragesByJobId.get(jobId);
+
+ if (stateChangelogStorage == null) {
+ StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(configuration);
+ stateChangelogStorage = Optional.ofNullable(loaded);
+ changelogStoragesByJobId.put(jobId, stateChangelogStorage);
+
+ if (loaded != null) {
+ LOG.debug(
+ "Registered new state changelog storage for job {} : {}.",
+ jobId,
+ loaded);
+ } else {
+ LOG.info(
+ "Try to registered new state changelog storage for job {},"
+ + " but result is null.",
+ jobId);
+ }
+ } else if (stateChangelogStorage.isPresent()) {
+ LOG.debug(
+ "Found existing state changelog storage for job {}: {}.",
+ jobId,
+ stateChangelogStorage.get());
} else {
- LOG.info(
- "Try to registered new state changelog storage for job {},"
- + " but result is null.",
+ LOG.debug(
+ "Found a previously loaded NULL state changelog storage for job {}.",
jobId);
}
- } else if (stateChangelogStorage.isPresent()) {
- LOG.debug(
- "Found existing state changelog storage for job {}: {}.",
- jobId,
- stateChangelogStorage.get());
- } else {
- LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", jobId);
- }
- return stateChangelogStorage.orElse(null);
+ return stateChangelogStorage.orElse(null);
+ }
}
public void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
LOG.debug("Releasing state changelog storage under job id {}.", jobId);
- if (closed) {
- return;
+ Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+ cleanupChangelogStorage = changelogStoragesByJobId.remove(jobId);
}
- Optional<StateChangelogStorage<?>> cleanupChangelogStorage =
- changelogStoragesByJobId.remove(jobId);
-
if (cleanupChangelogStorage != null) {
cleanupChangelogStorage.ifPresent(this::doRelease);
}
}
public void shutdown() {
- if (closed) {
- return;
- }
- closed = true;
+ HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease;
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+ closed = true;
- HashMap<JobID, Optional<StateChangelogStorage<?>>> toRelease =
- new HashMap<>(changelogStoragesByJobId);
- changelogStoragesByJobId.clear();
+ toRelease = new HashMap<>(changelogStoragesByJobId);
+ changelogStoragesByJobId.clear();
+ }
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);