You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/04 03:32:55 UTC
[hudi] 45/45: [HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 78fe5c73a4acaf5c4b8ff69c3c95fbd7d4c2dbaf
Author: 苏承祥 <11...@qq.com>
AuthorDate: Tue Jan 3 15:31:01 2023 +0800
[HUDI-3572] support DAY_ROLLING strategy in ClusteringPlanPartitionFilterMode (#4966)
(cherry picked from commit 41bea2fec54ae6c2376f5c88bd5a524b60b74a11)
---
.../cluster/ClusteringPlanPartitionFilter.java | 23 +++++++++++++++++
.../cluster/ClusteringPlanPartitionFilterMode.java | 3 ++-
.../TestSparkClusteringPlanPartitionFilter.java | 29 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
index 3a889de753..ecc3706f67 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilter.java
@@ -21,6 +21,9 @@ package org.apache.hudi.table.action.cluster;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
+import org.joda.time.DateTime;
+
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -31,6 +34,11 @@ import java.util.stream.Stream;
* NONE: skip filter
* RECENT DAYS: output recent partition given skip num and days lookback config
* SELECTED_PARTITIONS: output partition falls in the [start, end] condition
+ * DAY_ROLLING: spreads clustering over the day so that every partition is clustered once a day, instead of clustering all partitions on every run.
+ * Partitions are sorted in ascending order, and those whose index % 24 equals the current hour of day are selected.
+ * Tips: if hoodie.clustering.inline=true, try to reach the limit of hoodie.clustering.inline.max.commits every hour;
+ * if hoodie.clustering.async.enabled=true, try to reach the limit of hoodie.clustering.async.max.commits every hour.
+ *
*/
public class ClusteringPlanPartitionFilter {
@@ -43,11 +51,26 @@ public class ClusteringPlanPartitionFilter {
return recentDaysFilter(partitions, config);
case SELECTED_PARTITIONS:
return selectedPartitionsFilter(partitions, config);
+ case DAY_ROLLING:
+ return dayRollingFilter(partitions, config);
default:
throw new HoodieClusteringException("Unknown partition filter, filter mode: " + mode);
}
}
+ /**
+  * Selects the partitions whose position, in ascending order, satisfies
+  * {@code index % 24 == hour of day} for the hour reported by {@code DateTime.now()}.
+  *
+  * @param partitions candidate partition paths; this list is not modified
+  * @param config write config (not read by this filter)
+  * @return the selected partitions, in ascending order; empty if no index matches the current hour
+  */
+ private static List<String> dayRollingFilter(List<String> partitions, HoodieWriteConfig config) {
+   int hour = DateTime.now().getHourOfDay();
+   // Sort a private copy: sorting the argument in place would silently reorder the
+   // caller's list and would throw on unmodifiable lists.
+   List<String> sortedPartitions = new ArrayList<>(partitions);
+   sortedPartitions.sort(String::compareTo);
+   List<String> selectPt = new ArrayList<>();
+   // Visiting hour, hour + 24, hour + 48, ... covers exactly the indices with index % 24 == hour.
+   for (int i = hour; i < sortedPartitions.size(); i += 24) {
+     selectPt.add(sortedPartitions.get(i));
+   }
+   return selectPt;
+ }
+
private static List<String> recentDaysFilter(List<String> partitions, HoodieWriteConfig config) {
int targetPartitionsForClustering = config.getTargetPartitionsForClustering();
int skipPartitionsFromLatestForClustering = config.getSkipPartitionsFromLatestForClustering();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
index fbaf79797f..261c1874cc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanPartitionFilterMode.java
@@ -24,5 +24,6 @@ package org.apache.hudi.table.action.cluster;
public enum ClusteringPlanPartitionFilterMode {
NONE,
RECENT_DAYS,
- SELECTED_PARTITIONS
+ SELECTED_PARTITIONS, // output partitions that fall in the [start, end] condition
+ DAY_ROLLING // after sorting partitions ascending, select those whose index % 24 equals the current hour
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
index a68a9e3360..70643a327d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/cluster/strategy/TestSparkClusteringPlanPartitionFilter.java
@@ -26,6 +26,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import org.joda.time.DateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
@@ -104,4 +105,32 @@ public class TestSparkClusteringPlanPartitionFilter {
assertEquals(1, list.size());
assertSame("20211222", list.get(0));
}
+
+ @Test
+ public void testDayRollingPartitionFilter() {
+   // Configure the strategy to use the DAY_ROLLING partition filter mode.
+   HoodieWriteConfig config = hoodieWriteConfigBuilder
+       .withClusteringConfig(HoodieClusteringConfig.newBuilder()
+           .withClusteringPlanPartitionFilterMode(ClusteringPlanPartitionFilterMode.DAY_ROLLING)
+           .build())
+       .build();
+   PartitionAwareClusteringPlanStrategy sg = new SparkSizeBasedClusteringPlanStrategy(table, context, config);
+
+   // Case 1: 24 hourly partitions of a single day, so exactly one index matches the current hour.
+   ArrayList<String> oneDayPartitions = new ArrayList<>();
+   for (int h = 0; h < 24; h++) {
+     oneDayPartitions.add("20220301" + (h < 10 ? "0" + h : String.valueOf(h)));
+   }
+   List selected = sg.filterPartitionPaths(oneDayPartitions);
+   assertEquals(1, selected.size());
+   // The expected element is looked up with the hour read from the clock at assertion time.
+   assertEquals(oneDayPartitions.get(DateTime.now().getHourOfDay()), selected.get(0));
+
+   // Case 2: two days of hourly partitions, added interleaved. In ascending order all of
+   // 20220301 precedes 20220302, so the current hour matches one partition of each day.
+   ArrayList<String> twoDayPartitions = new ArrayList<>();
+   for (int h = 0; h < 24; h++) {
+     String hh = h < 10 ? "0" + h : String.valueOf(h);
+     twoDayPartitions.add("20220301" + hh);
+     twoDayPartitions.add("20220302" + hh);
+   }
+   selected = sg.filterPartitionPaths(twoDayPartitions);
+   assertEquals(2, selected.size());
+
+   int hourOfDay = DateTime.now().getHourOfDay();
+   String suffix = (hourOfDay < 10 ? "0" : "") + hourOfDay;
+   assertEquals("20220301" + suffix, selected.get(0));
+   assertEquals("20220302" + suffix, selected.get(1));
+ }
}