You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:50 UTC

[hudi] 09/17: [HUDI-4848] Fixing repair deprecated partition tool (#6731)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1abdb5787748aa2dde56f7c9d8d06f91ae2b5119
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Sep 27 12:02:35 2022 -0700

    [HUDI-4848] Fixing repair deprecated partition tool (#6731)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e43a5d037e..6649eaf766 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -59,6 +59,7 @@ import org.apache.hudi.utilities.HoodieCompactor;
 import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -456,8 +458,15 @@ public class SparkMain {
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
       Map<String, String> propsMap = getPropsForRewrite(metaClient);
       rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
-      // after re-writing, we can safely delete older data.
+      // after re-writing, we can safely delete older partition.
       deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
+      // also, we can physically delete the old partition.
+      FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
+      try {
+        fs.delete(new Path(basePath, oldPartition), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete older partition " + basePath);
+      }
     }
     return 0;
   }
@@ -473,10 +482,14 @@ public class SparkMain {
   }
 
   private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
-    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+    String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
+    StructType structType = recordsToRewrite.schema();
+    int partitionIndex = structType.fieldIndex(partitionFieldProp);
+
+    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
         .write()
         .options(propsMap)
-        .option("hoodie.datasource.write.operation", "insert")
+        .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
         .format("hudi")
         .mode("Append")
         .save(basePath);
@@ -484,10 +497,8 @@ public class SparkMain {
 
   private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
     return sqlContext.read()
-        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
         .format("hudi")
-        .load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .load(basePath + "/" + oldPartition)
         .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
         .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)