Posted to commits@hudi.apache.org by "TengHuo (via GitHub)" <gi...@apache.org> on 2023/03/02 03:39:00 UTC

[GitHub] [hudi] TengHuo commented on a diff in pull request #6991: [HUDI-5049] HoodieCatalog supports the implementation of dropPartition

TengHuo commented on code in PR #6991:
URL: https://github.com/apache/hudi/pull/6991#discussion_r1122565966


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java:
##########
@@ -394,7 +408,40 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPa
   @Override
   public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
       throws PartitionNotExistException, CatalogException {
-    throw new UnsupportedOperationException("dropPartition is not implemented.");
+    if (!tableExists(tablePath)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(tablePathStr, hadoopConf);
+    boolean hiveStylePartitioning = Boolean.parseBoolean(options.getOrDefault(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "false"));
+    String partitionPathStr = HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning, catalogPartitionSpec);
+
+    if (!StreamerUtil.partitionExists(tablePathStr, partitionPathStr, hadoopConf)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+      }
+    }
+
+    // enable auto-commit so the delete is committed by this write client
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
+      writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
+          .forEach(writeStatus -> {
+            if (writeStatus.hasErrors()) {
+              throw new HoodieMetadataException(String.format("Failed to commit metadata table records at file id %s.", writeStatus.getFileId()));
+            }
+          });
+      fs.delete(new Path(tablePathStr, partitionPathStr), true);

Review Comment:
   Hi
   
   May I ask why we need to call `fs.delete` here? Will it cause any problems?
   
   I checked the code in `HoodieSparkSqlWriter.scala` and `SparkDeletePartitionCommitActionExecutor.java`:
   there is no path-deletion operation on the Spark side.
   
   https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala#L270
   
   https://github.com/apache/hudi/blob/master/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkDeletePartitionCommitActionExecutor.java#L62
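   
   Just to illustrate what I mean, a rough sketch of the same tail without the physical deletion (it reuses the variables and the catalog's private `createWriteClient` helper from the diff above, so it is not standalone code):
   
   ```java
   import java.util.Collections;
   
   import org.apache.hudi.client.HoodieFlinkWriteClient;
   import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.hudi.exception.HoodieMetadataException;
   
   // Same flow as the diff above, minus fs.delete. deletePartitions writes a
   // replacecommit that marks every file group in the partition as replaced;
   // the cleaner reclaims the data files later, so the partition directory
   // does not need to be removed here.
   options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
   try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, tablePathStr, tablePath)) {
     writeClient.deletePartitions(Collections.singletonList(partitionPathStr), HoodieActiveTimeline.createNewInstantTime())
         .forEach(writeStatus -> {
           if (writeStatus.hasErrors()) {
             throw new HoodieMetadataException(String.format("Failed to delete partition %s, file id %s.", partitionPathStr, writeStatus.getFileId()));
           }
         });
     // no fs.delete(...) here: dropping the partition stays a pure timeline/metadata operation,
     // matching the Spark-side behaviour linked above
   }
   ```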


