Posted to commits@flink.apache.org by ja...@apache.org on 2023/01/17 09:56:09 UTC
[flink] branch master updated: [FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b5f465036d2 [FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)
b5f465036d2 is described below
commit b5f465036d28ed422a1218bfb8549cbf3e6ecd2e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Jan 17 17:55:57 2023 +0800
[FLINK-29879][filesystem] Introduce operators for merging files in batch mode (#21257)
---
.../file/table/DynamicPartitionWriter.java | 15 +-
.../connector/file/table/FileSystemCommitter.java | 41 ++++-
.../file/table/FileSystemOutputFormat.java | 6 +-
.../file/table/GroupedPartitionWriter.java | 15 +-
.../connector/file/table/PartitionLoader.java | 73 ++++++---
.../file/table/PartitionTempFileManager.java | 23 ++-
.../connector/file/table/PartitionWriter.java | 24 +++
.../file/table/PartitionWriterFactory.java | 9 +-
.../file/table/SingleDirectoryWriter.java | 30 ++--
.../batch/compact/BatchCompactCoordinator.java | 153 ++++++++++++++++++
.../table/batch/compact/BatchCompactOperator.java | 150 +++++++++++++++++
.../file/table/batch/compact/BatchFileWriter.java | 134 +++++++++++++++
.../batch/compact/BatchPartitionCommitterSink.java | 145 +++++++++++++++++
.../file/table/batch/compact/CompactFileUtils.java | 121 ++++++++++++++
.../file/table/stream/compact/CompactMessages.java | 20 +++
.../file/table/FileSystemCommitterTest.java | 7 +-
.../batch/compact/BatchCompactCoordinatorTest.java | 165 +++++++++++++++++++
.../batch/compact/BatchCompactOperatorTest.java | 133 +++++++++++++++
.../table/batch/compact/BatchFileWriterTest.java | 179 +++++++++++++++++++++
.../compact/BatchPartitionCommitterSinkTest.java | 137 ++++++++++++++++
.../stream/compact/AbstractCompactTestBase.java | 4 +-
21 files changed, 1535 insertions(+), 49 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
index c33db2ffdd3..20432a54156 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DynamicPartitionWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
import java.util.HashMap;
import java.util.Map;
@@ -37,13 +38,23 @@ public class DynamicPartitionWriter<T> implements PartitionWriter<T> {
private final PartitionTempFileManager manager;
private final PartitionComputer<T> computer;
private final Map<String, OutputFormat<T>> formats;
+ private final PartitionWriterListener writerListener;
public DynamicPartitionWriter(
Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer) {
+ this(context, manager, computer, new DefaultPartitionWriterListener());
+ }
+
+ public DynamicPartitionWriter(
+ Context<T> context,
+ PartitionTempFileManager manager,
+ PartitionComputer<T> computer,
+ PartitionWriterListener writerListener) {
this.context = context;
this.manager = manager;
this.computer = computer;
this.formats = new HashMap<>();
+ this.writerListener = writerListener;
}
@Override
@@ -53,8 +64,10 @@ public class DynamicPartitionWriter<T> implements PartitionWriter<T> {
if (format == null) {
// create a new format to write new partition.
- format = context.createNewOutputFormat(manager.createPartitionDir(partition));
+ Path path = manager.createPartitionDir(partition);
+ format = context.createNewOutputFormat(path);
formats.put(partition, format);
+ writerListener.onFileOpened(partition, path);
}
format.writeRecord(computer.projectColumnsToWrite(in));
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
index c63bea9e3fd..2e63117fcd5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemCommitter.java
@@ -23,12 +23,14 @@ import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectIdentifier;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.connector.file.table.PartitionTempFileManager.collectPartSpecToPaths;
import static org.apache.flink.connector.file.table.PartitionTempFileManager.listTaskTemporaryPaths;
+import static org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/**
* File system file committer implementation. It moves all files to output path from temporary path.
@@ -48,7 +50,7 @@ import static org.apache.flink.connector.file.table.PartitionTempFileManager.lis
* <p>See: {@link PartitionTempFileManager}. {@link PartitionLoader}.
*/
@Internal
-class FileSystemCommitter {
+public class FileSystemCommitter {
private final FileSystemFactory factory;
private final TableMetaStoreFactory metaStoreFactory;
@@ -60,7 +62,7 @@ class FileSystemCommitter {
private final LinkedHashMap<String, String> staticPartitions;
private final List<PartitionCommitPolicy> policies;
- FileSystemCommitter(
+ public FileSystemCommitter(
FileSystemFactory factory,
TableMetaStoreFactory metaStoreFactory,
boolean overwrite,
@@ -97,11 +99,11 @@ class FileSystemCommitter {
} else {
for (Map.Entry<LinkedHashMap<String, String>, List<Path>> entry :
collectPartSpecToPaths(fs, taskPaths, partitionColumnSize).entrySet()) {
- loader.loadPartition(entry.getKey(), entry.getValue());
+ loader.loadPartition(entry.getKey(), entry.getValue(), true);
}
}
} else {
- loader.loadNonPartition(taskPaths);
+ loader.loadNonPartition(taskPaths, true);
}
} finally {
for (Path taskPath : taskPaths) {
@@ -109,4 +111,35 @@ class FileSystemCommitter {
}
}
}
+
+ /**
+ * Commits the job's output after the batch job completes successfully. It commits the given
+ * partitions and their corresponding written files, which means it moves the temporary files
+ * to each partition's location.
+ */
+ public void commitPartitionsWithFiles(Map<String, List<Path>> partitionsFiles)
+ throws Exception {
+ FileSystem fs = factory.create(tmpPath.toUri());
+ try (PartitionLoader loader =
+ new PartitionLoader(
+ overwrite, fs, metaStoreFactory, isToLocal, identifier, policies)) {
+ if (partitionColumnSize > 0) {
+ if (partitionsFiles.isEmpty() && !staticPartitions.isEmpty()) {
+ if (partitionColumnSize == staticPartitions.size()) {
+ loader.loadEmptyPartition(this.staticPartitions);
+ }
+ } else {
+ for (Map.Entry<String, List<Path>> partitionFile : partitionsFiles.entrySet()) {
+ LinkedHashMap<String, String> partSpec =
+ extractPartitionSpecFromPath(new Path(partitionFile.getKey()));
+ loader.loadPartition(partSpec, partitionFile.getValue(), false);
+ }
+ }
+ } else {
+ List<Path> files = new ArrayList<>();
+ partitionsFiles.values().forEach(files::addAll);
+ loader.loadNonPartition(files, false);
+ }
+ }
+ }
}
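A minimal usage sketch for the new commitPartitionsWithFiles entry point (the map contents and the committer variable are illustrative assumptions, not part of the commit): keys are partition paths as reported by the writers, values are the exact files to move into each partition.

    // hypothetical input: partition path -> files to move into that partition
    Map<String, List<Path>> partitionsFiles = new HashMap<>();
    partitionsFiles.put(
            "p1=1/",
            Collections.singletonList(
                    new Path("/tmp/staging/task-0-attempt-0/compacted-attempt-0-f1")));
    // committer: a configured FileSystemCommitter
    committer.commitPartitionsWithFiles(partitionsFiles);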
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
index 23768755ec5..22aac83ddb1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java
@@ -148,7 +148,11 @@ public class FileSystemOutputFormat<T> implements OutputFormat<T>, FinalizeOnMas
partitionColumns.length - staticPartitions.size() > 0,
dynamicGrouped,
staticPartitions)
- .create(context, fileManager, computer);
+ .create(
+ context,
+ fileManager,
+ computer,
+ new PartitionWriter.DefaultPartitionWriterListener());
} catch (Exception e) {
throw new TableException("Exception in open", e);
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
index 2ab92c92fee..7aadfc7495e 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/GroupedPartitionWriter.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
@@ -35,15 +36,25 @@ public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
private final Context<T> context;
private final PartitionTempFileManager manager;
private final PartitionComputer<T> computer;
+ private final PartitionWriterListener writerListener;
private OutputFormat<T> currentFormat;
private String currentPartition;
public GroupedPartitionWriter(
Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer) {
+ this(context, manager, computer, new DefaultPartitionWriterListener());
+ }
+
+ public GroupedPartitionWriter(
+ Context<T> context,
+ PartitionTempFileManager manager,
+ PartitionComputer<T> computer,
+ PartitionWriterListener writerListener) {
this.context = context;
this.manager = manager;
this.computer = computer;
+ this.writerListener = writerListener;
}
@Override
@@ -54,7 +65,9 @@ public class GroupedPartitionWriter<T> implements PartitionWriter<T> {
currentFormat.close();
}
- currentFormat = context.createNewOutputFormat(manager.createPartitionDir(partition));
+ Path path = manager.createPartitionDir(partition);
+ currentFormat = context.createNewOutputFormat(path);
+ writerListener.onFileOpened(partition, path);
currentPartition = partition;
}
currentFormat.writeRecord(computer.projectColumnsToWrite(in));
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
index 4ff1a9c1f4d..74f730ae7ac 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionLoader.java
@@ -75,8 +75,18 @@ public class PartitionLoader implements Closeable {
this.policies = policies;
}
- /** Load a single partition. */
- public void loadPartition(LinkedHashMap<String, String> partSpec, List<Path> srcDirs)
+ /**
+ * Load a single partition.
+ *
+ * @param partSpec the specification for the single partition
+ * @param srcPaths the paths of the files to load into the single partition
+ * @param srcPathIsDir whether every path in {@code srcPaths} is a directory. If true, the
+ * files under each directory are loaded. If false, every path in {@code srcPaths} is
+ * treated as a single file and loaded directly.
+ */
+ public void loadPartition(
+ LinkedHashMap<String, String> partSpec, List<Path> srcPaths, boolean srcPathIsDir)
throws Exception {
Optional<Path> pathFromMeta = metaStore.getPartition(partSpec);
Path path =
@@ -86,14 +96,22 @@ public class PartitionLoader implements Closeable {
metaStore.getLocationPath(),
generatePartitionPath(partSpec)));
- overwriteAndMoveFiles(srcDirs, path);
+ overwriteAndMoveFiles(srcPaths, path, srcPathIsDir);
commitPartition(partSpec, path);
}
- /** Load a non-partition files to output path. */
- public void loadNonPartition(List<Path> srcDirs) throws Exception {
+ /**
+ * Load non-partitioned files to the output path.
+ *
+ * @param srcPaths the paths of the files to load into the output path
+ * @param srcPathIsDir whether every path in {@code srcPaths} is a directory. If true, the
+ * files under each directory are loaded. If false, every path in {@code srcPaths} is
+ * treated as a single file and loaded directly.
+ */
+ public void loadNonPartition(List<Path> srcPaths, boolean srcPathIsDir) throws Exception {
Path tableLocation = metaStore.getLocationPath();
- overwriteAndMoveFiles(srcDirs, tableLocation);
+ overwriteAndMoveFiles(srcPaths, tableLocation, srcPathIsDir);
commitPartition(new LinkedHashMap<>(), tableLocation);
metaStore.finishWritingTable(tableLocation);
}
@@ -125,12 +143,13 @@ public class PartitionLoader implements Closeable {
commitPartition(partSpec, path);
}
- private void overwriteAndMoveFiles(List<Path> srcDirs, Path destDir) throws Exception {
+ private void overwriteAndMoveFiles(List<Path> srcPaths, Path destDir, boolean srcPathIsDir)
+ throws Exception {
FileSystem destFileSystem = destDir.getFileSystem();
boolean dirSuccessExist = destFileSystem.exists(destDir) || destFileSystem.mkdirs(destDir);
Preconditions.checkState(dirSuccessExist, "Failed to create dest path " + destDir);
overwrite(destDir);
- moveFiles(srcDirs, destDir);
+ moveFiles(srcPaths, destDir, srcPathIsDir);
}
private void overwrite(Path destDir) throws Exception {
@@ -148,23 +167,35 @@ public class PartitionLoader implements Closeable {
}
/** Moves files from srcDir to destDir. */
- private void moveFiles(List<Path> srcDirs, Path destDir) throws Exception {
- for (Path srcDir : srcDirs) {
- if (!srcDir.equals(destDir)) {
- FileStatus[] srcFiles = listStatusWithoutHidden(fs, srcDir);
- if (srcFiles != null) {
- for (FileStatus srcFile : srcFiles) {
- Path srcPath = srcFile.getPath();
- Path destPath = new Path(destDir, srcPath.getName());
- // if it's not to move to local file system, just rename it
- if (!isToLocal) {
- fs.rename(srcPath, destPath);
- } else {
- FileUtils.copy(srcPath, destPath, true);
+ private void moveFiles(List<Path> srcPaths, Path destDir, boolean srcPathIsDir)
+ throws Exception {
+ if (srcPathIsDir) {
+ // if the src path is still a directory, list the directory to get the files that need
+ // to be moved.
+ for (Path srcDir : srcPaths) {
+ if (!srcDir.equals(destDir)) {
+ FileStatus[] srcFiles = listStatusWithoutHidden(fs, srcDir);
+ if (srcFiles != null) {
+ for (FileStatus srcFile : srcFiles) {
+ moveFile(srcFile.getPath(), destDir);
}
}
}
}
+ } else {
+ for (Path srcPath : srcPaths) {
+ moveFile(srcPath, destDir);
+ }
+ }
+ }
+
+ private void moveFile(Path srcPath, Path destDir) throws Exception {
+ Path destPath = new Path(destDir, srcPath.getName());
+ // if the move is not to the local file system, just rename the file
+ if (!isToLocal) {
+ fs.rename(srcPath, destPath);
+ } else {
+ FileUtils.copy(srcPath, destPath, true);
}
}
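To make the new srcPathIsDir flag concrete, a hedged sketch of the two call shapes (the paths and the loader variable are illustrative):

    // srcPathIsDir = true: each path is a task directory; the loader lists it and
    // moves the files found inside (the existing commit-from-task-dirs flow)
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    partSpec.put("p1", "1");
    loader.loadPartition(partSpec, Arrays.asList(new Path("/tmp/staging/task-0")), true);

    // srcPathIsDir = false: each path is already a concrete file and is moved directly
    // (the new batch-compaction flow, where the committer knows the exact files)
    loader.loadPartition(
            partSpec, Arrays.asList(new Path("/tmp/staging/compacted-attempt-0-f1")), false);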
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
index 5672fd51914..14bf3a30c81 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionTempFileManager.java
@@ -49,6 +49,7 @@ public class PartitionTempFileManager {
private static final Logger LOG = LoggerFactory.getLogger(PartitionTempFileManager.class);
private static final String TASK_DIR_PREFIX = "task-";
+ private static final String ATTEMPT_PREFIX = "attempt-";
private final int taskNumber;
private final Path taskTmpDir;
@@ -56,7 +57,7 @@ public class PartitionTempFileManager {
private transient int nameCounter = 0;
- PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
+ public PartitionTempFileManager(FileSystemFactory factory, Path tmpPath, int taskNumber)
throws IOException {
this(factory, tmpPath, taskNumber, new OutputFileConfig("", ""));
}
@@ -75,6 +76,24 @@ public class PartitionTempFileManager {
factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
}
+ public PartitionTempFileManager(
+ FileSystemFactory factory,
+ Path tmpPath,
+ int taskNumber,
+ int attemptNumber,
+ OutputFileConfig outputFileConfig)
+ throws IOException {
+ this.taskNumber = taskNumber;
+ this.outputFileConfig = outputFileConfig;
+
+ // generate task temp dir with task and attempt number like "task-0-attempt-0"
+ String taskTmpDirName =
+ String.format(
+ "%s%d-%s%d", TASK_DIR_PREFIX, taskNumber, ATTEMPT_PREFIX, attemptNumber);
+ this.taskTmpDir = new Path(tmpPath, taskTmpDirName);
+ factory.create(taskTmpDir.toUri()).delete(taskTmpDir, true);
+ }
+
/** Generate a new partition directory with partitions. */
public Path createPartitionDir(String... partitions) {
Path parentPath = taskTmpDir;
@@ -120,7 +139,7 @@ public class PartitionTempFileManager {
}
/** Collect all partitioned paths, aggregate according to partition spec. */
- static Map<LinkedHashMap<String, String>, List<Path>> collectPartSpecToPaths(
+ public static Map<LinkedHashMap<String, String>, List<Path>> collectPartSpecToPaths(
FileSystem fs, List<Path> taskPaths, int partColSize) {
Map<LinkedHashMap<String, String>, List<Path>> specToPaths = new HashMap<>();
for (Path taskPath : taskPaths) {
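A quick sketch of the directory name the new constructor produces, using the TASK_DIR_PREFIX and ATTEMPT_PREFIX constants defined above (task 3, attempt 1 chosen for illustration):

    String taskTmpDirName = String.format("%s%d-%s%d", "task-", 3, "attempt-", 1);
    // -> "task-3-attempt-1"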
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
index bf057cf28d4..e1c9c63d0c1 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriter.java
@@ -62,4 +62,28 @@ public interface PartitionWriter<T> {
return format;
}
}
+
+ /** Listener for partition writer. */
+ interface PartitionWriterListener {
+
+ /**
+ * Notifies that a new file has been opened.
+ *
+ * <p>Note that this does not mean the file has been created in the file system. It is only
+ * created logically; the actual file is generated after it is committed.
+ *
+ * @param partition The partition for the newly opened file.
+ * @param file The newly created file.
+ */
+ void onFileOpened(String partition, Path file);
+ }
+
+ /** Default implementation for PartitionWriterListener. */
+ class DefaultPartitionWriterListener implements PartitionWriterListener {
+
+ @Override
+ public void onFileOpened(String partition, Path file) {
+ // do nothing
+ }
+ }
}
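Because PartitionWriterListener has a single abstract method, a listener can be supplied as a lambda. A minimal sketch (the logging body is illustrative; BatchFileWriter below instead emits InputFile messages from its listener):

    PartitionWriter.PartitionWriterListener listener =
            (partition, file) ->
                    System.out.println("opened " + file + " in partition " + partition);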
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
index 597595b3801..0eedec0b4f3 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/PartitionWriterFactory.java
@@ -29,7 +29,10 @@ import java.util.LinkedHashMap;
public interface PartitionWriterFactory<T> extends Serializable {
PartitionWriter<T> create(
- Context<T> context, PartitionTempFileManager manager, PartitionComputer<T> computer)
+ Context<T> context,
+ PartitionTempFileManager manager,
+ PartitionComputer<T> computer,
+ PartitionWriter.PartitionWriterListener writerListener)
throws Exception;
/** Util for get a {@link PartitionWriterFactory}. */
@@ -41,9 +44,9 @@ public interface PartitionWriterFactory<T> extends Serializable {
return grouped ? GroupedPartitionWriter::new : DynamicPartitionWriter::new;
} else {
return (PartitionWriterFactory<T>)
- (context, manager, computer) ->
+ (context, manager, computer, writerListener) ->
new SingleDirectoryWriter<>(
- context, manager, computer, staticPartitions);
+ context, manager, computer, staticPartitions, writerListener);
}
}
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
index 11b1a39e00b..260c293b766 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SingleDirectoryWriter.java
@@ -20,8 +20,8 @@ package org.apache.flink.connector.file.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.core.fs.Path;
-import java.io.IOException;
import java.util.LinkedHashMap;
import static org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath;
@@ -38,6 +38,7 @@ public class SingleDirectoryWriter<T> implements PartitionWriter<T> {
private final PartitionTempFileManager manager;
private final PartitionComputer<T> computer;
private final LinkedHashMap<String, String> staticPartitions;
+ private final PartitionWriterListener writerListener;
private OutputFormat<T> format;
@@ -46,25 +47,32 @@ public class SingleDirectoryWriter<T> implements PartitionWriter<T> {
PartitionTempFileManager manager,
PartitionComputer<T> computer,
LinkedHashMap<String, String> staticPartitions) {
+ this(context, manager, computer, staticPartitions, new DefaultPartitionWriterListener());
+ }
+
+ public SingleDirectoryWriter(
+ Context<T> context,
+ PartitionTempFileManager manager,
+ PartitionComputer<T> computer,
+ LinkedHashMap<String, String> staticPartitions,
+ PartitionWriterListener writerListener) {
this.context = context;
this.manager = manager;
this.computer = computer;
this.staticPartitions = staticPartitions;
- }
-
- private void createFormat() throws IOException {
- this.format =
- context.createNewOutputFormat(
- staticPartitions.size() == 0
- ? manager.createPartitionDir()
- : manager.createPartitionDir(
- generatePartitionPath(staticPartitions)));
+ this.writerListener = writerListener;
}
@Override
public void write(T in) throws Exception {
if (format == null) {
- createFormat();
+ String partition = generatePartitionPath(staticPartitions);
+ Path path =
+ staticPartitions.isEmpty()
+ ? manager.createPartitionDir()
+ : manager.createPartitionDir(partition);
+ format = context.createNewOutputFormat(path);
+ writerListener.onFileOpened(partition, path);
}
format.writeRecord(computer.projectColumnsToWrite(in));
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java
new file mode 100644
index 00000000000..41010b469dd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinator.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.BinPacking;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Coordinator for compaction in batch mode. It collects the files written by {@link
+ * BatchFileWriter}, decides whether files should be compacted, and determines which files
+ * should be merged into a single file.
+ *
+ * <p>NOTE: The coordination algorithm is stable, which ensures that different attempts produce
+ * the same outputs.
+ */
+public class BatchCompactCoordinator extends AbstractStreamOperator<CoordinatorOutput>
+ implements OneInputStreamOperator<CoordinatorInput, CoordinatorOutput>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SupplierWithException<FileSystem, IOException> fsFactory;
+ private final long compactAverageSize;
+ private final long compactTargetSize;
+ private final StreamRecord<CoordinatorOutput> element = new StreamRecord<>(null);
+
+ private transient FileSystem fs;
+ // the mapping from written partitions to the corresponding files.
+ private transient Map<String, List<Path>> inputFiles;
+
+ public BatchCompactCoordinator(
+ SupplierWithException<FileSystem, IOException> fsFactory,
+ long compactAverageSize,
+ long compactTargetSize) {
+ this.fsFactory = fsFactory;
+ this.compactAverageSize = compactAverageSize;
+ this.compactTargetSize = compactTargetSize;
+ }
+
+ @Override
+ public void open() throws Exception {
+ fs = fsFactory.get();
+ inputFiles = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CoordinatorInput> element) throws Exception {
+ CoordinatorInput coordinatorInput = element.getValue();
+ if (coordinatorInput instanceof InputFile) {
+ InputFile file = (InputFile) coordinatorInput;
+ // collect the written files
+ inputFiles
+ .computeIfAbsent(file.getPartition(), k -> new ArrayList<>())
+ .add(file.getFile());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported input message: " + coordinatorInput);
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ for (Map.Entry<String, List<Path>> partitionFiles : inputFiles.entrySet()) {
+ compactPartitionFiles(partitionFiles.getKey(), partitionFiles.getValue());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ inputFiles.clear();
+ }
+
+ private void compactPartitionFiles(String partition, List<Path> paths) throws IOException {
+ if (paths.isEmpty()) {
+ return;
+ }
+ int unitId = 0;
+ final Map<Path, Long> filesSize = getFilesSize(fs, paths);
+ // calculate the average size of these files
+ if (getAverageSize(filesSize) < compactAverageSize) {
+ // the average file size is below the threshold, so these files should be compacted
+ Function<Path, Long> sizeFunc = filesSize::get;
+ // determine what files should be merged to a file
+ List<List<Path>> compactUnits = BinPacking.pack(paths, sizeFunc, compactTargetSize);
+ for (List<Path> compactUnit : compactUnits) {
+ // emit the compact units containing the file paths
+ output.collect(
+ element.replace(new CompactionUnit(unitId++, partition, compactUnit)));
+ }
+ } else {
+ // no need to merge these files; emit each file downstream individually for committing
+ for (Path path : paths) {
+ output.collect(
+ element.replace(
+ new CompactionUnit(
+ unitId++, partition, Collections.singletonList(path))));
+ }
+ }
+ }
+
+ private Map<Path, Long> getFilesSize(FileSystem fs, List<Path> paths) throws IOException {
+ Map<Path, Long> filesSize = new HashMap<>();
+ for (Path path : paths) {
+ long len = fs.getFileStatus(path).getLen();
+ filesSize.put(path, len);
+ }
+ return filesSize;
+ }
+
+ private double getAverageSize(Map<Path, Long> filesSize) {
+ int numFiles = 0;
+ long totalSz = 0;
+ for (Map.Entry<Path, Long> fileSize : filesSize.entrySet()) {
+ numFiles += 1;
+ totalSz += fileSize.getValue();
+ }
+ return totalSz / (numFiles * 1.0);
+ }
+}
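The coordinator compacts a partition only when the average file size falls below compactAverageSize, and then packs files into units of roughly compactTargetSize. A self-contained greedy sketch of the packing idea (an assumption in the spirit of, not necessarily identical to, Flink's BinPacking.pack):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public class PackSketch {
        // greedy fill: add files to the current unit until adding one more would
        // exceed the target size, then start a new unit
        static <T> List<List<T>> pack(List<T> items, Function<T, Long> sizeFunc, long target) {
            List<List<T>> units = new ArrayList<>();
            List<T> current = new ArrayList<>();
            long currentSize = 0;
            for (T item : items) {
                long size = sizeFunc.apply(item);
                if (!current.isEmpty() && currentSize + size > target) {
                    units.add(current);
                    current = new ArrayList<>();
                    currentSize = 0;
                }
                current.add(item);
                currentSize += size;
            }
            if (!current.isEmpty()) {
                units.add(current);
            }
            return units;
        }

        public static void main(String[] args) {
            // mirrors testCompactNonPartitionedTable below: sizes 10, 5, 20 with target 16
            System.out.println(pack(Arrays.asList(10L, 5L, 20L), s -> s, 16L)); // [[10, 5], [20]]
        }
    }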
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java
new file mode 100644
index 00000000000..72f718c1316
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * CompactOperator for compaction in batch mode. It compacts files into a target file and then
+ * emits the compacted file's path to the downstream operator. The main logic is similar to {@link
+ * org.apache.flink.connector.file.table.stream.compact.CompactOperator} but skips some operations
+ * that are unnecessary in batch mode.
+ *
+ * <p>Note: if there is only a single file to compact, this operator does nothing and simply
+ * emits that file downstream. Also, the files to be compacted do not carry hidden file names;
+ * they are expected to reside in a hidden or temporary directory, which callers must ensure.
+ * This assumption makes it possible to skip renaming hidden files.
+ */
+public class BatchCompactOperator<T> extends AbstractStreamOperator<CompactOutput>
+ implements OneInputStreamOperator<CoordinatorOutput, CompactOutput>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String UNCOMPACTED_PREFIX = "uncompacted-";
+ public static final String COMPACTED_PREFIX = "compacted-";
+ public static final String ATTEMPT_PREFIX = "attempt-";
+
+ private final SupplierWithException<FileSystem, IOException> fsFactory;
+ private final CompactReader.Factory<T> readerFactory;
+ private final CompactWriter.Factory<T> writerFactory;
+
+ private transient FileSystem fileSystem;
+ private transient Map<String, List<Path>> compactedFiles;
+
+ public BatchCompactOperator(
+ SupplierWithException<FileSystem, IOException> fsFactory,
+ CompactReader.Factory<T> readerFactory,
+ CompactWriter.Factory<T> writerFactory) {
+ this.fsFactory = fsFactory;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public void open() throws Exception {
+ fileSystem = fsFactory.get();
+ compactedFiles = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CoordinatorOutput> element) throws Exception {
+ CoordinatorOutput value = element.getValue();
+ if (value instanceof CompactionUnit) {
+ CompactionUnit unit = (CompactionUnit) value;
+ String partition = unit.getPartition();
+ // these files should be merged into one file
+ List<Path> paths = unit.getPaths();
+ Configuration config =
+ getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+ Path path = null;
+ if (paths.size() == 1) {
+ // for a single file, we only need to move it to the corresponding partition instead
+ // of compacting it; the downstream commit operator does the moving
+ path = paths.get(0);
+ } else if (paths.size() > 1) {
+ Path targetPath =
+ createCompactedFile(paths, getRuntimeContext().getAttemptNumber());
+ path =
+ CompactFileUtils.doCompact(
+ fileSystem,
+ partition,
+ paths,
+ targetPath,
+ config,
+ readerFactory,
+ writerFactory);
+ }
+ if (path != null) {
+ compactedFiles.computeIfAbsent(partition, k -> new ArrayList<>()).add(path);
+ }
+ } else {
+ throw new UnsupportedOperationException("Unsupported input message: " + value);
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ // emit the compacted files to downstream
+ output.collect(new StreamRecord<>(new CompactOutput(compactedFiles)));
+ }
+
+ @Override
+ public void close() throws Exception {
+ compactedFiles.clear();
+ }
+
+ private static Path createCompactedFile(List<Path> uncompactedFiles, int attemptNumber) {
+ Path path = convertFromUncompacted(uncompactedFiles.get(0));
+ // different attempts get different target paths to avoid two attempts writing to the
+ // same path
+ return new Path(
+ path.getParent(), convertToCompactWithAttempt(attemptNumber, path.getName()));
+ }
+
+ public static Path convertFromUncompacted(Path path) {
+ Preconditions.checkArgument(
+ path.getName().startsWith(UNCOMPACTED_PREFIX),
+ "This should be uncompacted file: " + path);
+ return new Path(path.getParent(), path.getName().substring(UNCOMPACTED_PREFIX.length()));
+ }
+
+ private static String convertToCompactWithAttempt(int attemptNumber, String fileName) {
+ return String.format(
+ "%s%s%d-%s", COMPACTED_PREFIX, ATTEMPT_PREFIX, attemptNumber, fileName);
+ }
+}
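A sketch of the file-name round trip performed by convertFromUncompacted and convertToCompactWithAttempt (the input name is illustrative):

    String name = "uncompacted-part-0-0";
    String stripped = name.substring("uncompacted-".length()); // "part-0-0"
    String target = String.format("%s%s%d-%s", "compacted-", "attempt-", 1, stripped);
    // -> "compacted-attempt-1-part-0-0"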
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
new file mode 100644
index 00000000000..9650ce3b8bd
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.OutputFormatFactory;
+import org.apache.flink.connector.file.table.PartitionComputer;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.PartitionWriter;
+import org.apache.flink.connector.file.table.PartitionWriterFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.TableException;
+
+import java.util.LinkedHashMap;
+
+/**
+ * An operator for writing files in batch mode. Whenever it creates a new file to write, the
+ * operator emits the newly opened file to the downstream operator.
+ */
+public class BatchFileWriter<T> extends AbstractStreamOperator<CoordinatorInput>
+ implements OneInputStreamOperator<T, CoordinatorInput>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileSystemFactory fsFactory;
+ private final Path tmpPath;
+ private final String[] partitionColumns;
+ private final boolean dynamicGrouped;
+ private final LinkedHashMap<String, String> staticPartitions;
+ private final PartitionComputer<T> computer;
+ private final OutputFormatFactory<T> formatFactory;
+ private final OutputFileConfig outputFileConfig;
+
+ private transient PartitionWriter<T> writer;
+
+ public BatchFileWriter(
+ FileSystemFactory fsFactory,
+ Path tmpPath,
+ String[] partitionColumns,
+ boolean dynamicGrouped,
+ LinkedHashMap<String, String> staticPartitions,
+ OutputFormatFactory<T> formatFactory,
+ PartitionComputer<T> computer,
+ OutputFileConfig outputFileConfig) {
+ this.fsFactory = fsFactory;
+ this.tmpPath = tmpPath;
+ this.partitionColumns = partitionColumns;
+ this.dynamicGrouped = dynamicGrouped;
+ this.staticPartitions = staticPartitions;
+ this.formatFactory = formatFactory;
+ this.computer = computer;
+ this.outputFileConfig = outputFileConfig;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
+ }
+
+ @Override
+ public void open() throws Exception {
+ try {
+ PartitionTempFileManager fileManager =
+ new PartitionTempFileManager(
+ fsFactory,
+ tmpPath,
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getAttemptNumber(),
+ outputFileConfig);
+ Configuration config =
+ getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+ PartitionWriter.Context<T> context =
+ new PartitionWriter.Context<>(config, formatFactory);
+
+ // whenever a new file is opened for writing, the listener emits the file along with
+ // the partition it belongs to
+ PartitionWriter.PartitionWriterListener writerListener =
+ (partition, file) ->
+ output.collect(new StreamRecord<>(new InputFile(partition, file)));
+ writer =
+ PartitionWriterFactory.<T>get(
+ partitionColumns.length - staticPartitions.size() > 0,
+ dynamicGrouped,
+ staticPartitions)
+ .create(context, fileManager, computer, writerListener);
+
+ } catch (Exception e) {
+ throw new TableException("Exception in open", e);
+ }
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ try {
+ writer.write(element.getValue());
+ } catch (Exception e) {
+ throw new TableException("Exception in writeRecord", e);
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {}
+
+ @Override
+ public void close() throws Exception {
+ try {
+ staticPartitions.clear();
+ writer.close();
+ } catch (Exception e) {
+ throw new TableException("Exception in close", e);
+ }
+ }
+}
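The message emitted per newly opened file looks like the following sketch (the partition and path values are illustrative); this is the CoordinatorInput that BatchCompactCoordinator consumes:

    CoordinatorInput msg =
            new InputFile("p1=1/", new Path("/tmp/staging/task-0-attempt-0/uncompacted-f1"));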
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java
new file mode 100644
index 00000000000..ea1c6cb13bb
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSink.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemCommitter;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionCommitPolicy;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Committer operator for partitions in batch mode. It runs as a single (non-parallel) task and
+ * collects all the partition information, i.e., the partitions and their written files, sent
+ * from upstream.
+ */
+public class BatchPartitionCommitterSink extends RichSinkFunction<CompactOutput> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileSystemFactory fsFactory;
+ private final TableMetaStoreFactory msFactory;
+ private final PartitionCommitPolicyFactory partitionCommitPolicyFactory;
+ private final Path tmpPath;
+ private final boolean overwrite;
+ private final boolean isToLocal;
+ private final String[] partitionColumns;
+ private final LinkedHashMap<String, String> staticPartitions;
+ private final ObjectIdentifier identifier;
+
+ private transient Map<String, List<Path>> partitionsFiles;
+
+ public BatchPartitionCommitterSink(
+ FileSystemFactory fsFactory,
+ TableMetaStoreFactory msFactory,
+ boolean overwrite,
+ boolean isToLocal,
+ Path tmpPath,
+ String[] partitionColumns,
+ LinkedHashMap<String, String> staticPartitions,
+ ObjectIdentifier identifier,
+ PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
+ this.fsFactory = fsFactory;
+ this.msFactory = msFactory;
+ this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+ this.tmpPath = tmpPath;
+ this.identifier = identifier;
+ this.overwrite = overwrite;
+ this.isToLocal = isToLocal;
+ this.partitionColumns = partitionColumns;
+ this.staticPartitions = staticPartitions;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ partitionsFiles = new HashMap<>();
+ }
+
+ @Override
+ public void invoke(CompactMessages.CompactOutput compactOutput, Context context)
+ throws Exception {
+ for (Map.Entry<String, List<Path>> compactFiles :
+ compactOutput.getCompactedFiles().entrySet()) {
+ // collect the written partition and written files
+ partitionsFiles
+ .computeIfAbsent(compactFiles.getKey(), k -> new ArrayList<>())
+ .addAll(compactFiles.getValue());
+ }
+ }
+
+ @Override
+ public void finish() throws Exception {
+ try {
+ List<PartitionCommitPolicy> policies = Collections.emptyList();
+ if (partitionCommitPolicyFactory != null) {
+ policies =
+ partitionCommitPolicyFactory.createPolicyChain(
+ getRuntimeContext().getUserCodeClassLoader(),
+ () -> {
+ try {
+ return fsFactory.create(tmpPath.toUri());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ // commit the partitions with the given files
+ // it will move the written temporary files to each partition's location
+ FileSystemCommitter committer =
+ new FileSystemCommitter(
+ fsFactory,
+ msFactory,
+ overwrite,
+ tmpPath,
+ partitionColumns.length,
+ isToLocal,
+ identifier,
+ staticPartitions,
+ policies);
+ committer.commitPartitionsWithFiles(partitionsFiles);
+ } catch (Exception e) {
+ throw new TableException("Exception in finish", e);
+ } finally {
+ try {
+ fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
+ } catch (IOException ignore) {
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ staticPartitions.clear();
+ partitionsFiles.clear();
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java
new file mode 100644
index 00000000000..eee69bcb12e
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/CompactFileUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.stream.compact.CompactContext;
+import org.apache.flink.connector.file.table.stream.compact.CompactReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for compacting files. */
+public class CompactFileUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactFileUtils.class);
+
+ /**
+ * Does the compaction: if the target file already exists, do nothing; otherwise, read the
+ * input files and write them into the target file.
+ */
+ public static @Nullable <T> Path doCompact(
+ FileSystem fileSystem,
+ String partition,
+ List<Path> paths,
+ Path target,
+ Configuration config,
+ CompactReader.Factory<T> readerFactory,
+ CompactWriter.Factory<T> writerFactory)
+ throws IOException {
+ if (paths.size() == 0) {
+ return null;
+ }
+
+ if (fileSystem.exists(target)) {
+ return target;
+ }
+
+ checkExist(fileSystem, paths);
+
+ long startMillis = System.currentTimeMillis();
+
+ Map<Path, Long> inputMap = new HashMap<>();
+ for (Path path : paths) {
+ inputMap.put(path, fileSystem.getFileStatus(path).getLen());
+ }
+
+ doMultiFilesCompact(
+ partition, paths, target, config, fileSystem, readerFactory, writerFactory);
+ Map<Path, Long> targetMap = new HashMap<>();
+ targetMap.put(target, fileSystem.getFileStatus(target).getLen());
+ double costSeconds = ((double) (System.currentTimeMillis() - startMillis)) / 1000;
+ LOG.info(
+ "Compaction time cost is '{}S', output per file as following format: name=size(byte), target file is '{}', input files are '{}'",
+ costSeconds,
+ targetMap,
+ inputMap);
+ return target;
+ }
+
+ private static <T> void doMultiFilesCompact(
+ String partition,
+ List<Path> files,
+ Path dst,
+ Configuration config,
+ FileSystem fileSystem,
+ CompactReader.Factory<T> readerFactory,
+ CompactWriter.Factory<T> writerFactory)
+ throws IOException {
+ CompactWriter<T> writer =
+ writerFactory.create(CompactContext.create(config, fileSystem, partition, dst));
+
+ for (Path path : files) {
+ try (CompactReader<T> reader =
+ readerFactory.create(
+ CompactContext.create(config, fileSystem, partition, path))) {
+ T record;
+ while ((record = reader.read()) != null) {
+ writer.write(record);
+ }
+ }
+ }
+
+ // commit immediately
+ writer.commit();
+ }
+
+ private static void checkExist(FileSystem fileSystem, List<Path> candidates)
+ throws IOException {
+ for (Path path : candidates) {
+ if (!fileSystem.exists(path)) {
+ throw new IOException("Compaction file not exist: " + path);
+ }
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
index fc905cd34a9..a28b57eb03f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactMessages.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/**
@@ -136,6 +137,25 @@ public class CompactMessages {
}
}
+ /**
+ * The output of {@link
+ * org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator}.
+ */
+ public static class CompactOutput implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, List<Path>> compactedFiles;
+
+ public CompactOutput(Map<String, List<Path>> compactedFiles) {
+ this.compactedFiles = compactedFiles;
+ }
+
+ public Map<String, List<Path>> getCompactedFiles() {
+ return compactedFiles;
+ }
+ }
+
/** A flag to end compaction. */
public static class EndCompaction implements CoordinatorOutput {
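A sketch of constructing the new CompactOutput message that BatchCompactOperator emits in endInput() (the map contents are illustrative):

    Map<String, List<Path>> compacted = new HashMap<>();
    compacted.put(
            "p1=1/", Collections.singletonList(new Path("/tmp/staging/compacted-attempt-0-f1")));
    CompactOutput out = new CompactOutput(compacted);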
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
index 313f07f81f4..65e1c7d3147 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/FileSystemCommitterTest.java
@@ -38,7 +38,7 @@ import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FileSystemCommitter}. */
-class FileSystemCommitterTest {
+public class FileSystemCommitterTest {
private static final String SUCCESS_FILE_NAME = "_SUCCESS";
@@ -258,12 +258,13 @@ class FileSystemCommitterTest {
assertThat(outputPath.toFile().list()).isEqualTo(new String[0]);
}
- static class TestMetaStoreFactory implements TableMetaStoreFactory {
+ /** A {@link TableMetaStoreFactory} for test purpose. */
+ public static class TestMetaStoreFactory implements TableMetaStoreFactory {
private static final long serialVersionUID = 1L;
private final Path outputPath;
- TestMetaStoreFactory(Path outputPath) {
+ public TestMetaStoreFactory(Path outputPath) {
this.outputPath = outputPath;
}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java
new file mode 100644
index 00000000000..8aaff411f4c
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactCoordinatorTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for batch compact coordinator. */
+public class BatchCompactCoordinatorTest extends AbstractCompactTestBase {
+
+ @Test
+ public void testCompactIsNotNeeded() throws Exception {
+ long averageSize = 5;
+ long targetSize = 50;
+ BatchCompactCoordinator compactCoordinator =
+ new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+
+ try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+ testHarness.setup();
+ testHarness.open();
+
+ Path f1 = newFile("f1", 10);
+ Path f2 = newFile("f2", 5);
+ Path f3 = newFile("f3", 16);
+
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f1)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f2)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f3)));
+ testHarness.endInput();
+
+ // the average file size ((10 + 5 + 16) / 3 > 5) is not less than averageSize, so there's
+ // no need to compact; each output compact unit should contain exactly one file
+ assertCompactUnits(
+ testHarness.extractOutputValues(),
+ Arrays.asList(
+ new CompactionUnit(0, "", Collections.singletonList(f1)),
+ new CompactionUnit(1, "", Collections.singletonList(f2)),
+ new CompactionUnit(2, "", Collections.singletonList(f3))));
+ }
+ }
+
+ @Test
+ public void testCompactNonPartitionedTable() throws Exception {
+ long averageSize = 14;
+ long targetSize = 16;
+ BatchCompactCoordinator compactCoordinator =
+ new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+ try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+ testHarness.setup();
+ testHarness.open();
+
+ Path f1 = newFile("f1", 10);
+ Path f2 = newFile("f2", 5);
+ Path f3 = newFile("f3", 20);
+
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f1)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f2)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("", f3)));
+ testHarness.endInput();
+
+ List<CoordinatorOutput> coordinatorOutputs = testHarness.extractOutputValues();
+ // f1 + f2 should be merged
+ assertCompactUnits(
+ coordinatorOutputs,
+ Arrays.asList(
+ new CompactionUnit(0, "", Arrays.asList(f1, f2)),
+ new CompactionUnit(1, "", Collections.singletonList(f3))));
+ }
+ }
+
+ @Test
+ public void testCompactPartitionedTable() throws Exception {
+ long averageSize = 10;
+ long targetSize = 16;
+ BatchCompactCoordinator compactCoordinator =
+ new BatchCompactCoordinator(() -> folder.getFileSystem(), averageSize, targetSize);
+
+ try (OneInputStreamOperatorTestHarness<CoordinatorInput, CoordinatorOutput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(compactCoordinator)) {
+ testHarness.setup();
+ testHarness.open();
+
+ // the files for partition "p1=1/", average size is 7.5, should be merged
+ Path f1 = newFile("f1", 10);
+ Path f2 = newFile("f2", 5);
+
+ // the files for partition "p1=2/", average size is 14, shouldn't be merged
+ Path f3 = newFile("f3", 20);
+ Path f4 = newFile("f4", 8);
+
+ // partition "p1=1/" should be merged
+ testHarness.processElement(new StreamRecord<>(new InputFile("p1=1/", f1)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("p1=1/", f2)));
+
+ // partition "p1=2/" should be merged
+ testHarness.processElement(new StreamRecord<>(new InputFile("p1=2/", f3)));
+ testHarness.processElement(new StreamRecord<>(new InputFile("p1=2/", f4)));
+
+ testHarness.endInput();
+
+ List<CoordinatorOutput> coordinatorOutputs = testHarness.extractOutputValues();
+ // f1 and f2 should be packed into one compact unit;
+ // f3 and f4 each form a single compact unit
+ assertCompactUnits(
+ coordinatorOutputs,
+ Arrays.asList(
+ new CompactionUnit(0, "p1=1/", Arrays.asList(f1, f2)),
+ new CompactionUnit(1, "p1=2/", Collections.singletonList(f3)),
+ new CompactionUnit(2, "p1=2/", Collections.singletonList(f4))));
+ }
+ }
+
+ private void assertCompactUnits(
+ List<CoordinatorOutput> coordinatorOutputs,
+ List<CompactionUnit> expectCompactionUnits) {
+ assertThat(coordinatorOutputs.size()).isEqualTo(expectCompactionUnits.size());
+ coordinatorOutputs.sort(Comparator.comparing(o -> ((CompactionUnit) o).getPartition()));
+ expectCompactionUnits.sort(Comparator.comparing(CompactionUnit::getPartition));
+ for (int i = 0; i < coordinatorOutputs.size(); i++) {
+ CoordinatorOutput coordinatorOutput = coordinatorOutputs.get(i);
+ assertThat(coordinatorOutput).isInstanceOf(CompactionUnit.class);
+
+ CompactionUnit compactionUnit = (CompactionUnit) coordinatorOutput;
+ CompactionUnit expectCompactionUnit = expectCompactionUnits.get(i);
+
+ // assert compact unit is equal
+ assertThat(compactionUnit.getPartition())
+ .isEqualTo(expectCompactionUnit.getPartition());
+ assertThat(compactionUnit.getPaths()).isEqualTo(expectCompactionUnit.getPaths());
+ }
+ }
+}
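
The three tests above pin down the coordinator's packing rule. The following is a
minimal stand-alone sketch of that rule, reconstructed only from the expectations
asserted in these tests (the class and method names are ours, not Flink API): within
a partition, if the average file size is at least averageSize, every file becomes its
own compaction unit; otherwise files are greedily packed into units of up to targetSize.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Hypothetical reconstruction of the packing rule the tests assert;
    // the real logic lives in BatchCompactCoordinator.
    final class PackingRuleSketch {
        static List<List<Long>> pack(List<Long> fileSizes, long averageSize, long targetSize) {
            List<List<Long>> units = new ArrayList<>();
            long total = fileSizes.stream().mapToLong(Long::longValue).sum();
            if (fileSizes.isEmpty() || total / fileSizes.size() >= averageSize) {
                // average is already large enough: emit one unit per file, unchanged
                for (long size : fileSizes) {
                    units.add(Collections.singletonList(size));
                }
                return units;
            }
            List<Long> current = new ArrayList<>();
            long currentSize = 0;
            for (long size : fileSizes) {
                // close the current unit before it would exceed the target size
                if (!current.isEmpty() && currentSize + size > targetSize) {
                    units.add(current);
                    current = new ArrayList<>();
                    currentSize = 0;
                }
                current.add(size);
                currentSize += size;
            }
            if (!current.isEmpty()) {
                units.add(current);
            }
            return units;
        }
    }

With the numbers from testCompactNonPartitionedTable, pack([10, 5, 20], 14, 16)
yields [[10, 5], [20]], matching the expected compaction units.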
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java
new file mode 100644
index 00000000000..80170fae1ab
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchCompactOperatorTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorOutput;
+import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
+import org.apache.flink.connector.file.table.stream.compact.TestByteFormat;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BatchCompactOperator}. */
+public class BatchCompactOperatorTest extends AbstractCompactTestBase {
+
+ @Test
+ public void testCompact() throws Exception {
+ // test compact
+ BatchCompactOperator<Byte> compactOperator = createBatchCompactOperator();
+ try (OneInputStreamOperatorTestHarness<CoordinatorOutput, CompactOutput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(compactOperator)) {
+ testHarness.setup();
+ testHarness.open();
+
+ Path f0 = newFile("uncompacted-f0", 3);
+ Path f1 = newFile("uncompacted-f1", 2);
+ Path f2 = newFile("uncompacted-f2", 2);
+
+ Path f3 = newFile("uncompacted-f3", 10);
+
+ testHarness.processElement(
+ new StreamRecord<>(new CompactionUnit(1, "p=p1/", Arrays.asList(f0, f1, f2))));
+ testHarness.processElement(
+ new StreamRecord<>(
+ new CompactionUnit(2, "p=p2/", Collections.singletonList(f3))));
+
+ testHarness.endInput();
+ List<CompactOutput> compactOutputs = testHarness.extractOutputValues();
+ Map<String, List<Path>> expectCompactedFiles = new HashMap<>();
+
+ expectCompactedFiles.put(
+ "p=p1/",
+ Collections.singletonList(new Path(folder + "/compacted-attempt-0-f0")));
+ // a single file is not compacted, so its name won't change
+ expectCompactedFiles.put(
+ "p=p2/", Collections.singletonList(new Path(folder + "/uncompacted-f3")));
+
+ // check the compacted file content: f0 = {0, 1, 2} and f1 = f2 = {0, 1}, so the sorted merge is {0, 0, 0, 1, 1, 1, 2}
+ byte[] bytes =
+ FileUtils.readAllBytes(
+ new File(folder.getPath(), "compacted-attempt-0-f0").toPath());
+ Arrays.sort(bytes);
+ assertThat(bytes).isEqualTo(new byte[] {0, 0, 0, 1, 1, 1, 2});
+
+ bytes = FileUtils.readAllBytes(new File(folder.getPath(), "uncompacted-f3").toPath());
+ assertThat(bytes).isEqualTo(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+
+ assertCompactOutput(
+ compactOutputs,
+ Collections.singletonList(new CompactOutput(expectCompactedFiles)));
+ }
+ }
+
+ private void assertCompactOutput(
+ List<CompactOutput> actualCompactOutputs, List<CompactOutput> expectCompactOutputs) {
+ assertThat(actualCompactOutputs.size()).isEqualTo(expectCompactOutputs.size());
+ for (int i = 0; i < actualCompactOutputs.size(); i++) {
+ CompactOutput actualCompactOutput = actualCompactOutputs.get(i);
+ CompactOutput expectCompactOutput = expectCompactOutputs.get(i);
+ assertThat(actualCompactOutput.getCompactedFiles())
+ .isEqualTo(expectCompactOutput.getCompactedFiles());
+ }
+ }
+
+ private BatchCompactOperator<Byte> createBatchCompactOperator() {
+ return new BatchCompactOperator<>(
+ () -> folder.getFileSystem(),
+ CompactBulkReader.factory(TestByteFormat.bulkFormat()),
+ context -> {
+ Path path = context.getPath();
+ Path tempPath = new Path(path.getParent(), "." + path.getName());
+ FSDataOutputStream out =
+ context.getFileSystem()
+ .create(tempPath, FileSystem.WriteMode.OVERWRITE);
+ return new CompactWriter<Byte>() {
+ @Override
+ public void write(Byte record) throws IOException {
+ out.write(record);
+ }
+
+ @Override
+ public void commit() throws IOException {
+ out.close();
+ context.getFileSystem().rename(tempPath, path);
+ }
+ };
+ });
+ }
+}
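
The anonymous CompactWriter built by the factory above writes to a hidden temp file
and renames it on commit. A self-contained restatement of that pattern follows (the
helper class and method are ours; Path, FileSystem and FSDataOutputStream are the
real Flink classes already used above):

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    import java.io.IOException;

    // Sketch of the write-then-rename convention used by the writer factory
    // above: data lands in a dot-prefixed sibling first, and only the final
    // rename makes the target name visible to readers.
    final class RenameOnCommitSketch {
        static void writeAtomically(FileSystem fs, Path target, byte[] payload) throws IOException {
            Path temp = new Path(target.getParent(), "." + target.getName());
            try (FSDataOutputStream out = fs.create(temp, FileSystem.WriteMode.OVERWRITE)) {
                out.write(payload);
            }
            // the commit step: readers never observe a half-written target file
            fs.rename(temp, target);
        }
    }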
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java
new file mode 100644
index 00000000000..cf1358c4a55
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriterTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.api.java.io.TextOutputFormat;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionTempFileManager;
+import org.apache.flink.connector.file.table.RowPartitionComputer;
+import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CoordinatorInput;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages.InputFile;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BatchFileWriter}. */
+public class BatchFileWriterTest extends AbstractCompactTestBase {
+ private final String[] columnNames = new String[] {"a", "b", "c"};
+ private FileSystemFactory fsFactory = FileSystem::get;
+
+ @Test
+ public void testWriteWithoutPartition() throws Exception {
+ String[] partitionColumns = new String[0];
+ BatchFileWriter<Row> fileWriter =
+ createBatchFileWriter(columnNames, partitionColumns, new LinkedHashMap<>(), false);
+ PartitionTempFileManager tempFileManager =
+ new PartitionTempFileManager(
+ fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+ try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+ testHarness.setup();
+ testHarness.open();
+
+ // write data
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+ List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+
+ // should only write and emit one file
+ assertInputFile(
+ coordinatorInputs,
+ 1,
+ Collections.singletonList(""),
+ Collections.singletonList(tempFileManager.createPartitionDir()));
+ }
+ }
+
+ @Test
+ public void testWriteWithStaticPartition() throws Exception {
+ String[] partitionColumns = new String[] {"b", "c"};
+ LinkedHashMap<String, String> staticParts = new LinkedHashMap<>();
+ staticParts.put("b", "p1");
+ staticParts.put("c", "p2");
+
+ BatchFileWriter<Row> fileWriter =
+ createBatchFileWriter(partitionColumns, partitionColumns, staticParts, false);
+ PartitionTempFileManager tempFileManager =
+ new PartitionTempFileManager(
+ fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+ try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+ List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+
+ // should only write and emit one file
+ assertInputFile(
+ coordinatorInputs,
+ 1,
+ Collections.singletonList("b=p1/c=p2/"),
+ Collections.singletonList(tempFileManager.createPartitionDir("b=p1/c=p2/")));
+ }
+ }
+
+ @Test
+ public void testWriteWithoutDynamicPartitionGrouped() throws Exception {
+ // test without dynamicGrouped
+ testWriteWithDynamicPartition(false);
+ }
+
+ @Test
+ public void testWriteWithDynamicPartitionGrouped() throws Exception {
+ // test with dynamicGrouped
+ testWriteWithDynamicPartition(true);
+ }
+
+ private void testWriteWithDynamicPartition(boolean dynamicGrouped) throws Exception {
+ String[] partitionColumns = new String[] {"b", "c"};
+
+ BatchFileWriter<Row> fileWriter =
+ createBatchFileWriter(
+ columnNames, partitionColumns, new LinkedHashMap<>(), dynamicGrouped);
+ PartitionTempFileManager tempFileManager =
+ new PartitionTempFileManager(
+ fsFactory, folder, 0, 0, OutputFileConfig.builder().build());
+
+ try (OneInputStreamOperatorTestHarness<Row, CoordinatorInput> testHarness =
+ new OneInputStreamOperatorTestHarness<>(fileWriter)) {
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 1, 2)));
+ testHarness.processElement(new StreamRecord<>(Row.of("a1", 2, 1)));
+ List<CoordinatorInput> coordinatorInputs = testHarness.extractOutputValues();
+ // should write two files, one for each partition
+ assertInputFile(
+ coordinatorInputs,
+ 2,
+ Arrays.asList("b=1/c=2/", "b=2/c=1/"),
+ Arrays.asList(
+ tempFileManager.createPartitionDir("b=1/c=2/"),
+ tempFileManager.createPartitionDir("b=2/c=1/")));
+ }
+ }
+
+ private void assertInputFile(
+ List<CoordinatorInput> coordinatorInputs,
+ int expectSize,
+ List<String> expectedPartitions,
+ List<Path> expectedFilePaths) {
+ assertThat(coordinatorInputs).hasSize(expectSize);
+ for (int i = 0; i < expectSize; i++) {
+ CoordinatorInput input = coordinatorInputs.get(i);
+ assertThat(input).isInstanceOf(InputFile.class);
+ InputFile inputFile = (InputFile) input;
+ assertThat(inputFile.getPartition()).isEqualTo(expectedPartitions.get(i));
+ assertThat(inputFile.getFile()).isEqualTo(expectedFilePaths.get(i));
+ }
+ }
+
+ private BatchFileWriter<Row> createBatchFileWriter(
+ String[] columnNames,
+ String[] partitionColumns,
+ LinkedHashMap<String, String> staticPartitions,
+ boolean dynamicGrouped) {
+ return new BatchFileWriter<>(
+ fsFactory,
+ folder,
+ partitionColumns,
+ dynamicGrouped,
+ staticPartitions,
+ TextOutputFormat::new,
+ new RowPartitionComputer("default", columnNames, partitionColumns),
+ OutputFileConfig.builder().build());
+ }
+}
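
The partition directories asserted above ("b=p1/c=p2/", "b=1/c=2/") follow the usual
Hive-style name=value layout. A hedged sketch of how such a path is derived from a
partition spec (the helper name is ours; Flink's own path utility is not part of
this diff):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical helper mirroring the layout the tests assert: each
    // partition column contributes a "name=value/" segment, in column order.
    final class PartitionPathSketch {
        static String partitionPath(LinkedHashMap<String, String> spec) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : spec.entrySet()) {
                sb.append(e.getKey()).append('=').append(e.getValue()).append('/');
            }
            return sb.toString();
        }
    }

For Row.of("a1", 1, 2) with partition columns {b, c}, the spec {b=1, c=2} maps to
"b=1/c=2/", which is exactly the directory checked in testWriteWithDynamicPartition.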
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
new file mode 100644
index 00000000000..13f9b84198b
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table.batch.compact;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.table.FileSystemCommitterTest;
+import org.apache.flink.connector.file.table.FileSystemFactory;
+import org.apache.flink.connector.file.table.PartitionCommitPolicyFactory;
+import org.apache.flink.connector.file.table.TableMetaStoreFactory;
+import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+
+/** Test for {@link BatchPartitionCommitterSink}. */
+public class BatchPartitionCommitterSinkTest {
+ private final FileSystemFactory fileSystemFactory = FileSystem::get;
+
+ private TableMetaStoreFactory metaStoreFactory;
+ private ObjectIdentifier identifier;
+
+ @TempDir private java.nio.file.Path path;
+ @TempDir private java.nio.file.Path outputPath;
+
+ @BeforeEach
+ public void before() {
+ metaStoreFactory =
+ new FileSystemCommitterTest.TestMetaStoreFactory(new Path(outputPath.toString()));
+ identifier = ObjectIdentifier.of("hiveCatalog", "default", "test");
+ }
+
+ @Test
+ public void testPartitionCommit() throws Exception {
+ BatchPartitionCommitterSink committerSink =
+ new BatchPartitionCommitterSink(
+ fileSystemFactory,
+ metaStoreFactory,
+ false,
+ false,
+ new Path(path.toString()),
+ new String[] {"p1", "p2"},
+ new LinkedHashMap<>(),
+ identifier,
+ new PartitionCommitPolicyFactory(null, null, null));
+ committerSink.open(new Configuration());
+
+ List<Path> pathList1 = createFiles(path, "task-1/p1=0/p2=0/", "f1", "f2");
+ List<Path> pathList2 = createFiles(path, "task-2/p1=0/p2=0/", "f3");
+ List<Path> pathList3 = createFiles(path, "task-2/p1=0/p2=1/", "f4");
+ Map<String, List<Path>> compactedFiles = new HashMap<>();
+ pathList1.addAll(pathList2);
+ compactedFiles.put("p1=0/p2=0/", pathList1);
+ compactedFiles.put("p1=0/p2=1/", pathList3);
+
+ committerSink.invoke(new CompactMessages.CompactOutput(compactedFiles), TEST_SINK_CONTEXT);
+ committerSink.setRuntimeContext(TEST_RUNTIME_CONTEXT);
+ committerSink.finish();
+ committerSink.close();
+ assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f1")).exists();
+ assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f2")).exists();
+ assertThat(new File(outputPath.toFile(), "p1=0/p2=0/f3")).exists();
+ assertThat(new File(outputPath.toFile(), "p1=0/p2=1/f4")).exists();
+ }
+
+ private List<Path> createFiles(java.nio.file.Path parent, String path, String... files)
+ throws IOException {
+ java.nio.file.Path dir = Files.createDirectories(Paths.get(parent.toString(), path));
+ List<Path> paths = new ArrayList<>();
+ for (String file : files) {
+ paths.add(new Path(Files.createFile(dir.resolve(file)).toFile().getPath()));
+ }
+ return paths;
+ }
+
+ private static final RuntimeContext TEST_RUNTIME_CONTEXT = getMockRuntimeContext();
+ private static final SinkFunction.Context TEST_SINK_CONTEXT =
+ new SinkFunction.Context() {
+ @Override
+ public long currentProcessingTime() {
+ return 0;
+ }
+
+ @Override
+ public long currentWatermark() {
+ return 0;
+ }
+
+ @Override
+ public Long timestamp() {
+ return null;
+ }
+ };
+
+ private static RuntimeContext getMockRuntimeContext() {
+ RuntimeContext context = Mockito.mock(RuntimeContext.class);
+ doReturn(Thread.currentThread().getContextClassLoader())
+ .when(context)
+ .getUserCodeClassLoader();
+ return context;
+ }
+}
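
The CompactOutput handed to invoke() maps each partition to its staged files; the
committer is expected to move them under the matching partition directory of the
final output path. A hedged sketch of that move, using only FileSystem calls that
appear elsewhere in this change (the real logic lives in FileSystemCommitter and
PartitionLoader, and may differ in detail):

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    // Sketch only: move every staged file into its partition directory
    // under the final output root, as the assertions above expect.
    final class CommitMoveSketch {
        static void moveToOutput(FileSystem fs, Path outputRoot, Map<String, List<Path>> compactedFiles)
                throws IOException {
            for (Map.Entry<String, List<Path>> entry : compactedFiles.entrySet()) {
                Path partitionDir = new Path(outputRoot, entry.getKey());
                fs.mkdirs(partitionDir);
                for (Path staged : entry.getValue()) {
                    // afterwards visible at e.g. "outputRoot/p1=0/p2=0/f1"
                    fs.rename(staged, new Path(partitionDir, staged.getName()));
                }
            }
        }
    }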
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
index 8e18069444a..2ea12647ec6 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/AbstractCompactTestBase.java
@@ -34,14 +34,14 @@ public abstract class AbstractCompactTestBase {
@TempDir private java.nio.file.Path path;
- Path folder;
+ protected Path folder;
@BeforeEach
void before() {
folder = new Path(path.toString());
}
- Path newFile(String name, int len) throws IOException {
+ protected Path newFile(String name, int len) throws IOException {
Path path = new Path(folder, name);
File file = new File(path.getPath());
file.delete();