You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2022/02/08 14:21:41 UTC

[flink] 07/14: [FLINK-25817] Move local state directory creation into TaskManagerServicesConfiguration.fromConfiguration

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d63c48dded3478cebd68bdf281eee57bf0f28789
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Wed Dec 29 11:45:43 2021 +0100

    [FLINK-25817] Move local state directory creation into TaskManagerServicesConfiguration.fromConfiguration
    
    This commit also adds the resource id to the localState directory in order to avoid clashes with other
    processes running on the same machine.
---
 .../runtime/taskexecutor/TaskManagerServices.java  | 16 ++---------
 .../TaskManagerServicesConfiguration.java          | 32 ++++++++++++++--------
 .../TaskExecutorLocalStateStoresManagerTest.java   | 13 ++++-----
 3 files changed, 28 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ee2c7218..d6d672d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.PermanentBlobService;
@@ -67,8 +66,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 public class TaskManagerServices {
     private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);
 
-    @VisibleForTesting public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";
-
     /** TaskManager services. */
     private final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation;
 
@@ -326,20 +323,13 @@ public class TaskManagerServices {
                         unresolvedTaskManagerLocation,
                         taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
 
-        final String[] stateRootDirectoryStrings =
-                taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
-
-        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
-
-        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
-            stateRootDirectoryFiles[i] =
-                    new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
-        }
+        final File[] stateRootDirectoryStrings =
+                taskManagerServicesConfiguration.getLocalRecoveryStateDirectories();
 
         final TaskExecutorLocalStateStoresManager taskStateManager =
                 new TaskExecutorLocalStateStoresManager(
                         taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
-                        stateRootDirectoryFiles,
+                        taskManagerServicesConfiguration.getLocalRecoveryStateDirectories(),
                         ioExecutor);
 
         final TaskExecutorStateChangelogStoragesManager changelogStoragesManager =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 1f16382..6006c67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -50,6 +50,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TaskManagerServicesConfiguration {
 
+    private static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState_";
+
     private final Configuration configuration;
 
     private final ResourceID resourceID;
@@ -64,7 +66,7 @@ public class TaskManagerServicesConfiguration {
 
     private final String[] tmpDirPaths;
 
-    private final String[] localRecoveryStateRootDirectories;
+    private final File[] localRecoveryStateDirectories;
 
     private final int numberOfSlots;
 
@@ -96,7 +98,7 @@ public class TaskManagerServicesConfiguration {
             int externalDataPort,
             boolean localCommunicationOnly,
             String[] tmpDirPaths,
-            String[] localRecoveryStateRootDirectories,
+            File[] localRecoveryStateDirectories,
             boolean localRecoveryEnabled,
             @Nullable QueryableStateConfiguration queryableStateConfig,
             int numberOfSlots,
@@ -116,7 +118,7 @@ public class TaskManagerServicesConfiguration {
         this.externalDataPort = externalDataPort;
         this.localCommunicationOnly = localCommunicationOnly;
         this.tmpDirPaths = checkNotNull(tmpDirPaths);
-        this.localRecoveryStateRootDirectories = checkNotNull(localRecoveryStateRootDirectories);
+        this.localRecoveryStateDirectories = checkNotNull(localRecoveryStateDirectories);
         this.localRecoveryEnabled = checkNotNull(localRecoveryEnabled);
         this.queryableStateConfig = queryableStateConfig;
         this.numberOfSlots = checkNotNull(numberOfSlots);
@@ -170,8 +172,8 @@ public class TaskManagerServicesConfiguration {
         return tmpDirPaths;
     }
 
-    String[] getLocalRecoveryStateRootDirectories() {
-        return localRecoveryStateRootDirectories;
+    File[] getLocalRecoveryStateDirectories() {
+        return localRecoveryStateDirectories;
     }
 
     boolean isLocalRecoveryEnabled() {
@@ -253,12 +255,18 @@ public class TaskManagerServicesConfiguration {
             TaskExecutorResourceSpec taskExecutorResourceSpec,
             WorkingDirectory workingDirectory)
             throws Exception {
-        String[] localStateRootDir = ConfigurationUtils.parseLocalStateDirectories(configuration);
-
-        if (localStateRootDir.length == 0) {
-            final File localStateDir = workingDirectory.getLocalStateDirectory();
-
-            localStateRootDir = new String[] {localStateDir.getAbsolutePath()};
+        String[] localStateRootDirs = ConfigurationUtils.parseLocalStateDirectories(configuration);
+        final File[] localStateDirs;
+
+        if (localStateRootDirs.length == 0) {
+            localStateDirs = new File[] {workingDirectory.getLocalStateDirectory()};
+        } else {
+            localStateDirs = new File[localStateRootDirs.length];
+            final String localStateDirectoryName = LOCAL_STATE_SUB_DIRECTORY_ROOT + resourceID;
+
+            for (int i = 0; i < localStateRootDirs.length; i++) {
+                localStateDirs[i] = new File(localStateRootDirs[i], localStateDirectoryName);
+            }
         }
 
         boolean localRecoveryMode = configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
@@ -298,7 +306,7 @@ public class TaskManagerServicesConfiguration {
                 externalDataPort,
                 localCommunicationOnly,
                 tmpDirs,
-                localStateRootDir,
+                localStateDirs,
                 localRecoveryMode,
                 queryableStateConfig,
                 ConfigurationParserUtils.getSlot(configuration),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 89d6b9c..5d2c82d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -42,6 +42,9 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.net.InetAddress;
+import java.nio.file.Paths;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link TaskExecutorLocalStateStoresManager}. */
 public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
@@ -89,15 +92,12 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
             String[] split = rootDirString.split(",");
             File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
             for (int i = 0; i < split.length; ++i) {
-                Assert.assertEquals(
-                        new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-                        rootDirectories[i]);
+                assertThat(rootDirectories[i].toPath()).startsWith(Paths.get(split[i]));
             }
 
             // verify local recovery mode
             Assert.assertTrue(taskStateManager.isLocalRecoveryEnabled());
 
-            Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
             for (File rootDirectory : rootDirectories) {
                 FileUtils.deleteFileOrDirectory(rootDirectory);
             }
@@ -131,10 +131,7 @@ public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
 
             for (int i = 0; i < localStateRootDirectories.length; ++i) {
                 Assert.assertEquals(
-                        new File(
-                                workingDirectory.getLocalStateDirectory(),
-                                TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
-                        localStateRootDirectories[i]);
+                        workingDirectory.getLocalStateDirectory(), localStateRootDirectories[i]);
             }
 
             Assert.assertFalse(taskStateManager.isLocalRecoveryEnabled());