You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/31 11:18:49 UTC

[flink] 05/05: [FLINK-30823][runtime] Enable speculative execution for FileSystemOutputFormat

This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1420ee7ed69a3ce72eab8027570cc3af9fa828a3
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 30 22:55:58 2023 +0800

    [FLINK-30823][runtime] Enable speculative execution for FileSystemOutputFormat
    
    This closes #21796.
---
 .../connector/file/table/FileSystemCommitter.java  | 14 +++++-
 .../file/table/FileSystemOutputFormat.java         | 37 +++++++++++---
 .../file/table/PartitionTempFileManager.java       | 53 ++++++++++----------
 .../file/table/FileSystemCommitterTest.java        | 30 +++++------
 .../file/table/FileSystemOutputFormatTest.java     | 26 ++++++++--
 .../file/table/PartitionTempFileManagerTest.java   | 58 ++++++++++++++++++++++
 .../connector/file/table/PartitionWriterTest.java  | 23 +++++----
 7 files changed, 177 insertions(+), 64 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
index 2e63117fcd5..960dfffefd0 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiPredicate;
 
 import static org.apache.flink.connector.file.table.PartitionTempFileManager.collectPartSpecToPaths;
 import static org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths;
@@ -85,8 +86,19 @@ public class FileSystemCommitter {
 
     /** For committing job's output after successful batch job completion. */
     public void commitPartitions() throws Exception {
+        commitPartitions((subtaskIndex, attemptNumber) -> true);
+    }
+
+    /**
+     * Commits the partitions, applying a filter to exclude files of invalid task attempts. In
+     * speculative execution mode, some files may not belong to the finished attempt.
+     *
+     * @param taskAttemptFilter the filter that accepts a subtaskIndex and an attemptNumber
+     * @throws Exception if committing the partitions fails
+     */
+    public void commitPartitions(BiPredicate<Integer, Integer> taskAttemptFilter) throws Exception {
         FileSystem fs = factory.create(tmpPath.toUri());
-        List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);
+        List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath, taskAttemptFilter);
 
         try (PartitionLoader loader =
                 new PartitionLoader(
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 22aac83ddb1..f866b8ec230 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -37,12 +38,17 @@ import java.util.List;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * File system {@link OutputFormat} for batch job. It commit in {@link #finalizeGlobal(int)}.
+ * File system {@link OutputFormat} for batch job. It commits in {@link
+ * #finalizeGlobal(FinalizationContext)}.
  *
  * @param <T> The type of the consumed records.
  */
 @Internal
-public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMaster, Serializable {
+public class FileSystemOutputFormat<T>
+        implements OutputFormat<T>,
+                FinalizeOnMaster,
+                Serializable,
+                SupportsConcurrentExecutionAttempts {
 
     private static final long serialVersionUID = 1L;
 
@@ -93,7 +99,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
     }
 
     @Override
-    public void finalizeGlobal(int parallelism) {
+    public void finalizeGlobal(FinalizationContext context) {
         try {
             List<PartitionCommitPolicy> policies = Collections.emptyList();
             if (partitionCommitPolicyFactory != null) {
@@ -120,7 +126,17 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
                             identifier,
                             staticPartitions,
                             policies);
-            committer.commitPartitions();
+            committer.commitPartitions(
+                    (subtaskIndex, attemptNumber) -> {
+                        try {
+                            if (context.getFinishedAttempt(subtaskIndex) == attemptNumber) {
+                                return true;
+                            }
                        } catch (IllegalArgumentException ignored) {
+                            // may have encountered a dir or file that does not belong to this job
+                        }
+                        return false;
+                    });
         } catch (Exception e) {
             throw new TableException("Exception in finalizeGlobal", e);
         } finally {
@@ -137,11 +153,16 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
     }
 
     @Override
-    public void open(int taskNumber, int numTasks) throws IOException {
+    public void open(InitializationContext context) throws IOException {
         try {
             PartitionTempFileManager fileManager =
-                    new PartitionTempFileManager(fsFactory, tmpPath, taskNumber, outputFileConfig);
-            PartitionWriter.Context<T> context =
+                    new PartitionTempFileManager(
+                            fsFactory,
+                            tmpPath,
+                            context.getTaskNumber(),
+                            context.getAttemptNumber(),
+                            outputFileConfig);
+            PartitionWriter.Context<T> writerContext =
                     new PartitionWriter.Context<>(parameters, formatFactory);
             writer =
                     PartitionWriterFactory.<T>get(
@@ -149,7 +170,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
                                     dynamicGrouped,
                                     staticPartitions)
                             .create(
-                                    context,
+                                    writerContext,
                                     fileManager,
                                     computer,
                                     new PartitionWriter.DefaultPartitionWriterListener());
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
index 14bf3a30c81..32161b0291f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths;
 
@@ -42,7 +45,7 @@ import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndP
  *
  * <p>Temporary file directory contains the following directory parts: 1.temporary base path
  * directory. 2.task id directory. 3.directories to specify partitioning. 4.data files. eg:
- * /tmp/task-0/p0=1/p1=2/fileName.
+ * /tmp/task-0-attempt-0/p0=1/p1=2/fileName.
  */
 @Internal
 public class PartitionTempFileManager {
@@ -51,29 +54,21 @@ public class PartitionTempFileManager {
     private static final String TASK_DIR_PREFIX = "task-";
     private static final String ATTEMPT_PREFIX = "attempt-";
 
+    /** <b>ATTENTION:</b> please keep TASK_DIR_FORMAT in sync with TASK_DIR_PATTERN. */
+    private static final String TASK_DIR_FORMAT = "%s%d-%s%d";
+
+    private static final Pattern TASK_DIR_PATTERN =
+            Pattern.compile(TASK_DIR_PREFIX + "(\\d+)-" + ATTEMPT_PREFIX + "(\\d+)");
     private final int taskNumber;
     private final Path taskTmpDir;
     private final OutputFileConfig outputFileConfig;
 
     private transient int nameCounter = 0;
 
-    public PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
-            throws IOException {
-        this(factory, tmpPath, taskNumber, new OutputFileConfig("", ""));
-    }
-
-    PartitionTempFileManager(
-            FileSystemFactory factory,
-            Path tmpPath,
-            int taskNumber,
-            OutputFileConfig outputFileConfig)
+    public PartitionTempFileManager(
+            FileSystemFactory factory, Path tmpPath, int taskNumber, int attemptNumber)
             throws IOException {
-        this.taskNumber = taskNumber;
-        this.outputFileConfig = outputFileConfig;
-
-        // generate and clean task temp dir.
-        this.taskTmpDir = new Path(tmpPath, TASK_DIR_PREFIX + taskNumber);
-        factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
+        this(factory, tmpPath, taskNumber, attemptNumber, new OutputFileConfig("", ""));
     }
 
     public PartitionTempFileManager(
@@ -89,7 +84,11 @@ public class PartitionTempFileManager {
         // generate task temp dir with task and attempt number like "task-0-attempt-0"
         String taskTmpDirName =
                 String.format(
-                        "%s%d-%s%d", TASK_DIR_PREFIX, taskNumber, ATTEMPT_PREFIX, attemptNumber);
+                        TASK_DIR_FORMAT,
+                        TASK_DIR_PREFIX,
+                        taskNumber,
+                        ATTEMPT_PREFIX,
+                        attemptNumber);
         this.taskTmpDir = new Path(tmpPath, taskTmpDirName);
         factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
     }
@@ -112,22 +111,26 @@ public class PartitionTempFileManager {
                 outputFileConfig.getPartSuffix());
     }
 
-    private static boolean isTaskDir(String fileName) {
-        return fileName.startsWith(TASK_DIR_PREFIX);
-    }
-
     private static String taskName(int task) {
         return TASK_DIR_PREFIX + task;
     }
 
     /** Returns task temporary paths in this checkpoint. */
-    public static List<Path> listTaskTemporaryPaths(FileSystem fs, Path basePath) throws Exception {
+    public static List<Path> listTaskTemporaryPaths(
+            FileSystem fs, Path basePath, BiPredicate<Integer, Integer> taskAttemptFilter)
+            throws Exception {
         List<Path> taskTmpPaths = new ArrayList<>();
 
         if (fs.exists(basePath)) {
             for (FileStatus taskStatus : fs.listStatus(basePath)) {
-                if (isTaskDir(taskStatus.getPath().getName())) {
-                    taskTmpPaths.add(taskStatus.getPath());
+                final String taskDirName = taskStatus.getPath().getName();
+                final Matcher matcher = TASK_DIR_PATTERN.matcher(taskDirName);
+                if (matcher.matches()) {
+                    final int subtaskIndex = Integer.parseInt(matcher.group(1));
+                    final int attemptNumber = Integer.parseInt(matcher.group(2));
+                    if (taskAttemptFilter.test(subtaskIndex, attemptNumber)) {
+                        taskTmpPaths.add(taskStatus.getPath());
+                    }
                 }
             }
         } else {
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
index 65e1c7d3147..a3ebda3d5dc 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
@@ -83,9 +83,9 @@ public class FileSystemCommitterTest {
                         new LinkedHashMap<>(),
                         policies);
 
-        createFile(path, "task-1/p1=0/p2=0/", "f1", "f2");
-        createFile(path, "task-2/p1=0/p2=0/", "f3");
-        createFile(path, "task-2/p1=0/p2=1/", "f4");
+        createFile(path, "task-1-attempt-0/p1=0/p2=0/", "f1", "f2");
+        createFile(path, "task-2-attempt-0/p1=0/p2=0/", "f3");
+        createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f4");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
@@ -94,7 +94,7 @@ public class FileSystemCommitterTest {
         assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f4")).exists();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=1/" + SUCCESS_FILE_NAME)).exists();
 
-        createFile(path, "task-2/p1=0/p2=1/", "f5");
+        createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f5");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
@@ -114,7 +114,7 @@ public class FileSystemCommitterTest {
                         identifier,
                         new LinkedHashMap<>(),
                         policies);
-        createFile(path, "task-2/p1=0/p2=1/", "f6");
+        createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f6");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f5")).exists();
         assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f6")).exists();
@@ -135,15 +135,15 @@ public class FileSystemCommitterTest {
                         new LinkedHashMap<String, String>(),
                         policies);
 
-        createFile(path, "task-1/", "f1", "f2");
-        createFile(path, "task-2/", "f3");
+        createFile(path, "task-1-attempt-0/", "f1", "f2");
+        createFile(path, "task-2-attempt-0/", "f3");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "f1")).exists();
         assertThat(new File(outputPath.toFile(), "f2")).exists();
         assertThat(new File(outputPath.toFile(), "f3")).exists();
         assertThat(new File(outputPath.toFile(), SUCCESS_FILE_NAME)).exists();
 
-        createFile(path, "task-2/", "f4");
+        createFile(path, "task-2-attempt-0/", "f4");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "f4")).exists();
         assertThat(new File(outputPath.toFile(), SUCCESS_FILE_NAME)).exists();
@@ -159,7 +159,7 @@ public class FileSystemCommitterTest {
                         identifier,
                         new LinkedHashMap<String, String>(),
                         policies);
-        createFile(path, "task-2/", "f5");
+        createFile(path, "task-2-attempt-0/", "f5");
         committer.commitPartitions();
         assertThat(new File(outputPath.toFile(), "f4")).exists();
         assertThat(new File(outputPath.toFile(), "f5")).exists();
@@ -183,8 +183,8 @@ public class FileSystemCommitterTest {
                         staticPartitions,
                         policies);
 
-        createFile(path, "task-1/dt=2022-08-02/");
-        createFile(path, "task-2/dt=2022-08-02/");
+        createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+        createFile(path, "task-2-attempt-0/dt=2022-08-02/");
 
         committer.commitPartitions();
 
@@ -201,8 +201,8 @@ public class FileSystemCommitterTest {
         createFile(outputPath, "dt=2022-08-02/f1");
         assertThat(new File(emptyPartitionFile, "f1")).exists();
 
-        createFile(path, "task-1/dt=2022-08-02/");
-        createFile(path, "task-2/dt=2022-08-02/");
+        createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+        createFile(path, "task-2-attempt-0/dt=2022-08-02/");
         committer.commitPartitions();
 
         // assert partition dir is still empty because the partition dir is overwritten
@@ -216,8 +216,8 @@ public class FileSystemCommitterTest {
         createFile(outputPath, "dt=2022-08-02/f1");
         assertThat(new File(emptyPartitionFile, "f1")).exists();
 
-        createFile(path, "task-1/dt=2022-08-02/");
-        createFile(path, "task-2/dt=2022-08-02/");
+        createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+        createFile(path, "task-2-attempt-0/dt=2022-08-02/");
         committer =
                 new FileSystemCommitter(
                         fileSystemFactory,
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
index e3f31e3a708..964304d5ae7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.file.table;
 
+import org.apache.flink.api.common.io.FinalizeOnMaster.FinalizationContext;
 import org.apache.flink.api.java.io.TextOutputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
@@ -52,6 +53,8 @@ class FileSystemOutputFormatTest {
     @TempDir private java.nio.file.Path tmpPath;
     @TempDir private java.nio.file.Path outputPath;
 
+    private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext();
+
     private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
             throws IOException {
         Map<File, String> contents = new HashMap<>(4);
@@ -95,7 +98,7 @@ class FileSystemOutputFormatTest {
             assertThat(getFileContentByPath(tmpPath)).hasSize(1);
         }
 
-        ref.get().finalizeGlobal(1);
+        ref.get().finalizeGlobal(finalizationContext);
         Map<File, String> content = getFileContentByPath(outputPath);
         assertThat(content.values())
                 .containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
@@ -123,7 +126,7 @@ class FileSystemOutputFormatTest {
             assertThat(getFileContentByPath(tmpPath)).hasSize(1);
         }
 
-        ref.get().finalizeGlobal(1);
+        ref.get().finalizeGlobal(finalizationContext);
         Map<File, String> content = getFileContentByPath(outputPath);
         assertThat(content).hasSize(1);
         assertThat(content.values())
@@ -148,7 +151,7 @@ class FileSystemOutputFormatTest {
             assertThat(getFileContentByPath(tmpPath)).hasSize(1);
         }
 
-        ref.get().finalizeGlobal(1);
+        ref.get().finalizeGlobal(finalizationContext);
         Map<File, String> content = getFileContentByPath(outputPath);
         assertThat(content).hasSize(1);
         assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
@@ -165,7 +168,7 @@ class FileSystemOutputFormatTest {
             assertThat(getFileContentByPath(tmpPath)).hasSize(2);
         }
 
-        ref.get().finalizeGlobal(1);
+        ref.get().finalizeGlobal(finalizationContext);
         Map<File, String> content = getFileContentByPath(outputPath);
         Map<String, String> sortedContent = new TreeMap<>();
         content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
@@ -191,7 +194,7 @@ class FileSystemOutputFormatTest {
             assertThat(getFileContentByPath(tmpPath)).hasSize(2);
         }
 
-        ref.get().finalizeGlobal(1);
+        ref.get().finalizeGlobal(finalizationContext);
         Map<File, String> content = getFileContentByPath(outputPath);
         Map<String, String> sortedContent = new TreeMap<>();
         content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
@@ -236,4 +239,17 @@ class FileSystemOutputFormatTest {
                 3,
                 0);
     }
+
+    private static class TestingFinalizationContext implements FinalizationContext {
+
+        @Override
+        public int getParallelism() {
+            return 1;
+        }
+
+        @Override
+        public int getFinishedAttempt(int subtaskIndex) {
+            return 0;
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java
new file mode 100644
index 00000000000..2f95bda2d8b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PartitionTempFileManager}. */
+class PartitionTempFileManagerTest {
+
+    @TempDir private java.nio.file.Path tmpPath;
+
+    @Test
+    void testListTaskTemporaryPaths() throws Exception {
+        // only accept task-0-attempt-1
+        final BiPredicate<Integer, Integer> taskAttemptFilter =
+                (subtaskIndex, attemptNumber) -> subtaskIndex == 0 && attemptNumber == 1;
+
+        final FileSystem fs = FileSystem.get(tmpPath.toUri());
+        fs.mkdirs(new Path(tmpPath.toUri() + "/task-0-attempt-0")); // invalid attempt number
+        fs.mkdirs(new Path(tmpPath.toUri() + "/task-0-attempt-1")); // valid
+        fs.mkdirs(new Path(tmpPath.toUri() + "/task-1-attempt-0")); // invalid subtask index
+        fs.mkdirs(new Path(tmpPath.toUri() + "/.task-0-attempt-1")); // invisible dir
+        fs.mkdirs(new Path(tmpPath.toUri() + "/_SUCCESS")); // not a task dir
+
+        final List<Path> taskTmpPaths =
+                PartitionTempFileManager.listTaskTemporaryPaths(
+                        fs, new Path(tmpPath.toUri()), taskAttemptFilter);
+        final List<String> taskDirs =
+                taskTmpPaths.stream().map(Path::getName).collect(Collectors.toList());
+        assertThat(taskDirs).hasSize(1).containsExactly("task-0-attempt-1");
+    }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
index 7093532b103..5f6e402316d 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
@@ -50,7 +50,7 @@ class PartitionWriterTest {
 
     @BeforeEach
     void before() throws IOException {
-        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 0);
+        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 0, 0);
         usesLegacyRows.before();
     }
 
@@ -132,16 +132,17 @@ class PartitionWriterTest {
         writer.write(Row.of("p1", 2));
         writer.write(Row.of("p2", 2));
         writer.close();
-        assertThat(records.toString()).isEqualTo("{task-0=[p1,1, p1,2, p2,2]}");
+        assertThat(records.toString()).isEqualTo("{task-0-attempt-0=[p1,1, p1,2, p2,2]}");
 
-        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 0);
         writer = new SingleDirectoryWriter<>(context, manager, computer, new LinkedHashMap<>());
         writer.write(Row.of("p3", 3));
         writer.write(Row.of("p5", 5));
         writer.write(Row.of("p2", 2));
         writer.close();
         assertThat(records.toString())
-                .isEqualTo("{task-0=[p1,1, p1,2, p2,2], task-1=[p3,3, p5,5, p2,2]}");
+                .isEqualTo(
+                        "{task-0-attempt-0=[p1,1, p1,2, p2,2], task-1-attempt-0=[p3,3, p5,5, p2,2]}");
     }
 
     @Test
@@ -153,9 +154,10 @@ class PartitionWriterTest {
         writer.write(Row.of("p1", 2));
         writer.write(Row.of("p2", 2));
         writer.close();
-        assertThat(records.toString()).isEqualTo("{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2]}");
+        assertThat(records.toString())
+                .isEqualTo("{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2]}");
 
-        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 1);
         writer = new GroupedPartitionWriter<>(context, manager, computer);
         writer.write(Row.of("p3", 3));
         writer.write(Row.of("p4", 5));
@@ -163,7 +165,7 @@ class PartitionWriterTest {
         writer.close();
         assertThat(records.toString())
                 .isEqualTo(
-                        "{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2], task-1/p=p3=[p3,3], task-1/p=p4=[p4,5], task-1/p=p5=[p5,2]}");
+                        "{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2], task-1-attempt-1/p=p3=[p3,3], task-1-attempt-1/p=p4=[p4,5], task-1-attempt-1/p=p5=[p5,2]}");
     }
 
     @Test
@@ -175,9 +177,10 @@ class PartitionWriterTest {
         writer.write(Row.of("p2", 2));
         writer.write(Row.of("p1", 2));
         writer.close();
-        assertThat(records.toString()).isEqualTo("{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2]}");
+        assertThat(records.toString())
+                .isEqualTo("{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2]}");
 
-        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+        manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 1);
         writer = new DynamicPartitionWriter<>(context, manager, computer);
         writer.write(Row.of("p4", 5));
         writer.write(Row.of("p3", 3));
@@ -185,6 +188,6 @@ class PartitionWriterTest {
         writer.close();
         assertThat(records.toString())
                 .isEqualTo(
-                        "{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2], task-1/p=p4=[p4,5], task-1/p=p3=[p3,3], task-1/p=p5=[p5,2]}");
+                        "{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2], task-1-attempt-1/p=p4=[p4,5], task-1-attempt-1/p=p3=[p3,3], task-1-attempt-1/p=p5=[p5,2]}");
     }
 }