You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/17 17:08:30 UTC

[hudi] branch master updated: [HUDI-4842] Support compaction strategy based on delta log file num (#6670)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cd2ea2a10b [HUDI-4842] Support compaction strategy based on delta log file num (#6670)
cd2ea2a10b is described below

commit cd2ea2a10b5b1f4e44a5fc844198c25d768fb2ca
Author: 苏承祥 <sc...@aliyun.com>
AuthorDate: Sun Sep 18 01:08:19 2022 +0800

    [HUDI-4842] Support compaction strategy based on delta log file num (#6670)
    
    Co-authored-by: 苏承祥 <su...@tuya.com>
---
 .../apache/hudi/config/HoodieCompactionConfig.java | 11 +++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 12 +++--
 .../LogFileNumBasedCompactionStrategy.java         | 49 ++++++++++++++++++++
 .../strategy/TestHoodieCompactionStrategy.java     | 54 ++++++++++++++++++----
 4 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index d1d0e67261..3b61d49727 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -106,6 +106,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Only if the log file size is greater than the threshold in bytes,"
           + " the file group will be compacted.");
 
+  public static final ConfigProperty<Long> COMPACTION_LOG_FILE_NUM_THRESHOLD = ConfigProperty
+      .key("hoodie.compaction.logfile.num.threshold")
+      .defaultValue(0L)
+      .withDocumentation("Only if the log file num is greater than or equal to the threshold,"
+          + " the file group will be compacted.");
+
   public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
       .key("hoodie.compaction.strategy")
       .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
@@ -381,6 +387,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) { // sets hoodie.compaction.logfile.num.threshold
+      compactionConfig.setValue(COMPACTION_LOG_FILE_NUM_THRESHOLD, Integer.toString(logFileNumThreshold));
+      return this;
+    }
+
     public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
       compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 90a468368f..78c8d67752 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -199,10 +199,10 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "before writing records to the table.");
 
   public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
-          .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
-          .noDefaultValue()
-          .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. "
-                  + "For example 'column1,column2'");
+      .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
+      .noDefaultValue()
+      .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. "
+          + "For example 'column1,column2'");
 
   public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty
       .key("hoodie.bulkinsert.user.defined.partitioner.class")
@@ -1275,6 +1275,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
   }
 
+  public Long getCompactionLogFileNumThreshold() { // log file count threshold read by LogFileNumBasedCompactionStrategy
+    return this.getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_NUM_THRESHOLD);
+  }
+
   public Boolean getCompactionLazyBlockReadEnabled() {
     return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
new file mode 100644
index 0000000000..6f79b684d0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Orders compactions by their number of delta log files (most first), keeps only file groups whose count is at least
+ * {@link HoodieWriteConfig#getCompactionLogFileNumThreshold()}, then limits them within the configured IO bound.
+ */
+public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy
+    implements Comparator<HoodieCompactionOperation> {
+
+  @Override
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    Long numThreshold = writeConfig.getCompactionLogFileNumThreshold();
+    List<HoodieCompactionOperation> filteredOperations = operations.stream()
+        .filter(op -> op.getDeltaFilePaths().size() >= numThreshold) // inclusive: count >= threshold is kept
+        .sorted(this).collect(Collectors.toList());
+    return super.orderAndFilter(writeConfig, filteredOperations, pendingCompactionPlans);
+  }
+
+  @Override
+  public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) {
+    return Integer.compare(hco2.getDeltaFilePaths().size(), hco1.getDeltaFilePaths().size()); // descending by count
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 0c7190092e..319d6ea031 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -49,8 +49,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestHoodieCompactionStrategy {
 
   private static final long MB = 1024 * 1024L;
-  private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
   private static final Random RANDOM = new Random();
+  private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
 
   @Test
   public void testUnBounded() {
@@ -76,7 +76,7 @@ public class TestHoodieCompactionStrategy {
     sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
     BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy();
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
-        HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
         .build();
     List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
     List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
@@ -99,8 +99,8 @@ public class TestHoodieCompactionStrategy {
     sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
     LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
-        HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
-            .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
+                .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
         .build();
     List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
     List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
@@ -123,7 +123,7 @@ public class TestHoodieCompactionStrategy {
     sizesMap.put(100 * MB, Collections.singletonList(MB));
     sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
 
-    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
       {
         put(120 * MB, partitionPaths[2]);
         put(110 * MB, partitionPaths[2]);
@@ -169,7 +169,7 @@ public class TestHoodieCompactionStrategy {
     String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
     String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
 
-    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
       {
         put(120 * MB, currentDay);
         put(110 * MB, currentDayMinus1);
@@ -218,7 +218,7 @@ public class TestHoodieCompactionStrategy {
     String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
     String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
 
-    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+    Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
       {
         put(120 * MB, currentDay);
         put(110 * MB, currentDayMinus1);
@@ -243,8 +243,44 @@ public class TestHoodieCompactionStrategy {
         "BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction");
   }
 
+  @Test
+  public void testLogFileLengthBasedCompactionStrategy() {
+    Map<Long, List<Long>> sizesMap = new HashMap<>();
+    sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+    sizesMap.put(110 * MB, new ArrayList<>());
+    sizesMap.put(100 * MB, Collections.singletonList(2048 * MB));
+    sizesMap.put(90 * MB, Arrays.asList(512 * MB, 512 * MB));
+    LogFileNumBasedCompactionStrategy strategy = new LogFileNumBasedCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+            HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024)
+                .withCompactionLogFileNumThreshold(2).build())
+        .build();
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+
+    assertTrue(returned.size() < operations.size(),
+        "LogFileLengthBasedCompactionStrategy should have resulted in fewer compactions");
+    assertEquals(2, returned.size(), "LogFileLengthBasedCompactionStrategy should have resulted in 2 compaction");
+
+    // Delte log File length
+    Integer allFileLength = returned.stream().map(s -> s.getDeltaFilePaths().size())
+        .reduce(Integer::sum).orElse(0);
+
+    assertEquals(5, allFileLength);
+    assertEquals(3, returned.get(0).getDeltaFilePaths().size());
+    assertEquals(2, returned.get(1).getDeltaFilePaths().size());
+    // Total size of all the log files
+    Long returnedSize = returned.stream().map(s -> s.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
+        .map(Double::longValue).reduce(Long::sum).orElse(0L);
+    // TOTAL_IO_MB: ( 120 + 90 ) * 2 + 521 + 521 + 60 + 10 + 80
+    assertEquals(1594, (long) returnedSize,
+        "Should chose the first 2 compactions which should result in a total IO of 1594 MB");
+
+
+  }
+
   private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
-      Map<Long, List<Long>> sizesMap) {
+                                                                     Map<Long, List<Long>> sizesMap) {
     Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream()
         .map(e -> Pair.of(e, partitionPaths[RANDOM.nextInt(partitionPaths.length - 1)]))
         .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -252,7 +288,7 @@ public class TestHoodieCompactionStrategy {
   }
 
   private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
-      Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
+                                                                     Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
     List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size());
 
     sizesMap.forEach((k, v) -> {