Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/13 13:19:09 UTC

[GitHub] [flink] zentol commented on a change in pull request #18083: [FLINK-25402] Introduce working directory for Flink processes

zentol commented on a change in pull request #18083:
URL: https://github.com/apache/flink/pull/18083#discussion_r783911518



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
##########
@@ -547,6 +547,17 @@
                                             code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
+    /**
+     * The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+    public static final ConfigOption<String> JOB_MANAGER_RESOURCE_ID =
+            key("jobmanager.resource-id")

Review comment:
       I'm wondering if we should expose the concept of resource IDs here. Maybe we could expose something more generic, like a "process id" or a plain "jobmanager id". Ideally, whatever we expose here eventually also finds its way into the metric system.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
                                     UncaughtExceptionHandleMode.LOG.name(),
                                     UncaughtExceptionHandleMode.FAIL.name()));
 
+    @Documentation.OverrideDefault("io.tmp.dirs")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for Flink processes. "
+                                                    + "The working directory can be used to store information that can be used upon process recovery. "
+                                                    + "If the not configured, then it will default to the first temporary directory defined via %s.",

Review comment:
       ```suggestion
                                                       + "If not configured, then it will default to the first temporary directory defined via %s.",
   ```
   Same issue applies to other options.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
                                     UncaughtExceptionHandleMode.LOG.name(),
                                     UncaughtExceptionHandleMode.FAIL.name()));
 
+    @Documentation.OverrideDefault("io.tmp.dirs")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for Flink processes. "
+                                                    + "The working directory can be used to store information that can be used upon process recovery. "
+                                                    + "If the not configured, then it will default to the first temporary directory defined via %s.",
+                                            TextElement.code(CoreOptions.TMP_DIRS.key()))
+                                    .build());
+
+    @Documentation.OverrideDefault("process.working-dir")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> JOB_MANAGER_PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.jobmanager.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for the JobManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",

Review comment:
       ```suggestion
                                               "Working directory for Flink JobManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
   ```
   Just for consistency with the generic option.

##########
File path: docs/layouts/shortcodes/generated/all_taskmanager_section.html
##########
@@ -8,12 +8,6 @@
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>jobmanager.resource-id</h5></td>

Review comment:
       nit: wrong commit

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
##########
@@ -73,6 +74,25 @@
         return splitPaths(configuration.getString(CoreOptions.TMP_DIRS));
     }
 
+    /**
+     * Extracts the first temporary directory from the given configuration.
+     *
+     * @param configuration to extract the temp directory from
+     * @return the first temporary directory
+     */
+    @Nonnull
+    public static File getFirstTempDirectory(Configuration configuration) {
+        final String[] tmpDirectories = splitPaths(configuration.getString(CoreOptions.TMP_DIRS));

Review comment:
       ```suggestion
           final String[] tmpDirectories = parseTempDirectories(configuration);
   ```

##########
File path: docs/layouts/shortcodes/generated/all_taskmanager_section.html
##########
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>jobmanager.resource-id</h5></td>

Review comment:
       Why does it show up in this file?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/WorkingDirectory.java
##########
@@ -29,20 +29,32 @@
  */
 public class WorkingDirectory {
     private final File root;
+    private final File tmp;
 
     private WorkingDirectory(File root) throws IOException {
         this.root = root;
+        createDirectory(root);
 
-        if (!root.mkdirs() && !root.exists()) {
+        this.tmp = new File(root, "tmp");
+        createDirectory(tmp);
+        FileUtils.cleanDirectory(tmp);

Review comment:
       Are there any safeguards that the WorkingDirectory is not shared across (active) processes?
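
   One possible safeguard, purely as a sketch and not something this PR contains: take an exclusive file lock on a marker file inside the working directory and refuse to start if it is already held. The class name `WorkingDirectoryLock` and the `.lock` file name are hypothetical. Note that file locks are advisory on some platforms and may be unreliable on network file systems, so this is an assumption-laden illustration rather than a guarantee.

```java
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Optional;

/** Hypothetical helper (not part of the PR): guards a directory with a file lock. */
public final class WorkingDirectoryLock implements AutoCloseable {

    // Assumed marker file name; any name not used for real data would do.
    private static final String LOCK_FILE_NAME = ".lock";

    private final FileChannel channel;
    private final FileLock lock;

    private WorkingDirectoryLock(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    /** Returns a lock handle, or empty if the directory is already locked by another owner. */
    public static Optional<WorkingDirectoryLock> tryAcquire(File root) throws IOException {
        Files.createDirectories(root.toPath());
        FileChannel channel =
                FileChannel.open(
                        new File(root, LOCK_FILE_NAME).toPath(),
                        StandardOpenOption.CREATE,
                        StandardOpenOption.WRITE);
        FileLock lock = null;
        try {
            // tryLock() returns null if another process holds the lock.
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // The lock is already held within this JVM; treat the directory as in use.
        } finally {
            if (lock == null) {
                channel.close();
            }
        }
        if (lock == null) {
            return Optional.empty();
        }
        return Optional.of(new WorkingDirectoryLock(channel, lock));
    }

    /** Releases the lock so that a later process can claim the directory. */
    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```

   With something like this, the `WorkingDirectory` constructor could acquire the lock before calling `FileUtils.cleanDirectory(tmp)`, so a second process pointed at the same root would fail fast instead of wiping files that are still in use.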

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -558,9 +587,29 @@ private Configuration generateClusterConfiguration(Configuration configuration)
      * @throws IOException if the temporary directories could not be cleaned up
      */
     protected void cleanupDirectories() throws IOException {
+        IOException ioException = null;
+
         final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);

Review comment:
       The fact that this directory is also not being cleaned up seems incorrect; shouldn't that behavior be limited to the working directory? Either this directory should be part of the working directory, or it should always be cleaned up.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/WorkingDirectory.java
##########
@@ -38,6 +39,9 @@ private WorkingDirectory(File root) throws IOException {
         this.tmp = new File(root, "tmp");
         createDirectory(tmp);
         FileUtils.cleanDirectory(tmp);
+
+        localState = new File(root, "localState");

Review comment:
       Wondering if this should be somehow connected to `TaskManagerServices#LOCAL_STATE_SUB_DIRECTORY_ROOT`.

##########
File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
##########
@@ -40,7 +43,14 @@
                     .noDefaultValue()
                     .withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
                     .withDescription(
-                            "The local directory (on the TaskManager) where RocksDB puts its files.");
+                            Description.builder()
+                                    .text(
+                                            "The local directory (on the TaskManager) where RocksDB puts its files. Per default, it will be <WORKING_DIR>/tmp. See %s for more details.",

Review comment:
       If `<workingDir>/tmp` is always cleaned when the process starts, then what do we gain by putting RocksDB files in there? I don't quite understand the justification in the FLIP.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
##########
@@ -164,9 +164,14 @@
             ConfigOptions.key("taskmanager.state.local.root-dirs")
                     .noDefaultValue()
                     .withDescription(
-                            "The config parameter defining the root directories for storing file-based state for local "
-                                    + "recovery. Local recovery currently only covers keyed state backends. Currently, MemoryStateBackend does "
-                                    + "not support local recovery and ignore this option");
+                            Description.builder()

Review comment:
       Should this option be deprecated?

##########
File path: docs/layouts/shortcodes/generated/all_jobmanager_section.html
##########
@@ -56,6 +56,12 @@
             <td>Integer</td>
             <td>The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.</td>
         </tr>
+        <tr>
+            <td><h5>jobmanager.resource-id</h5></td>

Review comment:
       nit: wrong commit

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
                                     UncaughtExceptionHandleMode.LOG.name(),
                                     UncaughtExceptionHandleMode.FAIL.name()));
 
+    @Documentation.OverrideDefault("io.tmp.dirs")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for Flink processes. "
+                                                    + "The working directory can be used to store information that can be used upon process recovery. "
+                                                    + "If the not configured, then it will default to the first temporary directory defined via %s.",
+                                            TextElement.code(CoreOptions.TMP_DIRS.key()))
+                                    .build());
+
+    @Documentation.OverrideDefault("process.working-dir")

Review comment:
       Why is this not a fallbackKey?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
##########
@@ -507,9 +530,15 @@ private Configuration generateClusterConfiguration(Configuration configuration)
             final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture =
                     FutureUtils.runAfterwards(serviceShutdownFuture, rpcSystem::close);
 
-            final CompletableFuture<Void> cleanupDirectoriesFuture =
-                    FutureUtils.runAfterwards(
-                            rpcSystemClassLoaderCloseFuture, this::cleanupDirectories);
+            final CompletableFuture<Void> cleanupDirectoriesFuture;
+
+            if (shutdownBehaviour == ShutdownBehaviour.STOP_APPLICATION) {

Review comment:
       This needs a comment explaining why we only do it for STOP_APPLICATION. It's not obvious what the difference between application and process is in this context.

##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -157,6 +158,47 @@
                                     UncaughtExceptionHandleMode.LOG.name(),
                                     UncaughtExceptionHandleMode.FAIL.name()));
 
+    @Documentation.OverrideDefault("io.tmp.dirs")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for Flink processes. "
+                                                    + "The working directory can be used to store information that can be used upon process recovery. "
+                                                    + "If the not configured, then it will default to the first temporary directory defined via %s.",
+                                            TextElement.code(CoreOptions.TMP_DIRS.key()))
+                                    .build());
+
+    @Documentation.OverrideDefault("process.working-dir")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> JOB_MANAGER_PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.jobmanager.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for the JobManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",
+                                            TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
+                                    .build());
+
+    @Documentation.OverrideDefault("process.working-dir")
+    @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
+    public static final ConfigOption<String> TASK_MANAGER_PROCESS_WORKING_DIR_BASE =
+            ConfigOptions.key("process.taskmanager.working-dir")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Working directory for the TaskManager process. The working directory can be used to store information that can be used upon process recovery. If the not configured, then it will default to %s.",

Review comment:
       ```suggestion
                                               "Working directory for Flink TaskManager processes. The working directory can be used to store information that can be used upon process recovery. If not configured, then it will default to %s.",
   ```
   Just for consistency with the generic option.

##########
File path: flink-tests/src/test/java/org/apache/flink/test/recovery/ClusterEntrypointITCase.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.DispatcherProcess;
+import org.apache.flink.test.util.TestProcessBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Duration;
+
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for the {@link org.apache.flink.runtime.entrypoint.ClusterEntrypoint}. */
+public class ClusterEntrypointITCase extends TestLogger {
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Test
+    public void testWorkingDirectoryIsNotDeletedInCaseOfProcessFailure() throws Exception {
+        final File workingDirBase = TEMPORARY_FOLDER.newFolder();
+        final ResourceID resourceId = ResourceID.generate();
+
+        final Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDirBase.getAbsolutePath());
+        configuration.set(JobManagerOptions.JOB_MANAGER_RESOURCE_ID, resourceId.toString());
+
+        final File workingDirectory =
+                ClusterEntrypointUtils.generateJobManagerWorkingDirectoryFile(
+                        configuration, resourceId);
+
+        final TestProcessBuilder.TestProcess taskManagerProcess =

Review comment:
       This is not a TaskManager process; the variable name `taskManagerProcess` is misleading.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypointUtils.java
##########
@@ -128,4 +134,106 @@ public static void configureUncaughtExceptionHandler(Configuration config) {
                 new ClusterUncaughtExceptionHandler(
                         config.get(ClusterOptions.UNCAUGHT_EXCEPTION_HANDLING)));
     }
+
+    /**
+     * Creates the working directory for the TaskManager process. This method ensures that the
+     * working directory exists.
+     *
+     * @param configuration to extract the required settings from
+     * @param resourceId identifying the TaskManager process
+     * @return working directory
+     * @throws IOException if the working directory could not be created
+     */
+    public static WorkingDirectory createTaskManagerWorkingDirectory(
+            Configuration configuration, ResourceID resourceId) throws IOException {
+        return WorkingDirectory.create(
+                generateTaskManagerWorkingDirectoryFile(configuration, resourceId));
+    }
+
+    /**
+     * Generates the working directory {@link File} for the TaskManager process. This method does
+     * not ensure that the working directory exists.
+     *
+     * @param configuration to extract the required settings from
+     * @param resourceId identifying the TaskManager process
+     * @return working directory file
+     */
+    @VisibleForTesting
+    public static File generateTaskManagerWorkingDirectoryFile(
+            Configuration configuration, ResourceID resourceId) {
+        return generateWorkingDirectoryFile(
+                configuration,
+                Optional.of(ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE),
+                "tm_" + resourceId);
+    }
+
+    /**
+     * Generates the working directory {@link File} for the JobManager process. This method does not
+     * ensure that the working directory exists.
+     *
+     * @param configuration to extract the required settings from
+     * @param resourceId identifying the JobManager process
+     * @return working directory file
+     */
+    @VisibleForTesting
+    public static File generateJobManagerWorkingDirectoryFile(
+            Configuration configuration, ResourceID resourceId) {
+        return generateWorkingDirectoryFile(
+                configuration,
+                Optional.of(ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE),
+                "jm_" + resourceId);
+    }
+
+    /**
+     * Generate the working directory from the given configuration. If a preceding option is
+     * specified, then this config option will be read first for the working directory. Next {@link
+     * ClusterOptions#PROCESS_WORKING_DIR_BASE} will be tried. At last, {@link CoreOptions#TMP_DIRS}
+     * will be used to extract the working directory base from.
+     *
+     * @param configuration to extract the working directory from
+     * @param precedingOption optional preceding option
+     * @param workingDirectoryName name of the working directory to create
+     * @return working directory
+     */
+    public static File generateWorkingDirectoryFile(
+            Configuration configuration,
+            Optional<ConfigOption<String>> precedingOption,
+            String workingDirectoryName) {
+        final Optional<String> optionalWorkingDirectory =
+                getOptionalWorkingDirectory(configuration, precedingOption);
+
+        final File workingDirectoryBase =
+                optionalWorkingDirectory
+                        .map(File::new)
+                        .orElseGet(() -> ConfigurationUtils.getFirstTempDirectory(configuration));
+
+        return new File(workingDirectoryBase, workingDirectoryName);
+    }
+
+    private static Optional<String> getOptionalWorkingDirectory(
+            Configuration configuration, Optional<ConfigOption<String>> precedingOption) {

Review comment:
       This seems overly complicated. If the JM/TM options had a fallback key for PROCESS_WORKING_DIR_BASE this could be simpler.
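
   To illustrate the lookup order I have in mind, here is a minimal sketch that uses a plain `Map` as a stand-in for Flink's `Configuration`. The class and method names are hypothetical. In Flink itself the same order could presumably be declared on the option via `withFallbackKeys(...)`, which would make the `Optional<ConfigOption<String>>` parameter and `getOptionalWorkingDirectory` unnecessary.

```java
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical stand-in for the lookup; models only "first configured key wins". */
public final class WorkingDirLookupSketch {

    private WorkingDirLookupSketch() {}

    /**
     * Returns the value of the first key in priority order that is set. If none is set,
     * falls back to the first entry of io.tmp.dirs (or the JVM temp dir if that is unset).
     */
    public static String resolveBase(Map<String, String> conf, List<String> keysInPriorityOrder) {
        for (String key : keysInPriorityOrder) {
            String value = conf.get(key);
            if (value != null) {
                return value;
            }
        }
        String tmpDirs = conf.getOrDefault("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
        // Temp directories are separated by ',' or the platform path separator.
        return tmpDirs.split(",|" + Pattern.quote(File.pathSeparator))[0];
    }
}
```

   For the JobManager, the key list would be `process.jobmanager.working-dir` followed by `process.working-dir`; the TaskManager variant only swaps the first key.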

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/WorkingDirectoryTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;

Review comment:
       New tests should use JUnit 5.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
##########
@@ -95,12 +94,14 @@
     @Test
     public void testExitJvmOnOutOfMemory() throws Exception {
         // this test works only on linux
-        assumeTrue(OperatingSystem.isLinux());
+        //        assumeTrue(OperatingSystem.isLinux());

Review comment:
       Maybe replace this with `assumeFalse(OperatingSystem.isWindows());`



