You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/09/17 17:08:30 UTC
[hudi] branch master updated: [HUDI-4842] Support compaction strategy based on delta log file num (#6670)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cd2ea2a10b [HUDI-4842] Support compaction strategy based on delta log file num (#6670)
cd2ea2a10b is described below
commit cd2ea2a10b5b1f4e44a5fc844198c25d768fb2ca
Author: 苏承祥 <sc...@aliyun.com>
AuthorDate: Sun Sep 18 01:08:19 2022 +0800
[HUDI-4842] Support compaction strategy based on delta log file num (#6670)
Co-authored-by: 苏承祥 <su...@tuya.com>
---
.../apache/hudi/config/HoodieCompactionConfig.java | 11 +++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 12 +++--
.../LogFileNumBasedCompactionStrategy.java | 49 ++++++++++++++++++++
.../strategy/TestHoodieCompactionStrategy.java | 54 ++++++++++++++++++----
4 files changed, 113 insertions(+), 13 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index d1d0e67261..3b61d49727 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -106,6 +106,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Only if the log file size is greater than the threshold in bytes,"
+ " the file group will be compacted.");
+  /**
+   * Minimum number of delta log files a file group must have before
+   * {@code LogFileNumBasedCompactionStrategy} selects it for compaction (file groups whose
+   * log file count is greater than or equal to this value are kept).
+   */
+  public static final ConfigProperty<Long> COMPACTION_LOG_FILE_NUM_THRESHOLD = ConfigProperty
+      .key("hoodie.compaction.logfile.num.threshold")
+      .defaultValue(0L)
+      .withDocumentation("Only if the number of log files in a file group is greater than or equal to the threshold,"
+          + " the file group will be compacted. Used by LogFileNumBasedCompactionStrategy.");
+
public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
.key("hoodie.compaction.strategy")
.defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
@@ -381,6 +387,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
+  /**
+   * Sets the minimum number of delta log files a file group needs before the
+   * log-file-number based compaction strategy will consider it for compaction.
+   *
+   * @param logFileNumThreshold value stored under {@code COMPACTION_LOG_FILE_NUM_THRESHOLD}
+   * @return this builder, for chaining
+   */
+  public Builder withCompactionLogFileNumThreshold(int logFileNumThreshold) {
+    final String threshold = Integer.toString(logFileNumThreshold);
+    compactionConfig.setValue(COMPACTION_LOG_FILE_NUM_THRESHOLD, threshold);
+    return this;
+  }
+
public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 90a468368f..78c8d67752 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -199,10 +199,10 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "before writing records to the table.");
public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_SORT_COLUMNS = ConfigProperty
- .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
- .noDefaultValue()
- .withDocumentation("Columns to sort the data by when use org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as user defined partitioner during bulk_insert. "
- + "For example 'column1,column2'");
+      .key("hoodie.bulkinsert.user.defined.partitioner.sort.columns")
+      .noDefaultValue()
+      .withDocumentation("Columns to sort the data by when using org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner as the user-defined partitioner during bulk_insert. "
+          + "For example 'column1,column2'");
public static final ConfigProperty<String> BULKINSERT_USER_DEFINED_PARTITIONER_CLASS_NAME = ConfigProperty
.key("hoodie.bulkinsert.user.defined.partitioner.class")
@@ -1275,6 +1275,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_SIZE_THRESHOLD);
}
+  /**
+   * Returns the configured minimum delta log file count for compaction
+   * ({@code hoodie.compaction.logfile.num.threshold}).
+   *
+   * @return the threshold as read by {@code getLong}
+   */
+  public Long getCompactionLogFileNumThreshold() {
+    final Long threshold = getLong(HoodieCompactionConfig.COMPACTION_LOG_FILE_NUM_THRESHOLD);
+    return threshold;
+  }
+
public Boolean getCompactionLazyBlockReadEnabled() {
return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLE);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
new file mode 100644
index 0000000000..6f79b684d0
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/strategy/LogFileNumBasedCompactionStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.compact.strategy;
+
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * LogFileNumBasedCompactionStrategy keeps only the file groups whose number of delta log files is
+ * greater than or equal to {@code hoodie.compaction.logfile.num.threshold}, orders them by descending
+ * delta log file count, and then lets {@link BoundedIOCompactionStrategy} limit the compactions within
+ * the configured IO bound.
+ */
+public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy
+    implements Comparator<HoodieCompactionOperation> {
+
+  /**
+   * Drops operations with fewer delta log files than the configured threshold, sorts the remaining
+   * ones by descending delta log file count and hands them to the parent strategy.
+   *
+   * @param writeConfig            write config providing the log file count threshold
+   * @param operations             candidate compaction operations
+   * @param pendingCompactionPlans pending compaction plans, passed through to the parent strategy
+   * @return the operations the parent strategy selects from the filtered, sorted list
+   */
+  @Override
+  public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
+    Long configuredThreshold = writeConfig.getCompactionLogFileNumThreshold();
+    // Fall back to the config's default (0, i.e. keep everything) instead of unboxing null per operation.
+    long numThreshold = configuredThreshold == null ? 0L : configuredThreshold;
+    List<HoodieCompactionOperation> filteredOperations = operations.stream()
+        .filter(op -> deltaFileCount(op) >= numThreshold)
+        .sorted(this)
+        .collect(Collectors.toList());
+    return super.orderAndFilter(writeConfig, filteredOperations, pendingCompactionPlans);
+  }
+
+  /**
+   * Orders operations by descending number of delta log files.
+   */
+  @Override
+  public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) {
+    // Integer.compare instead of subtraction, which is overflow-prone in general.
+    return Integer.compare(deltaFileCount(hco2), deltaFileCount(hco1));
+  }
+
+  /**
+   * Returns the number of delta log files of the operation, treating a missing list as empty.
+   */
+  private static int deltaFileCount(HoodieCompactionOperation operation) {
+    List<?> deltaFilePaths = operation.getDeltaFilePaths();
+    return deltaFilePaths == null ? 0 : deltaFilePaths.size();
+  }
+}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
index 0c7190092e..319d6ea031 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/strategy/TestHoodieCompactionStrategy.java
@@ -49,8 +49,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieCompactionStrategy {
private static final long MB = 1024 * 1024L;
- private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
private static final Random RANDOM = new Random();
+ private String[] partitionPaths = {"2017/01/01", "2017/01/02", "2017/01/03"};
@Test
public void testUnBounded() {
@@ -76,7 +76,7 @@ public class TestHoodieCompactionStrategy {
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
BoundedIOCompactionStrategy strategy = new BoundedIOCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
- HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(400).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
@@ -99,8 +99,8 @@ public class TestHoodieCompactionStrategy {
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
LogFileSizeBasedCompactionStrategy strategy = new LogFileSizeBasedCompactionStrategy();
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
- HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
- .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
+ HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1205)
+ .withLogFileSizeThresholdBasedCompaction(100 * 1024 * 1024).build())
.build();
List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
@@ -123,7 +123,7 @@ public class TestHoodieCompactionStrategy {
sizesMap.put(100 * MB, Collections.singletonList(MB));
sizesMap.put(90 * MB, Collections.singletonList(1024 * MB));
- Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+ Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
{
put(120 * MB, partitionPaths[2]);
put(110 * MB, partitionPaths[2]);
@@ -169,7 +169,7 @@ public class TestHoodieCompactionStrategy {
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
- Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+ Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
{
put(120 * MB, currentDay);
put(110 * MB, currentDayMinus1);
@@ -218,7 +218,7 @@ public class TestHoodieCompactionStrategy {
String currentDayPlus1 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(1));
String currentDayPlus5 = format.format(BoundedPartitionAwareCompactionStrategy.getDateAtOffsetFromToday(5));
- Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long,String>() {
+ Map<Long, String> keyToPartitionMap = Collections.unmodifiableMap(new HashMap<Long, String>() {
{
put(120 * MB, currentDay);
put(110 * MB, currentDayMinus1);
@@ -243,8 +243,44 @@ public class TestHoodieCompactionStrategy {
"BoundedPartitionAwareCompactionStrategy should have resulted in 1 compaction");
}
+  /**
+   * Verifies that only file groups with at least the configured number of delta log files are
+   * compacted, ordered by descending log file count, and that the selected IO adds up.
+   */
+  @Test
+  public void testLogFileNumBasedCompactionStrategy() {
+    Map<Long, List<Long>> sizesMap = new HashMap<>();
+    sizesMap.put(120 * MB, Arrays.asList(60 * MB, 10 * MB, 80 * MB));
+    sizesMap.put(110 * MB, new ArrayList<>());
+    sizesMap.put(100 * MB, Collections.singletonList(2048 * MB));
+    sizesMap.put(90 * MB, Arrays.asList(512 * MB, 512 * MB));
+    LogFileNumBasedCompactionStrategy strategy = new LogFileNumBasedCompactionStrategy();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp").withCompactionConfig(
+        HoodieCompactionConfig.newBuilder().withCompactionStrategy(strategy).withTargetIOPerCompactionInMB(1024)
+            .withCompactionLogFileNumThreshold(2).build())
+        .build();
+    List<HoodieCompactionOperation> operations = createCompactionOperations(writeConfig, sizesMap);
+    List<HoodieCompactionOperation> returned = strategy.orderAndFilter(writeConfig, operations, new ArrayList<>());
+
+    assertTrue(returned.size() < operations.size(),
+        "LogFileNumBasedCompactionStrategy should have resulted in fewer compactions");
+    assertEquals(2, returned.size(), "LogFileNumBasedCompactionStrategy should have resulted in 2 compactions");
+
+    // Delta log file counts: only groups with >= 2 log files remain, most log files first.
+    int totalDeltaFiles = returned.stream().mapToInt(op -> op.getDeltaFilePaths().size()).sum();
+    assertEquals(5, totalDeltaFiles);
+    assertEquals(3, returned.get(0).getDeltaFilePaths().size());
+    assertEquals(2, returned.get(1).getDeltaFilePaths().size());
+
+    // Total IO of the selected compactions
+    long returnedSize = returned.stream()
+        .map(op -> op.getMetrics().get(BoundedIOCompactionStrategy.TOTAL_IO_MB))
+        .mapToLong(Double::longValue)
+        .sum();
+    // TOTAL_IO_MB: (120 + 90) * 2 + 512 + 512 + 60 + 10 + 80 = 1594
+    assertEquals(1594, returnedSize,
+        "Should choose the first 2 compactions which should result in a total IO of 1594 MB");
+  }
+
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
- Map<Long, List<Long>> sizesMap) {
+ Map<Long, List<Long>> sizesMap) {
Map<Long, String> keyToPartitionMap = sizesMap.keySet().stream()
.map(e -> Pair.of(e, partitionPaths[RANDOM.nextInt(partitionPaths.length - 1)]))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
@@ -252,7 +288,7 @@ public class TestHoodieCompactionStrategy {
}
private List<HoodieCompactionOperation> createCompactionOperations(HoodieWriteConfig config,
- Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
+ Map<Long, List<Long>> sizesMap, Map<Long, String> keyToPartitionMap) {
List<HoodieCompactionOperation> operations = new ArrayList<>(sizesMap.size());
sizesMap.forEach((k, v) -> {