Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/13 15:31:01 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #4489: [HUDI-3135] Fix Delete partitions with metadata table and fix show partitions in spark sql

nsivabalan commented on a change in pull request #4489:
URL: https://github.com/apache/hudi/pull/4489#discussion_r780670509



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java
##########
@@ -42,27 +46,49 @@
 public class SparkDeletePartitionCommitActionExecutor<T extends HoodieRecordPayload<T>>
     extends SparkInsertOverwriteCommitActionExecutor<T> {
 
-  private List<String> partitions;
+  private final List<String> partitions;
+  private final boolean purge;
   public SparkDeletePartitionCommitActionExecutor(HoodieEngineContext context,
                                                   HoodieWriteConfig config, HoodieTable table,
-                                                  String instantTime, List<String> partitions) {
-    super(context, config, table, instantTime,null, WriteOperationType.DELETE_PARTITION);
+                                                  String instantTime, List<String> partitions,
+                                                  boolean purge) {
+    super(context, config, table, instantTime, null, WriteOperationType.DELETE_PARTITION);
     this.partitions = partitions;
+    this.purge = purge;
   }
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
-    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
-        .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
-    HoodieWriteMetadata result = new HoodieWriteMetadata();
-    result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
-    result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
+    try {
+      JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(context);
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Map<String, List<String>> partitionToReplaceFileIds = jsc.parallelize(partitions, partitions.size()).distinct()
+          .mapToPair(partitionPath -> new Tuple2<>(partitionPath, getAllExistingFileIds(partitionPath))).collectAsMap();
+      HoodieWriteMetadata result = new HoodieWriteMetadata();
+      result.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
+      result.setIndexUpdateDuration(Duration.ofMillis(timer.endTimer()));
 
-    result.setWriteStatuses(jsc.emptyRDD());
-    this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
-    this.commitOnAutoCommit(result);
-    return result;
+      // delete partition's path
+      if (purge) {
+        deletePartitionsPath();
+      }
+
+      result.setWriteStatuses(jsc.emptyRDD());
+      this.saveWorkloadProfileMetadataToInflight(new WorkloadProfile(Pair.of(new HashMap<>(), new WorkloadStat())), instantTime);
+      this.commitOnAutoCommit(result);
+      return result;
+    } catch (Exception e) {
+      throw new HoodieDeletePartitionException("Failed to execute delete partitions for commit time " + instantTime, e);
+    }
+  }
+
+  private void deletePartitionsPath() throws IOException {
+    String basePath = table.getMetaClient().getBasePath();
+    for (String partition : partitions) {
+      Path fullPartitionPath = FSUtils.getPartitionPath(basePath, partition);
+      if (table.getMetaClient().getFs().exists(fullPartitionPath)) {
+        table.getMetaClient().getFs().delete(fullPartitionPath, true);

Review comment:
       Yeah, I was thinking about the same thing. From the current code, it looks like this will be irreversible. Probably we should let the cleaner clean up deleted partitions when it comes around to cleaning up replaced files, and delete the partition paths once all file groups under them have been cleaned up.
   Or we could delete the partition paths during clean action execution, just before transitioning clean.inflight to clean.completed.
   https://github.com/apache/hudi/blob/98ec21507932ee942ca643c88ba9049746d44059/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java#L212
   
   Maybe I will sync up with Raymond on this and see how we can go about it.
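   
   For illustration, here is a rough sketch of that second option, i.e. dropping a partition directory from the cleaner once every file group under it has been removed. All names in this sketch are hypothetical; only FSUtils.getPartitionPath and the FileSystem calls mirror the diff above:
   
       import java.io.IOException;
       import java.util.List;
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hadoop.fs.Path;
       import org.apache.hudi.common.fs.FSUtils;
   
       // Hypothetical helper inside CleanActionExecutor, invoked just before
       // transitioning clean.inflight to clean.completed.
       private void purgePartitionPathsIfEmpty(FileSystem fs, String basePath,
                                               List<String> replacedPartitions) throws IOException {
         for (String partition : replacedPartitions) {
           Path fullPartitionPath = FSUtils.getPartitionPath(basePath, partition);
           // Only drop the directory once the cleaner has removed every file
           // group under it; until then, deleting the partition stays reversible.
           if (fs.exists(fullPartitionPath) && fs.listStatus(fullPartitionPath).length == 0) {
             fs.delete(fullPartitionPath, false);
           }
         }
       }
   
   That way the physical delete would follow the cleaner's retention policy instead of happening unconditionally at commit time, as the purge flag in this PR does.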




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org