You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:51 UTC
[hudi] 10/17: [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dcec30f14c9a696f4e7006fc8519f417ebbee71b
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Sep 27 12:21:19 2022 -0700
[HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
---
.../hudi/utilities/HoodieSnapshotExporter.java | 64 +++++++++++++---------
1 file changed, 38 insertions(+), 26 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 753765fb6a..187f66d073 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -114,16 +114,18 @@ public class HoodieSnapshotExporter {
}
public void export(JavaSparkContext jsc, Config cfg) throws IOException {
- FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+ FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
- if (outputPathExists(fs, cfg)) {
+ if (outputPathExists(outputFs, cfg)) {
throw new HoodieSnapshotExporterException("The target output path already exists.");
}
- final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
- throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
- });
+ FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+ final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+ .<HoodieSnapshotExporterException>orElseThrow(() -> {
+ throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+ });
LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
latestCommitTimestamp));
@@ -134,11 +136,11 @@ public class HoodieSnapshotExporter {
LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
- exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+ exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
} else {
- exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+ exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
}
- createSuccessTag(fs, cfg);
+ createSuccessTag(outputFs, cfg);
}
private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
@@ -164,7 +166,8 @@ public class HoodieSnapshotExporter {
}
}
- private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+ private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+ Config cfg, List<String> partitions, String latestCommitTimestamp) {
Partitioner defaultPartitioner = dataset -> {
Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ public class HoodieSnapshotExporter {
HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
- final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+ final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
Iterator<String> exportingFilePaths = jsc
.parallelize(partitions, partitions.size())
.flatMap(partition -> fsView
@@ -193,8 +196,9 @@ public class HoodieSnapshotExporter {
.save(cfg.targetOutputPath);
}
- private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
- final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+ private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+ Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+ final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
final SerializableConfiguration serConf = context.getHadoopConf();
@@ -219,20 +223,26 @@ public class HoodieSnapshotExporter {
String partition = tuple._1();
Path sourceFilePath = new Path(tuple._2());
Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
- FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+ FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+ FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
- if (!fs.exists(toPartitionPath)) {
- fs.mkdirs(toPartitionPath);
+ if (!executorOutputFs.exists(toPartitionPath)) {
+ executorOutputFs.mkdirs(toPartitionPath);
}
- FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
- fs.getConf());
+ FileUtil.copy(
+ executorSourceFs,
+ sourceFilePath,
+ executorOutputFs,
+ new Path(toPartitionPath, sourceFilePath.getName()),
+ false,
+ executorOutputFs.getConf());
}, files.size());
// Also copy the .commit files
LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
- final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+ FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
FileStatus[] commitFilesToCopy =
- fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+ sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
return true;
} else {
@@ -244,20 +254,22 @@ public class HoodieSnapshotExporter {
for (FileStatus commitStatus : commitFilesToCopy) {
Path targetFilePath =
new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
- if (!fileSystem.exists(targetFilePath.getParent())) {
- fileSystem.mkdirs(targetFilePath.getParent());
+ if (!outputFs.exists(targetFilePath.getParent())) {
+ outputFs.mkdirs(targetFilePath.getParent());
}
- if (fileSystem.exists(targetFilePath)) {
+ if (outputFs.exists(targetFilePath)) {
LOG.error(
String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
}
- FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+ FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
}
}
- private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
- FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
- HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+ private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+ HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+ .setConf(sourceFs.getConf())
+ .setBasePath(cfg.sourceBasePath)
+ .build();
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
.getActiveTimeline().getWriteTimeline().filterCompletedInstants());
}