You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/05/04 14:17:18 UTC

[hudi] branch master updated: [HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1562bb658f [HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)
1562bb658f is described below

commit 1562bb658f8f29f57763eaa6f9bd5a2ed7e80a7c
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Wed May 4 19:47:11 2022 +0530

    [HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)
---
 .../hudi/table/action/commit/BaseSparkCommitActionExecutor.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ade5508977..205da82ac1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -112,10 +112,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
   }
 
-  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
+  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords, Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering");
-    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
     UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
         .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
     Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
@@ -166,7 +164,9 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
     }
 
     // handle records update with clustering
-    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
+    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = fileGroupsInPendingClustering.isEmpty() ? inputRecords : clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
 
     context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
     HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);