You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:50 UTC
[hudi] 09/17: [HUDI-4848] Fixing repair deprecated partition tool (#6731)
This is an automated email from the ASF dual-hosted git repository.
yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1abdb5787748aa2dde56f7c9d8d06f91ae2b5119
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Sep 27 12:02:35 2022 -0700
[HUDI-4848] Fixing repair deprecated partition tool (#6731)
---
.../org/apache/hudi/cli/commands/SparkMain.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e43a5d037e..6649eaf766 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -59,6 +59,7 @@ import org.apache.hudi.utilities.HoodieCompactor;
import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
@@ -456,8 +458,15 @@ public class SparkMain {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
Map<String, String> propsMap = getPropsForRewrite(metaClient);
rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
- // after re-writing, we can safely delete older data.
+ // after re-writing, we can safely delete the older partition.
deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
+ // also, we can physically delete the older partition.
+ FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
+ try {
+ fs.delete(new Path(basePath, oldPartition), true);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete older partition " + basePath);
+ }
}
return 0;
}
@@ -473,10 +482,14 @@ public class SparkMain {
}
private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
- recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+ String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
+ StructType structType = recordsToRewrite.schema();
+ int partitionIndex = structType.fieldIndex(partitionFieldProp);
+
+ recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
.write()
.options(propsMap)
- .option("hoodie.datasource.write.operation", "insert")
+ .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
.format("hudi")
.mode("Append")
.save(basePath);
@@ -484,10 +497,8 @@ public class SparkMain {
private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
return sqlContext.read()
- .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
.format("hudi")
- .load(basePath)
- .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+ .load(basePath + "/" + oldPartition)
.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)