You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2023/01/31 11:18:49 UTC
[flink] 05/05: [FLINK-30823][runtime] Enable speculative execution for FileSystemOutputFormat
This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1420ee7ed69a3ce72eab8027570cc3af9fa828a3
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Mon Jan 30 22:55:58 2023 +0800
[FLINK-30823][runtime] Enable speculative execution for FileSystemOutputFormat
This closes #21796.
---
.../connector/file/table/FileSystemCommitter.java | 14 +++++-
.../file/table/FileSystemOutputFormat.java | 37 +++++++++++---
.../file/table/PartitionTempFileManager.java | 53 ++++++++++----------
.../file/table/FileSystemCommitterTest.java | 30 +++++------
.../file/table/FileSystemOutputFormatTest.java | 26 ++++++++--
.../file/table/PartitionTempFileManagerTest.java | 58 ++++++++++++++++++++++
.../connector/file/table/PartitionWriterTest.java | 23 +++++----
7 files changed, 177 insertions(+), 64 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
index 2e63117fcd5..960dfffefd0 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.BiPredicate;
import static org.apache.flink.connector.file.table.PartitionTempFileManager.collectPartSpecToPaths;
import static org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths;
@@ -85,8 +86,19 @@ public class FileSystemCommitter {
/** For committing job's output after successful batch job completion. */
public void commitPartitions() throws Exception {
+ commitPartitions((subtaskIndex, attemptNumber) -> true);
+ }
+
+ /**
+ * Commits the partitions with a filter to filter out invalid task attempt files. In speculative
+ * execution mode, there might be some files which do not belong to the finished attempt.
+ *
+ * @param taskAttemptFilter the filter that accepts subtaskIndex and attemptNumber
+ * @throws Exception if partition commitment fails
+ */
+ public void commitPartitions(BiPredicate<Integer, Integer> taskAttemptFilter) throws Exception {
FileSystem fs = factory.create(tmpPath.toUri());
- List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath);
+ List<Path> taskPaths = listTaskTemporaryPaths(fs, tmpPath, taskAttemptFilter);
try (PartitionLoader loader =
new PartitionLoader(
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 22aac83ddb1..f866b8ec230 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
@@ -37,12 +38,17 @@ import java.util.List;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * File system {@link OutputFormat} for batch job. It commit in {@link #finalizeGlobal(int)}.
+ * File system {@link OutputFormat} for batch job. It commits in {@link
+ * #finalizeGlobal(FinalizationContext)}.
*
* @param <T> The type of the consumed records.
*/
@Internal
-public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMaster, Serializable {
+public class FileSystemOutputFormat<T>
+ implements OutputFormat<T>,
+ FinalizeOnMaster,
+ Serializable,
+ SupportsConcurrentExecutionAttempts {
private static final long serialVersionUID = 1L;
@@ -93,7 +99,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
}
@Override
- public void finalizeGlobal(int parallelism) {
+ public void finalizeGlobal(FinalizationContext context) {
try {
List<PartitionCommitPolicy> policies = Collections.emptyList();
if (partitionCommitPolicyFactory != null) {
@@ -120,7 +126,17 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
identifier,
staticPartitions,
policies);
- committer.commitPartitions();
+ committer.commitPartitions(
+ (subtaskIndex, attemptNumber) -> {
+ try {
+ if (context.getFinishedAttempt(subtaskIndex) == attemptNumber) {
+ return true;
+ }
+ } catch (IllegalArgumentException ignored) {
+ // maybe met a dir or file which does not belong to this job
+ }
+ return false;
+ });
} catch (Exception e) {
throw new TableException("Exception in finalizeGlobal", e);
} finally {
@@ -137,11 +153,16 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
}
@Override
- public void open(int taskNumber, int numTasks) throws IOException {
+ public void open(InitializationContext context) throws IOException {
try {
PartitionTempFileManager fileManager =
- new PartitionTempFileManager(fsFactory, tmpPath, taskNumber, outputFileConfig);
- PartitionWriter.Context<T> context =
+ new PartitionTempFileManager(
+ fsFactory,
+ tmpPath,
+ context.getTaskNumber(),
+ context.getAttemptNumber(),
+ outputFileConfig);
+ PartitionWriter.Context<T> writerContext =
new PartitionWriter.Context<>(parameters, formatFactory);
writer =
PartitionWriterFactory.<T>get(
@@ -149,7 +170,7 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
dynamicGrouped,
staticPartitions)
.create(
- context,
+ writerContext,
fileManager,
computer,
new PartitionWriter.DefaultPartitionWriterListener());
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
index 14bf3a30c81..32161b0291f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
@@ -33,6 +33,9 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.BiPredicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths;
@@ -42,7 +45,7 @@ import static org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndP
*
* <p>Temporary file directory contains the following directory parts: 1.temporary base path
* directory. 2.task id directory. 3.directories to specify partitioning. 4.data files. eg:
- * /tmp/task-0/p0=1/p1=2/fileName.
+ * /tmp/task-0-attempt-0/p0=1/p1=2/fileName.
*/
@Internal
public class PartitionTempFileManager {
@@ -51,29 +54,21 @@ public class PartitionTempFileManager {
private static final String TASK_DIR_PREFIX = "task-";
private static final String ATTEMPT_PREFIX = "attempt-";
+ /** <b>ATTENTION:</b> please keep TASK_DIR_FORMAT matching with TASK_DIR_PATTERN. */
+ private static final String TASK_DIR_FORMAT = "%s%d-%s%d";
+
+ private static final Pattern TASK_DIR_PATTERN =
+ Pattern.compile(TASK_DIR_PREFIX + "(\\d+)-" + ATTEMPT_PREFIX + "(\\d+)");
private final int taskNumber;
private final Path taskTmpDir;
private final OutputFileConfig outputFileConfig;
private transient int nameCounter = 0;
- public PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
- throws IOException {
- this(factory, tmpPath, taskNumber, new OutputFileConfig("", ""));
- }
-
- PartitionTempFileManager(
- FileSystemFactory factory,
- Path tmpPath,
- int taskNumber,
- OutputFileConfig outputFileConfig)
+ public PartitionTempFileManager(
+ FileSystemFactory factory, Path tmpPath, int taskNumber, int attemptNumber)
throws IOException {
- this.taskNumber = taskNumber;
- this.outputFileConfig = outputFileConfig;
-
- // generate and clean task temp dir.
- this.taskTmpDir = new Path(tmpPath, TASK_DIR_PREFIX + taskNumber);
- factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
+ this(factory, tmpPath, taskNumber, attemptNumber, new OutputFileConfig("", ""));
}
public PartitionTempFileManager(
@@ -89,7 +84,11 @@ public class PartitionTempFileManager {
// generate task temp dir with task and attempt number like "task-0-attempt-0"
String taskTmpDirName =
String.format(
- "%s%d-%s%d", TASK_DIR_PREFIX, taskNumber, ATTEMPT_PREFIX, attemptNumber);
+ TASK_DIR_FORMAT,
+ TASK_DIR_PREFIX,
+ taskNumber,
+ ATTEMPT_PREFIX,
+ attemptNumber);
this.taskTmpDir = new Path(tmpPath, taskTmpDirName);
factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
}
@@ -112,22 +111,26 @@ public class PartitionTempFileManager {
outputFileConfig.getPartSuffix());
}
- private static boolean isTaskDir(String fileName) {
- return fileName.startsWith(TASK_DIR_PREFIX);
- }
-
private static String taskName(int task) {
return TASK_DIR_PREFIX + task;
}
/** Returns task temporary paths in this checkpoint. */
- public static List<Path> listTaskTemporaryPaths(FileSystem fs, Path basePath) throws Exception {
+ public static List<Path> listTaskTemporaryPaths(
+ FileSystem fs, Path basePath, BiPredicate<Integer, Integer> taskAttemptFilter)
+ throws Exception {
List<Path> taskTmpPaths = new ArrayList<>();
if (fs.exists(basePath)) {
for (FileStatus taskStatus : fs.listStatus(basePath)) {
- if (isTaskDir(taskStatus.getPath().getName())) {
- taskTmpPaths.add(taskStatus.getPath());
+ final String taskDirName = taskStatus.getPath().getName();
+ final Matcher matcher = TASK_DIR_PATTERN.matcher(taskDirName);
+ if (matcher.matches()) {
+ final int subtaskIndex = Integer.parseInt(matcher.group(1));
+ final int attemptNumber = Integer.parseInt(matcher.group(2));
+ if (taskAttemptFilter.test(subtaskIndex, attemptNumber)) {
+ taskTmpPaths.add(taskStatus.getPath());
+ }
}
}
} else {
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
index 65e1c7d3147..a3ebda3d5dc 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
@@ -83,9 +83,9 @@ public class FileSystemCommitterTest {
new LinkedHashMap<>(),
policies);
- createFile(path, "task-1/p1=0/p2=0/", "f1", "f2");
- createFile(path, "task-2/p1=0/p2=0/", "f3");
- createFile(path, "task-2/p1=0/p2=1/", "f4");
+ createFile(path, "task-1-attempt-0/p1=0/p2=0/", "f1", "f2");
+ createFile(path, "task-2-attempt-0/p1=0/p2=0/", "f3");
+ createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f4");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
@@ -94,7 +94,7 @@ public class FileSystemCommitterTest {
assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f4")).exists();
assertThat(new File(outputPath.toFile(), "p1=0/p2=1/" + SUCCESS_FILE_NAME)).exists();
- createFile(path, "task-2/p1=0/p2=1/", "f5");
+ createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f5");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
@@ -114,7 +114,7 @@ public class FileSystemCommitterTest {
identifier,
new LinkedHashMap<>(),
policies);
- createFile(path, "task-2/p1=0/p2=1/", "f6");
+ createFile(path, "task-2-attempt-0/p1=0/p2=1/", "f6");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f5")).exists();
assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f6")).exists();
@@ -135,15 +135,15 @@ public class FileSystemCommitterTest {
new LinkedHashMap<String, String>(),
policies);
- createFile(path, "task-1/", "f1", "f2");
- createFile(path, "task-2/", "f3");
+ createFile(path, "task-1-attempt-0/", "f1", "f2");
+ createFile(path, "task-2-attempt-0/", "f3");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "f1")).exists();
assertThat(new File(outputPath.toFile(), "f2")).exists();
assertThat(new File(outputPath.toFile(), "f3")).exists();
assertThat(new File(outputPath.toFile(), SUCCESS_FILE_NAME)).exists();
- createFile(path, "task-2/", "f4");
+ createFile(path, "task-2-attempt-0/", "f4");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "f4")).exists();
assertThat(new File(outputPath.toFile(), SUCCESS_FILE_NAME)).exists();
@@ -159,7 +159,7 @@ public class FileSystemCommitterTest {
identifier,
new LinkedHashMap<String, String>(),
policies);
- createFile(path, "task-2/", "f5");
+ createFile(path, "task-2-attempt-0/", "f5");
committer.commitPartitions();
assertThat(new File(outputPath.toFile(), "f4")).exists();
assertThat(new File(outputPath.toFile(), "f5")).exists();
@@ -183,8 +183,8 @@ public class FileSystemCommitterTest {
staticPartitions,
policies);
- createFile(path, "task-1/dt=2022-08-02/");
- createFile(path, "task-2/dt=2022-08-02/");
+ createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+ createFile(path, "task-2-attempt-0/dt=2022-08-02/");
committer.commitPartitions();
@@ -201,8 +201,8 @@ public class FileSystemCommitterTest {
createFile(outputPath, "dt=2022-08-02/f1");
assertThat(new File(emptyPartitionFile, "f1")).exists();
- createFile(path, "task-1/dt=2022-08-02/");
- createFile(path, "task-2/dt=2022-08-02/");
+ createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+ createFile(path, "task-2-attempt-0/dt=2022-08-02/");
committer.commitPartitions();
// assert partition dir is still empty because the partition dir is overwritten
@@ -216,8 +216,8 @@ public class FileSystemCommitterTest {
createFile(outputPath, "dt=2022-08-02/f1");
assertThat(new File(emptyPartitionFile, "f1")).exists();
- createFile(path, "task-1/dt=2022-08-02/");
- createFile(path, "task-2/dt=2022-08-02/");
+ createFile(path, "task-1-attempt-0/dt=2022-08-02/");
+ createFile(path, "task-2-attempt-0/dt=2022-08-02/");
committer =
new FileSystemCommitter(
fileSystemFactory,
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
index e3f31e3a708..964304d5ae7 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemOutputFormatTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.file.table;
+import org.apache.flink.api.common.io.FinalizeOnMaster.FinalizationContext;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
@@ -52,6 +53,8 @@ class FileSystemOutputFormatTest {
@TempDir private java.nio.file.Path tmpPath;
@TempDir private java.nio.file.Path outputPath;
+ private final TestingFinalizationContext finalizationContext = new TestingFinalizationContext();
+
private static Map<File, String> getFileContentByPath(java.nio.file.Path directory)
throws IOException {
Map<File, String> contents = new HashMap<>(4);
@@ -95,7 +98,7 @@ class FileSystemOutputFormatTest {
assertThat(getFileContentByPath(tmpPath)).hasSize(1);
}
- ref.get().finalizeGlobal(1);
+ ref.get().finalizeGlobal(finalizationContext);
Map<File, String> content = getFileContentByPath(outputPath);
assertThat(content.values())
.containsExactly("a1,1,p1\n" + "a2,2,p1\n" + "a2,2,p2\n" + "a3,3,p1\n");
@@ -123,7 +126,7 @@ class FileSystemOutputFormatTest {
assertThat(getFileContentByPath(tmpPath)).hasSize(1);
}
- ref.get().finalizeGlobal(1);
+ ref.get().finalizeGlobal(finalizationContext);
Map<File, String> content = getFileContentByPath(outputPath);
assertThat(content).hasSize(1);
assertThat(content.values())
@@ -148,7 +151,7 @@ class FileSystemOutputFormatTest {
assertThat(getFileContentByPath(tmpPath)).hasSize(1);
}
- ref.get().finalizeGlobal(1);
+ ref.get().finalizeGlobal(finalizationContext);
Map<File, String> content = getFileContentByPath(outputPath);
assertThat(content).hasSize(1);
assertThat(content.keySet().iterator().next().getParentFile().getName()).isEqualTo("c=p1");
@@ -165,7 +168,7 @@ class FileSystemOutputFormatTest {
assertThat(getFileContentByPath(tmpPath)).hasSize(2);
}
- ref.get().finalizeGlobal(1);
+ ref.get().finalizeGlobal(finalizationContext);
Map<File, String> content = getFileContentByPath(outputPath);
Map<String, String> sortedContent = new TreeMap<>();
content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
@@ -191,7 +194,7 @@ class FileSystemOutputFormatTest {
assertThat(getFileContentByPath(tmpPath)).hasSize(2);
}
- ref.get().finalizeGlobal(1);
+ ref.get().finalizeGlobal(finalizationContext);
Map<File, String> content = getFileContentByPath(outputPath);
Map<String, String> sortedContent = new TreeMap<>();
content.forEach((file, s) -> sortedContent.put(file.getParentFile().getName(), s));
@@ -236,4 +239,17 @@ class FileSystemOutputFormatTest {
3,
0);
}
+
+ private static class TestingFinalizationContext implements FinalizationContext { // test stub
+
+ @Override
+ public int getParallelism() {
+ return 1; // fixed parallelism reported to the output format under test
+ }
+
+ @Override
+ public int getFinishedAttempt(int subtaskIndex) {
+ return 0; // attempt 0 is reported as the finished attempt for every subtask index
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java
new file mode 100644
index 00000000000..2f95bda2d8b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionTempFileManagerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PartitionTempFileManager}; covers filtering of task attempt directories. */
+class PartitionTempFileManagerTest {
+
+ @TempDir private java.nio.file.Path tmpPath;
+
+ @Test
+ void testListTaskTemporaryPaths() throws Exception {
+ // the filter accepts exactly one combination: subtask 0, attempt 1 (task-0-attempt-1)
+ final BiPredicate<Integer, Integer> taskAttemptFilter =
+ (subtaskIndex, attemptNumber) -> subtaskIndex == 0 && attemptNumber == 1;
+
+ final FileSystem fs = FileSystem.get(tmpPath.toUri());
+ fs.mkdirs(new Path(tmpPath.toUri() + "/task-0-attempt-0")); // rejected by the filter: attempt number is 0
+ fs.mkdirs(new Path(tmpPath.toUri() + "/task-0-attempt-1")); // accepted by the filter
+ fs.mkdirs(new Path(tmpPath.toUri() + "/task-1-attempt-0")); // rejected by the filter: subtask index is 1
+ fs.mkdirs(new Path(tmpPath.toUri() + "/.task-0-attempt-1")); // hidden dir (leading '.'); must not be listed
+ fs.mkdirs(new Path(tmpPath.toUri() + "/_SUCCESS")); // not a task dir; must not be listed
+
+ final List<Path> taskTmpPaths =
+ PartitionTempFileManager.listTaskTemporaryPaths(
+ fs, new Path(tmpPath.toUri()), taskAttemptFilter);
+ final List<String> taskDirs =
+ taskTmpPaths.stream().map(Path::getName).collect(Collectors.toList());
+ assertThat(taskDirs).hasSize(1).containsExactly("task-0-attempt-1");
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
index 7093532b103..5f6e402316d 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/PartitionWriterTest.java
@@ -50,7 +50,7 @@ class PartitionWriterTest {
@BeforeEach
void before() throws IOException {
- manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 0);
+ manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 0, 0);
usesLegacyRows.before();
}
@@ -132,16 +132,17 @@ class PartitionWriterTest {
writer.write(Row.of("p1", 2));
writer.write(Row.of("p2", 2));
writer.close();
- assertThat(records.toString()).isEqualTo("{task-0=[p1,1, p1,2, p2,2]}");
+ assertThat(records.toString()).isEqualTo("{task-0-attempt-0=[p1,1, p1,2, p2,2]}");
- manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+ manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 0);
writer = new SingleDirectoryWriter<>(context, manager, computer, new LinkedHashMap<>());
writer.write(Row.of("p3", 3));
writer.write(Row.of("p5", 5));
writer.write(Row.of("p2", 2));
writer.close();
assertThat(records.toString())
- .isEqualTo("{task-0=[p1,1, p1,2, p2,2], task-1=[p3,3, p5,5, p2,2]}");
+ .isEqualTo(
+ "{task-0-attempt-0=[p1,1, p1,2, p2,2], task-1-attempt-0=[p3,3, p5,5, p2,2]}");
}
@Test
@@ -153,9 +154,10 @@ class PartitionWriterTest {
writer.write(Row.of("p1", 2));
writer.write(Row.of("p2", 2));
writer.close();
- assertThat(records.toString()).isEqualTo("{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2]}");
+ assertThat(records.toString())
+ .isEqualTo("{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2]}");
- manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+ manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 1);
writer = new GroupedPartitionWriter<>(context, manager, computer);
writer.write(Row.of("p3", 3));
writer.write(Row.of("p4", 5));
@@ -163,7 +165,7 @@ class PartitionWriterTest {
writer.close();
assertThat(records.toString())
.isEqualTo(
- "{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2], task-1/p=p3=[p3,3], task-1/p=p4=[p4,5], task-1/p=p5=[p5,2]}");
+ "{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2], task-1-attempt-1/p=p3=[p3,3], task-1-attempt-1/p=p4=[p4,5], task-1-attempt-1/p=p5=[p5,2]}");
}
@Test
@@ -175,9 +177,10 @@ class PartitionWriterTest {
writer.write(Row.of("p2", 2));
writer.write(Row.of("p1", 2));
writer.close();
- assertThat(records.toString()).isEqualTo("{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2]}");
+ assertThat(records.toString())
+ .isEqualTo("{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2]}");
- manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1);
+ manager = new PartitionTempFileManager(fsFactory, new Path(tmpDir.toUri()), 1, 1);
writer = new DynamicPartitionWriter<>(context, manager, computer);
writer.write(Row.of("p4", 5));
writer.write(Row.of("p3", 3));
@@ -185,6 +188,6 @@ class PartitionWriterTest {
writer.close();
assertThat(records.toString())
.isEqualTo(
- "{task-0/p=p1=[p1,1, p1,2], task-0/p=p2=[p2,2], task-1/p=p4=[p4,5], task-1/p=p3=[p3,3], task-1/p=p5=[p5,2]}");
+ "{task-0-attempt-0/p=p1=[p1,1, p1,2], task-0-attempt-0/p=p2=[p2,2], task-1-attempt-1/p=p4=[p4,5], task-1-attempt-1/p=p3=[p3,3], task-1-attempt-1/p=p5=[p5,2]}");
}
}