You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:55 UTC

[hudi] 45/45: [HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 78fe5c73a4acaf5c4b8ff69c3c95fbd7d4c2dbaf
Author: 苏承祥 <11...@qq.com>
AuthorDate: Tue Jan 3 15:31:01 2023 +0800

    [HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966)
    
    (cherry picked from commit 41bea2fec54ae6c2376f5c88bd5a524b60b74a11)
---
 .../cluster/ClusteringPlanPartitionFilter.java     | 23 +++++++++++++++++
 .../cluster/ClusteringPlanPartitionFilterMode.java |  3 ++-
 .../TestSparkClusteringPlanPartitionFilter.java    | 29 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
index 3a889de753..ecc3706f67 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.action.cluster;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieClusteringException;
 
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -31,6 +34,11 @@ import java.util.stream.Stream;
  *  NONE: skip filter
  *  RECENT DAYS: output recent partition given skip num and days lookback config
  *  SELECTED_PARTITIONS: output partition falls in the [start, end] condition
+ *  DAY_ROLLING: cluster every partition once a day instead of clustering all partitions on every run.
+ *  Partitions are sorted in ascending order and those whose index % 24 equals the current hour are selected.
+ *  Tips: if hoodie.clustering.inline=true, try to reach the limit of hoodie.clustering.inline.max.commits every hour.
+ *        If hoodie.clustering.async.enabled=true, try to reach the limit of hoodie.clustering.async.max.commits every hour.
+ *
  */
 public class ClusteringPlanPartitionFilter {
 
@@ -43,11 +51,26 @@ public class ClusteringPlanPartitionFilter {
         return recentDaysFilter(partitions, config);
       case SELECTED_PARTITIONS:
         return selectedPartitionsFilter(partitions, config);
+      case DAY_ROLLING:
+        return dayRollingFilter(partitions, config);
       default:
         throw new HoodieClusteringException("Unknown partition filter, filter mode: " + mode);
     }
   }
 
+  // DAY_ROLLING: returns the sorted partitions at indexes hour, hour + 24, ... without mutating the input list.
+  private static List<String> dayRollingFilter(List<String> partitions, HoodieWriteConfig config) {
+    int hour = DateTime.now().getHourOfDay();
+    // Sort a copy: sorting in place reorders the caller's list and fails on unmodifiable lists.
+    List<String> sorted = new ArrayList<>(partitions);
+    sorted.sort(String::compareTo);
+    List<String> selectPt = new ArrayList<>();
+    for (int i = hour; i < sorted.size(); i += 24) { // same indexes as i % 24 == hour
+      selectPt.add(sorted.get(i));
+    }
+    return selectPt;
+  }
+
   private static List<String> recentDaysFilter(List<String> partitions, HoodieWriteConfig config) {
     int targetPartitionsForClustering = config.getTargetPartitionsForClustering();
     int skipPartitionsFromLatestForClustering = config.getSkipPartitionsFromLatestForClustering();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
index fbaf79797f..261c1874cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
@@ -24,5 +24,6 @@ package org.apache.hudi.table.action.cluster;
 public enum ClusteringPlanPartitionFilterMode {
   NONE,
   RECENT_DAYS,
-  SELECTED_PARTITIONS
+  SELECTED_PARTITIONS,
+  DAY_ROLLING
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
index a68a9e3360..70643a327d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 
+import org.joda.time.DateTime;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -104,4 +105,32 @@ public class TestSparkClusteringPlanPartitionFilter {
     assertEquals(1, list.size());
     assertSame("20211222", list.get(0));
   }
+
+  @Test
+  public void testDayRollingPartitionFilter() {
+    // DAY_ROLLING mode: one day of hourly partitions must yield only the current hour's partition.
+    HoodieWriteConfig config = hoodieWriteConfigBuilder.withClusteringConfig(HoodieClusteringConfig.newBuilder()
+        .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.DAY_ROLLING).build()).build();
+    PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
+    ArrayList<String> oneDay = new ArrayList<>();
+    for (int h = 0; h < 24; h++) {
+      oneDay.add("20220301" + (h < 10 ? "0" : "") + h);
+    }
+    List filterPartitions = sg.filterPartitionPaths(oneDay);
+    assertEquals(1, filterPartitions.size());
+    assertEquals(oneDay.get(DateTime.now().getHourOfDay()), filterPartitions.get(0));
+
+    // Two days added interleaved (unsorted): the current hour of each day must come back, in ascending order.
+    ArrayList<String> twoDays = new ArrayList<>();
+    for (int h = 0; h < 24; h++) {
+      String hh = (h < 10 ? "0" : "") + h;
+      twoDays.add("20220301" + hh);
+      twoDays.add("20220302" + hh);
+    }
+    filterPartitions = sg.filterPartitionPaths(twoDays);
+    assertEquals(2, filterPartitions.size());
+    int nowHour = DateTime.now().getHourOfDay();
+    assertEquals("20220301" + (nowHour < 10 ? "0" : "") + nowHour, filterPartitions.get(0));
+    assertEquals("20220302" + (nowHour < 10 ? "0" : "") + nowHour, filterPartitions.get(1));
+  }
 }