Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:51 UTC

[hudi] 10/17: [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit dcec30f14c9a696f4e7006fc8519f417ebbee71b
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Sep 27 12:21:19 2022 -0700

    [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 64 +++++++++++++---------
 1 file changed, 38 insertions(+), 26 deletions(-)
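
Context for the hunks below: before this change, export() resolved a single
FileSystem handle from cfg.sourceBasePath and reused it everywhere, including
the existence check on the target path, the per-file copies, and the success
tag. That only works when the source table and the export target live on the
same filesystem and, on S3, in the same bucket. The patch resolves one handle
per path instead. A minimal sketch of the per-path resolution (bucket names
are hypothetical):

    // In Hadoop, a FileSystem instance is bound to a (scheme, authority)
    // pair, so s3a://source-bucket and s3a://target-bucket resolve to two
    // distinct FileSystem objects. Bucket names here are hypothetical.
    Configuration hadoopConf = jsc.hadoopConfiguration();
    FileSystem sourceFs = FSUtils.getFs("s3a://source-bucket/tables/trips", hadoopConf);
    FileSystem outputFs = FSUtils.getFs("s3a://target-bucket/exports/trips", hadoopConf);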

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 753765fb6a..187f66d073 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -114,16 +114,18 @@ public class HoodieSnapshotExporter {
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    if (outputPathExists(fs, cfg)) {
+    if (outputPathExists(outputFs, cfg)) {
       throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
 
-    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
-      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
-    });
+    FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+        .<HoodieSnapshotExporterException>orElseThrow(() -> {
+          throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+        });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
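The hunk above splits the old single handle: outputFs (from targetOutputPath)
guards the already-exists check and later writes the success tag, while
sourceFs (from sourceBasePath) reads the commit timeline. The split matters
because a Hadoop FileSystem bound to one authority rejects paths from
another; a sketch of the failure mode the old code would hit (hypothetical
buckets):

    // Checking the target path against a FileSystem resolved from the
    // source path fails before any copy starts, with something like
    // IllegalArgumentException: "Wrong FS: s3a://target-bucket/...".
    FileSystem fs = FSUtils.getFs("s3a://source-bucket/tables/trips", hadoopConf);
    fs.exists(new Path("s3a://target-bucket/exports/trips")); // throws
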
@@ -134,11 +136,11 @@ public class HoodieSnapshotExporter {
     LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     } else {
-      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     }
-    createSuccessTag(fs, cfg);
+    createSuccessTag(outputFs, cfg);
   }
 
   private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
@@ -164,7 +166,8 @@ public class HoodieSnapshotExporter {
     }
   }
 
-  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+  private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                               Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ public class HoodieSnapshotExporter {
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
         .flatMap(partition -> fsView
@@ -193,8 +196,9 @@ public class HoodieSnapshotExporter {
         .save(cfg.targetOutputPath);
   }
 
-  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+  private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                            Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
 
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
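
The hunks above thread sourceFs into both export paths and into
getBaseFileOnlyView, so file listing always runs against the source table.
For orientation, the non-Hudi path drops the Hudi meta columns and writes a
plain dataset to the target; a rough user-code equivalent, assuming the five
standard meta column names (paths hypothetical):

    // Approximates exportAsNonHudi's default partitioner: strip Hudi
    // metadata columns, then write as a plain (here parquet) dataset.
    Dataset<Row> df = spark.read().format("hudi")
        .load("s3a://source-bucket/tables/trips");
    df.drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
            "_hoodie_partition_path", "_hoodie_file_name")
      .write().format("parquet")
      .save("s3a://target-bucket/exports/trips");
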
@@ -219,20 +223,26 @@ public class HoodieSnapshotExporter {
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!fs.exists(toPartitionPath)) {
-        fs.mkdirs(toPartitionPath);
+      if (!executorOutputFs.exists(toPartitionPath)) {
+        executorOutputFs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          fs.getConf());
+      FileUtil.copy(
+          executorSourceFs,
+          sourceFilePath,
+          executorOutputFs,
+          new Path(toPartitionPath, sourceFilePath.getName()),
+          false,
+          executorOutputFs.getConf());
     }, files.size());
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
-    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
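
The copy above runs inside a Spark task, so it cannot close over driver-side
FileSystem objects; Hadoop's Configuration is not serializable either, which
is why Hudi ships a SerializableConfiguration and each executor rebuilds both
handles from a fresh copy. The idiom in isolation (paths and the
sourceFilePath/targetFilePath variables hypothetical):

    // Executor-side: resolve per-task handles from serConf, then stream
    // bytes between filesystems; deleteSource=false keeps the source file.
    FileSystem srcFs = FSUtils.getFs("s3a://source-bucket/tables/trips", serConf.newCopy());
    FileSystem dstFs = FSUtils.getFs("s3a://target-bucket/exports/trips", serConf.newCopy());
    FileUtil.copy(srcFs, sourceFilePath, dstFs, targetFilePath, false, dstFs.getConf());
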
@@ -244,20 +254,22 @@ public class HoodieSnapshotExporter {
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fileSystem.exists(targetFilePath.getParent())) {
-        fileSystem.mkdirs(targetFilePath.getParent());
+      if (!outputFs.exists(targetFilePath.getParent())) {
+        outputFs.mkdirs(targetFilePath.getParent());
       }
-      if (fileSystem.exists(targetFilePath)) {
+      if (outputFs.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
       }
-      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+      FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
     }
   }
 
-  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+  private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+        .setConf(sourceFs.getConf())
+        .setBasePath(cfg.sourceBasePath)
+        .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
         .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
   }
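
With both handles in place, a cross-bucket export should run end to end. A
hedged invocation sketch (jar path and bucket names hypothetical; flags as
documented for HoodieSnapshotExporter):

    spark-submit \
      --class org.apache.hudi.utilities.HoodieSnapshotExporter \
      path/to/hudi-utilities-bundle.jar \
      --source-base-path s3a://source-bucket/tables/trips \
      --target-output-path s3a://target-bucket/exports/trips \
      --output-format hudi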