Posted to commits@hudi.apache.org by co...@apache.org on 2022/07/09 14:30:56 UTC
[hudi] branch master updated: [HUDI-2150] Rename/Restructure configs for better modularity (#6061)
This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 126b88b48d [HUDI-2150] Rename/Restructure configs for better modularity (#6061)
126b88b48d is described below
commit 126b88b48ddf3af4ad6b48551cab09eea4c800c9
Author: liujinhui <96...@qq.com>
AuthorDate: Sat Jul 9 22:30:48 2022 +0800
[HUDI-2150] Rename/Restructure configs for better modularity (#6061)
- Move clean-related configuration to HoodieCleanConfig
- Move archival-related configuration to HoodieArchivalConfig
- Move hoodie.compaction.payload.class to HoodiePayloadConfig
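
For callers assembling a HoodieWriteConfig, the change boils down to the migration sketched below. The base path and retention values are hypothetical placeholders; the builder methods are exactly the ones touched in the diff that follows.

    import org.apache.hudi.config.HoodieArchivalConfig;
    import org.apache.hudi.config.HoodieCleanConfig;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class ConfigMigrationSketch {
      // Before this commit: clean and archival knobs hung off HoodieCompactionConfig.
      static HoodieWriteConfig before() {
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi-table") // hypothetical base path
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                .retainCommits(1)
                .archiveCommitsWith(2, 3)
                .build())
            .build();
      }

      // After this commit: each table service has its own config class and builder hook.
      static HoodieWriteConfig after() {
        return HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi-table")
            .withCleanConfig(HoodieCleanConfig.newBuilder()
                .retainCommits(1)
                .build())
            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
                .archiveCommitsWith(2, 3)
                .build())
            .build();
      }
    }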
---
.../org/apache/hudi/cli/commands/SparkMain.java | 4 +-
.../cli/commands/TestArchivedCommitsCommand.java | 6 +-
.../hudi/cli/commands/TestCommitsCommand.java | 9 +-
.../hudi/cli/commands/TestCompactionCommand.java | 6 +-
.../apache/hudi/config/HoodieArchivalConfig.java | 194 ++++++++++
.../org/apache/hudi/config/HoodieCleanConfig.java | 297 +++++++++++++++
.../apache/hudi/config/HoodieCompactionConfig.java | 397 +--------------------
.../apache/hudi/config/HoodiePayloadConfig.java | 20 ++
.../apache/hudi/config/HoodieStorageConfig.java | 16 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 97 +++--
.../metadata/HoodieBackedTableMetadataWriter.java | 16 +-
.../client/transaction/TestTransactionManager.java | 8 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 14 +-
.../org/apache/hudi/client/TestClientRollback.java | 12 +-
.../hudi/client/TestHoodieClientMultiWriter.java | 30 +-
.../functional/TestHoodieBackedMetadata.java | 73 ++--
.../TestHoodieClientOnCopyOnWriteStorage.java | 34 +-
.../client/functional/TestHoodieMetadataBase.java | 22 +-
.../functional/TestHoodieMetadataBootstrap.java | 8 +-
.../index/hbase/TestSparkHoodieHBaseIndex.java | 8 +-
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 30 +-
.../java/org/apache/hudi/table/TestCleaner.java | 43 ++-
.../table/functional/TestCleanPlanExecutor.java | 75 ++--
...dieSparkCopyOnWriteTableArchiveWithReplace.java | 8 +-
.../TestHoodieSparkMergeOnReadTableRollback.java | 10 +-
.../hudi/testutils/HoodieClientTestBase.java | 9 +-
.../java/HoodieJavaWriteClientExample.java | 4 +-
.../examples/spark/HoodieWriteClientExample.java | 4 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 44 ++-
.../integ/testsuite/HoodieTestSuiteWriter.java | 6 +-
.../writers/KafkaConnectWriterProvider.java | 19 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 5 +-
.../procedures/UpgradeOrDowngradeProcedure.scala | 4 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 2 +-
.../functional/TestHoodieDeltaStreamer.java | 20 +-
.../TestHoodieDeltaStreamerWithMultiWriter.java | 18 +-
.../utilities/sources/TestHoodieIncrSource.java | 7 +-
.../utilities/testutils/UtilitiesTestBase.java | 13 +-
38 files changed, 920 insertions(+), 672 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 4135745351..e94c38bd16 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSavepointException;
@@ -538,7 +538,7 @@ public class SparkMain {
private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(rollbackUsingMarkers)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index d822ad6589..b936202bd0 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -30,7 +30,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -72,7 +73,8 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index b23c6fd150..e03699f66e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -37,7 +37,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -212,7 +213,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -266,7 +268,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
index 17c1002f6b..bc5ba168e3 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -39,7 +39,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieSparkTable;
@@ -166,7 +167,8 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
// Generate archive
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.forTable("test-trip-table").build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
new file mode 100644
index 0000000000..32bccc3a3d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Archival related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "Archival Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Configurations that control archival.")
+public class HoodieArchivalConfig extends HoodieConfig {
+
+ public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+ .key("hoodie.archive.automatic")
+ .defaultValue("true")
+ .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+ + " to archive commits if we cross a maximum value of commits."
+ + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+ public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
+ .key("hoodie.archive.async")
+ .defaultValue("false")
+ .sinceVersion("0.11.0")
+ .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
+ + "When turned on runs archiver async with writing, which can speed up overall write performance.");
+
+ public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
+ .key("hoodie.keep.max.commits")
+ .defaultValue("30")
+ .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
+ + " keep the metadata overhead constant, even as the table size grows."
+ + "This config controls the maximum number of instants to retain in the active timeline. ");
+
+ public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
+ .key("hoodie.archive.delete.parallelism")
+ .defaultValue(100)
+ .withDocumentation("Parallelism for deleting archived hoodie commits.");
+
+ public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
+ .key("hoodie.keep.min.commits")
+ .defaultValue("20")
+ .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
+ + "instants to retain in the active timeline.");
+
+ public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
+ .key("hoodie.commits.archival.batch")
+ .defaultValue(String.valueOf(10))
+ .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
+ + " archive log. This config controls such archival batch size.");
+
+ public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
+ .key("hoodie.archive.merge.files.batch.size")
+ .defaultValue(10)
+ .withDocumentation("The number of small archive files to be merged at once.");
+
+ public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+ .key("hoodie.archive.merge.small.file.limit.bytes")
+ .defaultValue(20L * 1024 * 1024)
+ .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
+
+ public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
+ .key("hoodie.archive.merge.enable")
+ .defaultValue(false)
+ .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ + " useful when storage scheme doesn't support append operation.");
+
+ /**
+ * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
+ /**
+ * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
+ /**
+ * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+ */
+ @Deprecated
+ public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
+ /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
+ /**
+ * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
+ /**
+ * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();
+
+ private HoodieArchivalConfig() {
+ super();
+ }
+
+ public static HoodieArchivalConfig.Builder newBuilder() {
+ return new HoodieArchivalConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieArchivalConfig archivalConfig = new HoodieArchivalConfig();
+
+ public HoodieArchivalConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.archivalConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieArchivalConfig.Builder fromProperties(Properties props) {
+ this.archivalConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withAutoArchive(Boolean autoArchive) {
+ archivalConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withAsyncArchive(Boolean asyncArchive) {
+ archivalConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+ archivalConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+ archivalConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
+ archivalConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
+ archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
+ archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
+ archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
+ return this;
+ }
+
+ public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize) {
+ archivalConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
+ return this;
+ }
+
+ public HoodieArchivalConfig build() {
+ archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
+ return archivalConfig;
+ }
+ }
+}
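
A minimal usage sketch for the new class, with illustrative (non-recommended) values; each comment names the ConfigProperty key set by the call, as defined above:

    HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
        .withAutoArchive(true)                             // hoodie.archive.automatic (default: true)
        .withAsyncArchive(false)                           // hoodie.archive.async (default: false)
        .archiveCommitsWith(20, 30)                        // hoodie.keep.min.commits / hoodie.keep.max.commits
        .withArchiveMergeEnable(true)                      // hoodie.archive.merge.enable
        .withArchiveMergeSmallFileLimit(20L * 1024 * 1024) // hoodie.archive.merge.small.file.limit.bytes
        .withArchiveDeleteParallelism(100)                 // hoodie.archive.delete.parallelism
        .build();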
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
new file mode 100644
index 0000000000..7b665f9b2b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
+
+import javax.annotation.concurrent.Immutable;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Clean related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "Clean Configs",
+ groupName = ConfigGroups.Names.WRITE_CLIENT,
+ description = "Cleaning (reclamation of older/unused file groups/slices).")
+public class HoodieCleanConfig extends HoodieConfig {
+
+ public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
+ .key("hoodie.clean.automatic")
+ .defaultValue("true")
+ .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
+ + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
+ + " growth is bounded.");
+
+ public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
+ .key("hoodie.clean.async")
+ .defaultValue("false")
+ .withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
+ + "When turned on runs cleaner async with writing, which can speed up overall write performance.");
+
+ public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
+ .key("hoodie.cleaner.commits.retained")
+ .defaultValue("10")
+ .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
+ + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
+
+ public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
+ .defaultValue("24")
+ .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+ + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+ + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
+
+ public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
+ .key("hoodie.cleaner.policy")
+ .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+ .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
+ + " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED.key()
+ + " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
+ + " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
+
+ public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
+ .key("hoodie.clean.trigger.strategy")
+ .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
+ .withDocumentation("Controls how cleaning is scheduled. Valid options: "
+ + Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
+
+ public static final ConfigProperty<String> CLEAN_MAX_COMMITS = ConfigProperty
+ .key("hoodie.clean.max.commits")
+ .defaultValue("1")
+ .withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");
+
+ public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
+ .key("hoodie.cleaner.fileversions.retained")
+ .defaultValue("3")
+ .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ + " the minimum number of file slices to retain in each file group, during cleaning.");
+
+ public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
+ .key("hoodie.cleaner.incremental.mode")
+ .defaultValue("true")
+ .withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events "
+ + " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
+ + " table for each planning (even with a metadata table).");
+
+ public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY = ConfigProperty
+ .key("hoodie.cleaner.policy.failed.writes")
+ .defaultValue(HoodieFailedWritesCleaningPolicy.EAGER.name())
+ .withDocumentation("Cleaning policy for failed writes to be used. Hudi will delete any files written by "
+ + "failed writes to re-claim space. Choose to perform this rollback of failed writes eagerly before "
+ + "every writer starts (only supported for single writer) or lazily by the cleaner (required for multi-writers)");
+
+ public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
+ .key("hoodie.cleaner.parallelism")
+ .defaultValue("200")
+ .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
+
+ public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+ .key("hoodie.clean.allow.multiple")
+ .defaultValue(true)
+ .sinceVersion("0.11.0")
+ .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
+ + ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
+
+ public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLE = ConfigProperty
+ .key("hoodie.cleaner.delete.bootstrap.base.file")
+ .defaultValue("false")
+ .withDocumentation("When set to true, cleaner also deletes the bootstrap base file when it's skeleton base file is "
+ + " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the"
+ + " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap "
+ + " base files are also physically deleted, to comply with data privacy enforcement processes.");
+
+
+ /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
+ @Deprecated
+ public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
+ /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
+ @Deprecated
+ public static final String AUTO_CLEAN_PROP = AUTO_CLEAN.key();
+ /** @deprecated Use {@link #ASYNC_CLEAN} and its methods instead */
+ @Deprecated
+ public static final String ASYNC_CLEAN_PROP = ASYNC_CLEAN.key();
+ /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
+ @Deprecated
+ public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = CLEANER_FILE_VERSIONS_RETAINED.key();
+ /**
+ * @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead
+ */
+ @Deprecated
+ public static final String CLEANER_COMMITS_RETAINED_PROP = CLEANER_COMMITS_RETAINED.key();
+ /**
+ * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
+ */
+ @Deprecated
+ public static final String CLEANER_INCREMENTAL_MODE = CLEANER_INCREMENTAL_MODE_ENABLE.key();
+ /**
+ * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
+ */
+ @Deprecated
+ public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.key();
+ /**
+ * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
+ */
+ @Deprecated
+ public static final String CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.key();
+ /**
+ * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
+ */
+ @Deprecated
+ public static final String DEFAULT_CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.defaultValue();
+ /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_CLEANER_POLICY = CLEANER_POLICY.defaultValue();
+ /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
+ @Deprecated
+ public static final String FAILED_WRITES_CLEANER_POLICY_PROP = FAILED_WRITES_CLEANER_POLICY.key();
+ /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY = FAILED_WRITES_CLEANER_POLICY.defaultValue();
+ /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_AUTO_CLEAN = AUTO_CLEAN.defaultValue();
+ /**
+ * @deprecated Use {@link #ASYNC_CLEAN} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_ASYNC_CLEAN = ASYNC_CLEAN.defaultValue();
+ /**
+ * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_INCREMENTAL_CLEANER = CLEANER_INCREMENTAL_MODE_ENABLE.defaultValue();
+ /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = CLEANER_FILE_VERSIONS_RETAINED.defaultValue();
+ /** @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead */
+ @Deprecated
+ private static final String DEFAULT_CLEANER_COMMITS_RETAINED = CLEANER_COMMITS_RETAINED.defaultValue();
+ /**
+ * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
+ */
+ @Deprecated
+ private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.defaultValue();
+
+ private HoodieCleanConfig() {
+ super();
+ }
+
+ public static HoodieCleanConfig.Builder newBuilder() {
+ return new HoodieCleanConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final HoodieCleanConfig cleanConfig = new HoodieCleanConfig();
+
+ public HoodieCleanConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.cleanConfig.getProps().load(reader);
+ return this;
+ }
+ }
+
+ public HoodieCleanConfig.Builder fromProperties(Properties props) {
+ this.cleanConfig.getProps().putAll(props);
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withAutoClean(Boolean autoClean) {
+ cleanConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withAsyncClean(Boolean asyncClean) {
+ cleanConfig.setValue(ASYNC_CLEAN, String.valueOf(asyncClean));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
+ cleanConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) {
+ cleanConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy);
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) {
+ cleanConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
+ cleanConfig.setValue(CLEANER_POLICY, policy.name());
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder retainFileVersions(int fileVersionsRetained) {
+ cleanConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED, String.valueOf(fileVersionsRetained));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder retainCommits(int commitsRetained) {
+ cleanConfig.setValue(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
+ cleanConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
+ cleanConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withCleanerParallelism(int cleanerParallelism) {
+ cleanConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
+ cleanConfig.setValue(CLEANER_BOOTSTRAP_BASE_FILE_ENABLE, String.valueOf(cleanBootstrapSourceFileEnabled));
+ return this;
+ }
+
+ public HoodieCleanConfig.Builder withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy failedWritesPolicy) {
+ cleanConfig.setValue(FAILED_WRITES_CLEANER_POLICY, failedWritesPolicy.name());
+ return this;
+ }
+
+ public HoodieCleanConfig build() {
+ cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
+ HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY));
+ return cleanConfig;
+ }
+ }
+}
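
A minimal usage sketch for the new class, again with illustrative values. Note that build() validates the configured cleaning policy by round-tripping it through HoodieCleaningPolicy.valueOf:

    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
        .withAutoClean(true)                                          // hoodie.clean.automatic
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)  // hoodie.cleaner.policy
        .retainCommits(10)                                            // hoodie.cleaner.commits.retained
        .withCleanerParallelism(200)                                  // hoodie.cleaner.parallelism
        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) // LAZY is required for multi-writer setups
        .build();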
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 4003a07de7..d1d0e67261 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,11 +22,6 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -47,57 +42,9 @@ import java.util.stream.Collectors;
@ConfigClassProperty(name = "Compaction Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Configurations that control compaction "
- + "(merging of log files onto a new base files) as well as "
- + "cleaning (reclamation of older/unused file groups/slices).")
+ + "(merging of log files onto a new base files).")
public class HoodieCompactionConfig extends HoodieConfig {
- public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
- .key("hoodie.archive.automatic")
- .defaultValue("true")
- .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
- + " to archive commits if we cross a maximum value of commits."
- + " It's recommended to enable this, to ensure number of active commits is bounded.");
-
- public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
- .key("hoodie.archive.async")
- .defaultValue("false")
- .sinceVersion("0.11.0")
- .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
- + "When turned on runs archiver async with writing, which can speed up overall write performance.");
-
- public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
- .key("hoodie.clean.automatic")
- .defaultValue("true")
- .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
- + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
- + " growth is bounded.");
-
- public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
- .key("hoodie.clean.async")
- .defaultValue("false")
- .withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
- + "When turned on runs cleaner async with writing, which can speed up overall write performance.");
-
- public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
- .key("hoodie.cleaner.commits.retained")
- .defaultValue("10")
- .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
- + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
-
- public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
- .defaultValue("24")
- .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
- + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
- + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
-
- public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
- .key("hoodie.cleaner.policy")
- .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
- .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
- + " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED.key()
- + " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
- + " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
-
public static final ConfigProperty<String> INLINE_COMPACT = ConfigProperty
.key("hoodie.compact.inline")
.defaultValue("false")
@@ -130,62 +77,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
+ "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
- public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
- .key("hoodie.clean.trigger.strategy")
- .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
- .withDocumentation("Controls how cleaning is scheduled. Valid options: "
- + Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
-
- public static final ConfigProperty<String> CLEAN_MAX_COMMITS = ConfigProperty
- .key("hoodie.clean.max.commits")
- .defaultValue("1")
- .withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");
-
- public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
- .key("hoodie.cleaner.fileversions.retained")
- .defaultValue("3")
- .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
- + " the minimum number of file slices to retain in each file group, during cleaning.");
-
- public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
- .key("hoodie.cleaner.incremental.mode")
- .defaultValue("true")
- .withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events "
- + " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
- + " table for each planning (even with a metadata table).");
-
- public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
- .key("hoodie.keep.max.commits")
- .defaultValue("30")
- .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
- + " keep the metadata overhead constant, even as the table size grows."
- + "This config controls the maximum number of instants to retain in the active timeline. ");
-
- public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
- .key("hoodie.archive.delete.parallelism")
- .defaultValue(100)
- .withDocumentation("Parallelism for deleting archived hoodie commits.");
-
- public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
- .key("hoodie.keep.min.commits")
- .defaultValue("20")
- .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
- + "instants to retain in the active timeline.");
-
- public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
- .key("hoodie.commits.archival.batch")
- .defaultValue(String.valueOf(10))
- .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
- + " archive log. This config controls such archival batch size.");
-
- public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLE = ConfigProperty
- .key("hoodie.cleaner.delete.bootstrap.base.file")
- .defaultValue("false")
- .withDocumentation("When set to true, cleaner also deletes the bootstrap base file when it's skeleton base file is "
- + " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the"
- + " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap "
- + " base files are also physically deleted, to comply with data privacy enforcement processes.");
-
public static final ConfigProperty<String> PARQUET_SMALL_FILE_LIMIT = ConfigProperty
.key("hoodie.parquet.small.file.limit")
.defaultValue(String.valueOf(104857600))
@@ -202,11 +93,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten "
+ " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)");
- public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
- .key("hoodie.cleaner.parallelism")
- .defaultValue("200")
- .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
-
// 500GB of target IO per compaction (both read and write)
public static final ConfigProperty<String> TARGET_IO_PER_COMPACTION_IN_MB = ConfigProperty
.key("hoodie.compaction.target.io")
@@ -227,13 +113,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "compaction during each compaction run. By default. Hudi picks the log file "
+ "with most accumulated unmerged data");
- public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
- .key("hoodie.compaction.payload.class")
- .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
- .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
- + "the record payload class to merge records in the log against each other, merge again with the base file and "
- + "produce the final record to be written after compaction.");
-
public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("true")
@@ -247,13 +126,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
+ "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
- public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY = ConfigProperty
- .key("hoodie.cleaner.policy.failed.writes")
- .defaultValue(HoodieFailedWritesCleaningPolicy.EAGER.name())
- .withDocumentation("Cleaning policy for failed writes to be used. Hudi will delete any files written by "
- + "failed writes to re-claim space. Choose to perform this rollback of failed writes eagerly before "
- + "every writer starts (only supported for single writer) or lazily by the cleaner (required for multi-writers)");
-
public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
.key("hoodie.compaction.daybased.target.partitions")
.defaultValue("10")
@@ -290,39 +162,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
-
- public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
- .key("hoodie.clean.allow.multiple")
- .defaultValue(true)
- .sinceVersion("0.11.0")
- .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
- + ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
-
- public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
- .key("hoodie.archive.merge.files.batch.size")
- .defaultValue(10)
- .withDocumentation("The number of small archive files to be merged at once.");
-
- public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
- .key("hoodie.archive.merge.small.file.limit.bytes")
- .defaultValue(20L * 1024 * 1024)
- .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
-
- public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
- .key("hoodie.archive.merge.enable")
- .defaultValue(false)
- .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
- + " useful when storage scheme doesn't support append operation.");
-
- /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
- @Deprecated
- public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
- /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
- @Deprecated
- public static final String AUTO_CLEAN_PROP = AUTO_CLEAN.key();
- /** @deprecated Use {@link #ASYNC_CLEAN} and its methods instead */
- @Deprecated
- public static final String ASYNC_CLEAN_PROP = ASYNC_CLEAN.key();
+
+
/** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
@Deprecated
public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
@@ -335,39 +176,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
/** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */
@Deprecated
public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = INLINE_COMPACT_TRIGGER_STRATEGY.key();
- /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
- @Deprecated
- public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = CLEANER_FILE_VERSIONS_RETAINED.key();
- /**
- * @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead
- */
- @Deprecated
- public static final String CLEANER_COMMITS_RETAINED_PROP = CLEANER_COMMITS_RETAINED.key();
- /**
- * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
- */
- @Deprecated
- public static final String CLEANER_INCREMENTAL_MODE = CLEANER_INCREMENTAL_MODE_ENABLE.key();
- /**
- * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
- */
- @Deprecated
- public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
- /**
- * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
- */
- @Deprecated
- public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
- /**
- * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
- */
- @Deprecated
- public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
- /**
- * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
- */
- @Deprecated
- public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.key();
/**
* @deprecated Use {@link #PARQUET_SMALL_FILE_LIMIT} and its methods instead
*/
@@ -418,16 +226,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
*/
@Deprecated
public static final String DEFAULT_COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.defaultValue();
- /**
- * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
- */
- @Deprecated
- public static final String CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.key();
- /**
- * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
- */
- @Deprecated
- public static final String DEFAULT_CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.defaultValue();
/**
* @deprecated Use {@link #TARGET_IO_PER_COMPACTION_IN_MB} and its methods instead
*/
@@ -446,12 +244,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
/** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */
@Deprecated
public static final String DEFAULT_COMPACTION_STRATEGY = COMPACTION_STRATEGY.defaultValue();
- /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
- @Deprecated
- public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
- /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
- @Deprecated
- public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
/** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */
@Deprecated
public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
@@ -464,33 +256,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
/** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */
@Deprecated
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
- /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
- @Deprecated
- private static final String DEFAULT_CLEANER_POLICY = CLEANER_POLICY.defaultValue();
- /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
- @Deprecated
- public static final String FAILED_WRITES_CLEANER_POLICY_PROP = FAILED_WRITES_CLEANER_POLICY.key();
- /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
- @Deprecated
- private static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY = FAILED_WRITES_CLEANER_POLICY.defaultValue();
- /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
- @Deprecated
- private static final String DEFAULT_AUTO_CLEAN = AUTO_CLEAN.defaultValue();
- /**
- * @deprecated Use {@link #ASYNC_CLEAN} and its methods instead
- */
- @Deprecated
- private static final String DEFAULT_ASYNC_CLEAN = ASYNC_CLEAN.defaultValue();
/**
* @deprecated Use {@link #INLINE_COMPACT} and its methods instead
*/
@Deprecated
private static final String DEFAULT_INLINE_COMPACT = INLINE_COMPACT.defaultValue();
- /**
- * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
- */
- @Deprecated
- private static final String DEFAULT_INCREMENTAL_CLEANER = CLEANER_INCREMENTAL_MODE_ENABLE.defaultValue();
/** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */
@Deprecated
private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue();
@@ -500,30 +270,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
/** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */
@Deprecated
private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue();
- /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
- @Deprecated
- private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = CLEANER_FILE_VERSIONS_RETAINED.defaultValue();
- /** @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead */
- @Deprecated
- private static final String DEFAULT_CLEANER_COMMITS_RETAINED = CLEANER_COMMITS_RETAINED.defaultValue();
- /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
- @Deprecated
- private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
- /**
- * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
- */
- @Deprecated
- private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
- /**
- * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
- */
- @Deprecated
- private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();
- /**
- * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
- */
- @Deprecated
- private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.defaultValue();
/** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */
@Deprecated
public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key();
@@ -555,31 +301,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withAutoArchive(Boolean autoArchive) {
- compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
- return this;
- }
-
- public Builder withAsyncArchive(Boolean asyncArchive) {
- compactionConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
- return this;
- }
-
- public Builder withAutoClean(Boolean autoClean) {
- compactionConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
- return this;
- }
-
- public Builder withAsyncClean(Boolean asyncClean) {
- compactionConfig.setValue(ASYNC_CLEAN, String.valueOf(asyncClean));
- return this;
- }
-
- public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
- compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
- return this;
- }
-
public Builder withInlineCompaction(Boolean inlineCompaction) {
compactionConfig.setValue(INLINE_COMPACT, String.valueOf(inlineCompaction));
return this;
@@ -595,57 +316,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) {
- compactionConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy);
- return this;
- }
-
- public Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) {
- compactionConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning));
- return this;
- }
-
- public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
- compactionConfig.setValue(CLEANER_POLICY, policy.name());
- return this;
- }
-
- public Builder retainFileVersions(int fileVersionsRetained) {
- compactionConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED, String.valueOf(fileVersionsRetained));
- return this;
- }
-
- public Builder retainCommits(int commitsRetained) {
- compactionConfig.setValue(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
- return this;
- }
-
- public Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
- compactionConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
- return this;
- }
-
- public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
- compactionConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
- compactionConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
- return this;
- }
-
- public Builder withArchiveMergeFilesBatchSize(int number) {
- compactionConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
- return this;
- }
-
- public Builder withArchiveMergeSmallFileLimit(long size) {
- compactionConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
- return this;
- }
-
- public Builder withArchiveMergeEnable(boolean enable) {
- compactionConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
- return this;
- }
-
public Builder compactionSmallFileSize(long smallFileLimitBytes) {
compactionConfig.setValue(PARQUET_SMALL_FILE_LIMIT, String.valueOf(smallFileLimitBytes));
return this;
@@ -671,26 +341,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
- compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
- return this;
- }
-
- public Builder withCleanerParallelism(int cleanerParallelism) {
- compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
- return this;
- }
-
public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName());
return this;
}
- public Builder withPayloadClass(String payloadClassName) {
- compactionConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName);
- return this;
- }
-
public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
return this;
@@ -701,11 +356,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
- compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
- return this;
- }
-
public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
return this;
@@ -736,49 +386,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withCommitsArchivalBatchSize(int batchSize) {
- compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
- return this;
- }
-
- public Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
- compactionConfig.setValue(CLEANER_BOOTSTRAP_BASE_FILE_ENABLE, String.valueOf(cleanBootstrapSourceFileEnabled));
- return this;
- }
-
- public Builder withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy failedWritesPolicy) {
- compactionConfig.setValue(FAILED_WRITES_CLEANER_POLICY, failedWritesPolicy.name());
- return this;
- }
-
public HoodieCompactionConfig build() {
compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
- // validation
- HoodieCleaningPolicy.valueOf(compactionConfig.getString(CLEANER_POLICY));
-
- // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
- // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
- int minInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP));
- int maxInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP));
- int cleanerCommitsRetained =
- Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED));
- ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
- String.format(
- "Increase %s=%d to be greater than %s=%d.",
- HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
- HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
- ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
- String.format(
- "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
- + "missing data from few instants.",
- HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
- HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
-
- boolean inlineCompact = compactionConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
- boolean inlineCompactSchedule = compactionConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
- ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or "
- + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
- HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
return compactionConfig;
}
}
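
For call sites using the removed builder methods above, the same settings now come from the new
config builders. A minimal migration sketch (the retention and archival numbers are illustrative,
not defaults):

    // Cleaning knobs move to HoodieCleanConfig
    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
        .withAutoClean(true)
        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
        .retainCommits(10)
        .build();
    // Archival knobs move to HoodieArchivalConfig
    HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
        .archiveCommitsWith(20, 30)   // min=20, max=30 commits kept on the active timeline
        .build();
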
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 2989d8c2b3..2a05752aa6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import java.io.File;
import java.io.FileReader;
@@ -52,6 +53,20 @@ public class HoodiePayloadConfig extends HoodieConfig {
.withDocumentation("Table column/field name to derive timestamp associated with the records. This can"
+ "be useful for e.g, determining the freshness of the table.");
+ public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
+ .key("hoodie.compaction.payload.class")
+ .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+ .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
+ + "the record payload class to merge records in the log against each other, merge again with the base file and "
+ + "produce the final record to be written after compaction.");
+
+ /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
+ @Deprecated
+ public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
+ /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
+ @Deprecated
+ public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
+
private HoodiePayloadConfig() {
super();
}
@@ -86,6 +101,11 @@ public class HoodiePayloadConfig extends HoodieConfig {
return this;
}
+ public HoodiePayloadConfig.Builder withPayloadClass(String payloadClassName) {
+ payloadConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName);
+ return this;
+ }
+
public HoodiePayloadConfig build() {
payloadConfig.setDefaults(HoodiePayloadConfig.class.getName());
return payloadConfig;
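
With the payload class relocated here, the deprecated PAYLOAD_CLASS_PROP and DEFAULT_PAYLOAD_CLASS
aliases keep old call sites compiling, while new code goes through the builder. A sketch (the
payload class shown is just the default from the diff above):

    HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
        .withPayloadClass(OverwriteWithLatestAvroPayload.class.getName())
        .build();
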
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index ba3888863d..fc1798f206 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -119,16 +119,16 @@ public class HoodieStorageConfig extends HoodieConfig {
.withDocumentation("Whether to use dictionary encoding");
public static final ConfigProperty<String> PARQUET_WRITE_LEGACY_FORMAT_ENABLED = ConfigProperty
- .key("hoodie.parquet.writelegacyformat.enabled")
- .defaultValue("false")
- .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, data will be written in a way of Spark 1.4 and earlier. "
- + "For example, decimal values will be written in Parquet's fixed-length byte array format which other systems such as Apache Hive and Apache Impala use. "
- + "If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format.");
+ .key("hoodie.parquet.writelegacyformat.enabled")
+ .defaultValue("false")
+ .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, data will be written in a way of Spark 1.4 and earlier. "
+ + "For example, decimal values will be written in Parquet's fixed-length byte array format which other systems such as Apache Hive and Apache Impala use. "
+ + "If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format.");
public static final ConfigProperty<String> PARQUET_OUTPUT_TIMESTAMP_TYPE = ConfigProperty
- .key("hoodie.parquet.outputtimestamptype")
- .defaultValue("TIMESTAMP_MICROS")
- .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
+ .key("hoodie.parquet.outputtimestamptype")
+ .defaultValue("TIMESTAMP_MICROS")
+ .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
.key("hoodie.hfile.compression.algorithm")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d18238fa4b..4d07097c07 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -89,6 +89,8 @@ import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
+
/**
* Class storing configs for the HoodieWriteClient.
*/
@@ -1148,31 +1150,31 @@ public class HoodieWriteConfig extends HoodieConfig {
* compaction properties.
*/
public HoodieCleaningPolicy getCleanerPolicy() {
- return HoodieCleaningPolicy.valueOf(getString(HoodieCompactionConfig.CLEANER_POLICY));
+ return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
}
public int getCleanerFileVersionsRetained() {
- return getInt(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED);
+ return getInt(HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED);
}
public int getCleanerCommitsRetained() {
- return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED);
+ return getInt(HoodieCleanConfig.CLEANER_COMMITS_RETAINED);
}
public int getCleanerHoursRetained() {
- return getInt(HoodieCompactionConfig.CLEANER_HOURS_RETAINED);
+ return getInt(HoodieCleanConfig.CLEANER_HOURS_RETAINED);
}
public int getMaxCommitsToKeep() {
- return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP);
+ return getInt(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP);
}
public int getMinCommitsToKeep() {
- return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP);
+ return getInt(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP);
}
public int getArchiveMergeFilesBatchSize() {
- return getInt(HoodieCompactionConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
+ return getInt(HoodieArchivalConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
}
public int getParquetSmallFileLimit() {
@@ -1192,7 +1194,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public boolean allowMultipleCleans() {
- return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS);
+ return getBoolean(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS);
}
public boolean shouldAutoTuneInsertSplits() {
@@ -1200,43 +1202,43 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getCleanerParallelism() {
- return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE);
+ return getInt(HoodieCleanConfig.CLEANER_PARALLELISM_VALUE);
}
public int getCleaningMaxCommits() {
- return getInt(HoodieCompactionConfig.CLEAN_MAX_COMMITS);
+ return getInt(HoodieCleanConfig.CLEAN_MAX_COMMITS);
}
public CleaningTriggerStrategy getCleaningTriggerStrategy() {
- return CleaningTriggerStrategy.valueOf(getString(HoodieCompactionConfig.CLEAN_TRIGGER_STRATEGY));
+ return CleaningTriggerStrategy.valueOf(getString(HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY));
}
public boolean isAutoClean() {
- return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
+ return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
}
public boolean getArchiveMergeEnable() {
- return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
+ return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
}
public long getArchiveMergeSmallFileLimitBytes() {
- return getLong(HoodieCompactionConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
+ return getLong(HoodieArchivalConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
}
public boolean isAutoArchive() {
- return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+ return getBoolean(HoodieArchivalConfig.AUTO_ARCHIVE);
}
public boolean isAsyncArchive() {
- return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE);
+ return getBoolean(HoodieArchivalConfig.ASYNC_ARCHIVE);
}
public boolean isAsyncClean() {
- return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
+ return getBoolean(HoodieCleanConfig.ASYNC_CLEAN);
}
public boolean incrementalCleanerModeEnabled() {
- return getBoolean(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE_ENABLE);
+ return getBoolean(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE);
}
public boolean inlineCompactionEnabled() {
@@ -1280,7 +1282,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getArchiveDeleteParallelism() {
- return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
+ return getInt(HoodieArchivalConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
}
public boolean inlineClusteringEnabled() {
@@ -1321,7 +1323,7 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public String getPayloadClass() {
- return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME);
+ return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
}
public int getTargetPartitionsPerDayBasedCompaction() {
@@ -1329,11 +1331,11 @@ public class HoodieWriteConfig extends HoodieConfig {
}
public int getCommitArchivalBatchSize() {
- return getInt(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
+ return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
}
public Boolean shouldCleanBootstrapBaseFile() {
- return getBoolean(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
+ return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
}
public String getClusteringUpdatesStrategyClass() {
@@ -1342,7 +1344,7 @@ public class HoodieWriteConfig extends HoodieConfig {
public HoodieFailedWritesCleaningPolicy getFailedWritesCleanPolicy() {
return HoodieFailedWritesCleaningPolicy
- .valueOf(getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY));
+ .valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
}
/**
@@ -2117,6 +2119,8 @@ public class HoodieWriteConfig extends HoodieConfig {
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
+ private boolean isCleanConfigSet = false;
+ private boolean isArchivalConfigSet = false;
private boolean isClusteringConfigSet = false;
private boolean isOptimizeConfigSet = false;
private boolean isMetricsConfigSet = false;
@@ -2284,6 +2288,18 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withCleanConfig(HoodieCleanConfig cleanConfig) {
+ writeConfig.getProps().putAll(cleanConfig.getProps());
+ isCleanConfigSet = true;
+ return this;
+ }
+
+ public Builder withArchivalConfig(HoodieArchivalConfig archivalConfig) {
+ writeConfig.getProps().putAll(archivalConfig.getProps());
+ isArchivalConfigSet = true;
+ return this;
+ }
+
public Builder withClusteringConfig(HoodieClusteringConfig clusteringConfig) {
writeConfig.getProps().putAll(clusteringConfig.getProps());
isClusteringConfigSet = true;
@@ -2517,6 +2533,10 @@ public class HoodieWriteConfig extends HoodieConfig {
writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+ writeConfig.setDefaultOnCondition(!isCleanConfigSet,
+ HoodieCleanConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+ writeConfig.setDefaultOnCondition(!isArchivalConfigSet,
+ HoodieArchivalConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
HoodieClusteringConfig.newBuilder().withEngineType(engineType)
.fromProperties(writeConfig.getProps()).build());
@@ -2587,10 +2607,10 @@ public class HoodieWriteConfig extends HoodieConfig {
if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()
.equalsIgnoreCase(writeConcurrencyMode)) {
// In this case, we assume that the user takes care of setting the lock provider used
- writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ writeConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
- HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
}
}
@@ -2602,9 +2622,34 @@ public class HoodieWriteConfig extends HoodieConfig {
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
- ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
+ ValidationUtils.checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
}
+
+ HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
+ // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we could archive commit instants
+ // on the timeline that have not yet been cleaned, and incremental pulls could miss data.
+ int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
+ int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
+ int cleanerCommitsRetained =
+ Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
+ ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
+ String.format(
+ "Increase %s=%d to be greater than %s=%d.",
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
+ ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+ String.format(
+ "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+ + "missing data from few instants.",
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+
+ boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
+ boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
+ ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Only one of inline compaction (%s) or "
+ + "schedule inline compaction (%s) can be enabled; both cannot be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
+ HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
}
public HoodieWriteConfig build() {
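
The cross-config validation relocated into HoodieWriteConfig.Builder above still enforces
maxCommitsToKeep > minCommitsToKeep > cleanerCommitsRetained, now spanning three config classes.
A sketch of a write config that satisfies it (path, table name, and the numbers are illustrative):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")   // hypothetical base path
        .forTable("sample-table")               // hypothetical table name
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .retainCommits(10)                  // cleaner retains 10 commits
            .build())
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(20, 30)         // 30 > 20 > 10, so validation passes
            .build())
        .build();
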
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e36adf6be5..f4ee3fc9f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -58,6 +58,8 @@ import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -255,20 +257,24 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(tableName)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ // we will trigger cleaning manually, to control the instant times
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
- // we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+ .build())
+ // we will trigger archival manually, to ensure only the regular writer invokes it
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
.archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
- // we will trigger compaction manually, to control the instant times
+ .withAutoArchive(false)
+ .build())
+ // we will trigger compaction manually, to control the instant times
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
- // we will trigger archive manually, to ensure only regular writer invokes it
- .withAutoArchive(false)
// by default, the HFile does not keep the metadata fields, set up as false
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
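
The auto/async toggles split the same way. A sketch of turning off all automatic table services so
an external process can drive them, as the metadata writer above does; withAsyncClean and
withAutoArchive appear in this diff, and withAsyncArchive is assumed to have moved with the other
archival settings (values illustrative):

    HoodieCleanConfig cleanConfig = HoodieCleanConfig.newBuilder()
        .withAutoClean(false)      // cleaning triggered manually
        .withAsyncClean(false)
        .build();
    HoodieArchivalConfig archivalConfig = HoodieArchivalConfig.newBuilder()
        .withAutoArchive(false)    // archival triggered manually
        .withAsyncArchive(false)   // assumed relocated alongside AUTO_ARCHIVE
        .build();
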
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index 6573560e75..afbedc0de3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieLockException;
@@ -57,9 +57,9 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
private HoodieWriteConfig getWriteConfig() {
return HoodieWriteConfig.newBuilder()
.withPath(basePath)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
- .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder()
.withLockProvider(InProcessLockProvider.class)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 3d0da3e49a..0adbb998a0 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -47,10 +47,10 @@ import java.util.Properties;
import java.util.function.Function;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
-import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_ARCHIVE;
-import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
-import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
-import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
+import static org.apache.hudi.config.HoodieArchivalConfig.ASYNC_ARCHIVE;
+import static org.apache.hudi.config.HoodieCleanConfig.ASYNC_CLEAN;
+import static org.apache.hudi.config.HoodieCleanConfig.AUTO_CLEAN;
+import static org.apache.hudi.config.HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY;
import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
import static org.apache.hudi.config.HoodieWriteConfig.TABLE_SERVICES_ENABLED;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
@@ -64,9 +64,9 @@ public class TestHoodieWriteConfig {
public void testPropertyLoading(boolean withAlternative) throws IOException {
Builder builder = HoodieWriteConfig.newBuilder().withPath("/tmp");
Map<String, String> params = new HashMap<>(3);
- params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
- params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "5");
- params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
+ params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+ params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
+ params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2");
if (withAlternative) {
params.put("hoodie.avro.schema.externalTransformation", "true");
} else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index f6315eec7d..629b16cdb8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -38,7 +38,7 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieRollbackException;
@@ -80,7 +80,7 @@ public class TestClientRollback extends HoodieClientTestBase {
*/
@Test
public void testSavepointAndRollback() throws Exception {
- HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
@@ -214,7 +214,7 @@ public class TestClientRollback extends HoodieClientTestBase {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withRollbackUsingMarkers(false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
@@ -329,7 +329,7 @@ public class TestClientRollback extends HoodieClientTestBase {
.enable(enableMetadataTable)
.build()
)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
@@ -436,7 +436,7 @@ public class TestClientRollback extends HoodieClientTestBase {
// Set Failed Writes rollback to LAZY
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
@@ -530,7 +530,7 @@ public class TestClientRollback extends HoodieClientTestBase {
.enable(enableMetadataTable)
.build()
)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 3aeca0f275..268674e78d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -100,9 +102,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
- .withAutoArchive(false).withAutoClean(false).build())
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
.withMarkersType(MarkerType.DIRECT.name())
@@ -192,9 +196,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig cfg = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withMaxNumDeltaCommitsBeforeCompaction(2)
.build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -265,9 +271,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
// Disabling embedded timeline server, it doesn't work with multiwriter
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
- .withInlineCompaction(false).withAsyncClean(true)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withAsyncClean(true)
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(2).build())
.withEmbeddedTimelineServerEnabled(false)
// Timeline-server-based markers are not used for multi-writer tests
@@ -402,7 +411,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
@@ -453,7 +463,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
@@ -536,7 +547,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
// Timeline-server-based markers are not used for multi-writer tests
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 1a618a01df..6d410ded1c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -82,11 +82,13 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.storage.HoodieHFileReader;
@@ -476,7 +478,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.archiveCommitsWith(3, 4)
.retainCommits(1)
.build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .retainCommits(1)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(2, 3)
+ .build())
+ .build();
initWriteConfigAndMetatableWriter(writeConfig, true);
AtomicInteger commitTime = new AtomicInteger(1);
@@ -637,8 +645,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
initPath();
int maxCommits = 1;
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits)
+ .build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
@@ -1172,8 +1181,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
.withPopulateMetaFields(populateMateFields)
.build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
- .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .retainCommits(1)
+ .retainFileVersions(1)
+ .withAutoClean(false)
+ .withAsyncClean(true)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
+ .build())
.build();
initWriteConfigAndMetatableWriter(writeConfig, true);
@@ -1399,10 +1415,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// disable small file handling so that every insert goes to a new file group.
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withRollbackUsingMarkers(false)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+ .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
- .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+ .build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(true)
.withMetadataIndexColumnStats(true)
@@ -1612,7 +1631,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1676,8 +1695,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
+ .build())
.withAutoCommit(false)
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1853,9 +1873,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.archiveCommitsWith(40, 60).retainCommits(1)
.withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
- .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false).build())
+ .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(2, 4).build())
.build();
List<HoodieRecord> records;
@@ -2006,8 +2029,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
.withProperties(properties)
@@ -2034,10 +2057,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// set hoodie.table.version to 2 in hoodie.properties file
changeTableVersion(HoodieTableVersion.TWO);
- writeConfig = getWriteConfigBuilder(true, true, false)
- .withRollbackUsingMarkers(false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withRollbackUsingMarkers(false)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
.withProperties(properties)
@@ -2119,7 +2140,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
int maxCommits = 1;
HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -2285,13 +2306,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
- return builder
- .withCompactionConfig(
+ return builder.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(smallFileSize)
- // Set rollback to LAZY so no inflights are deleted
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.insertSplitSize(insertSplitSize).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ // Set rollback to LAZY so no inflights are deleted
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
.withStorageConfig(
HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
@@ -2307,8 +2328,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
- .compactionSmallFileSize(1024 * 1024).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index cebf3145bf..8ba459b772 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -71,12 +71,13 @@ import org.apache.hudi.common.util.MarkerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
@@ -360,7 +361,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
.build();
HoodieWriteConfig config = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
@@ -386,7 +387,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
.build();
config = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
String instant2 = HoodieActiveTimeline.createNewInstantTime();
@@ -921,7 +922,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.setTimelineLayoutVersion(VERSION_0)
.initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
// Set rollback to LAZY so no inflights are deleted
- hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ hoodieWriteConfig.getProps().put(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -2606,17 +2607,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
if (!populateMetaFields) {
builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.SIMPLE).build());
}
- return builder
- .withCompactionConfig(
- HoodieCompactionConfig.newBuilder()
- .compactionSmallFileSize(smallFileSize)
- // Set rollback to LAZY so no inflights are deleted
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
- .insertSplitSize(insertSplitSize).build())
- .withStorageConfig(
- HoodieStorageConfig.newBuilder()
- .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
- .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+ return builder.withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .compactionSmallFileSize(smallFileSize)
+ .insertSplitSize(insertSplitSize).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ // Set rollback to LAZY so no inflights are deleted
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
+ .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
.withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
.withProps(props)
.build();
@@ -2636,7 +2636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) {
return getConfigBuilder()
.withEmbeddedTimelineServerEnabled(false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(cleaningPolicy)
.withAutoClean(false).build())
.withTimelineLayoutVersion(1)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 2e387be544..29c653daee 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -36,9 +36,11 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
@@ -338,9 +340,11 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(policy)
- .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+ .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+ .build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
@@ -390,16 +394,20 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
.withSchema(HoodieMetadataRecord.getClassSchema().toString())
.forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ // we will trigger cleaning manually, to control the instant times
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withAsyncClean(writeConfig.isMetadataAsyncClean())
- // we will trigger cleaning manually, to control the instant times
.withAutoClean(false)
.withCleanerParallelism(parallelism)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
- .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
- // we will trigger compaction manually, to control the instant times
+ .build())
+ // we will trigger archival manually, to control the instant times
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep).build())
+ // we will trigger compaction manually, to control the instant times
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
.withParallelism(parallelism, parallelism)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
index bdbc9e72d3..8531030a5c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.log4j.LogManager;
@@ -275,7 +276,10 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchivalCommits) throws Exception {
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index 87bcad04bc..407fb8de0e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -477,9 +479,9 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
public void testHbaseTagLocationForArchivedCommits() throws Exception {
// Load to memory
Map<String, String> params = new HashMap<String, String>();
- params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
- params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3");
- params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
+ params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+ params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3");
+ params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2");
HoodieWriteConfig config = getConfigBuilder(100, false, false).withProps(params).build();
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
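For code that configures Hudi through string keys, as in this test, only the class hosting each constant moves. The sketch below assumes the underlying key strings themselves are left unchanged by this refactor; the keys noted in comments are assumptions to verify against the new config classes.

Map<String, String> params = new HashMap<>();
params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"); // hoodie.cleaner.commits.retained
params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2");   // hoodie.keep.min.commits
params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3");   // hoodie.keep.max.commits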
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index d412052c2d..4f41c4a44d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -45,6 +45,8 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -181,13 +183,14 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
init(tableType);
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
- .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).withFailedWritesCleaningPolicy(failedWritesCleaningPolicy).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
.withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
.withArchiveMergeSmallFileLimit(size)
- .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
- .build())
+ .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
@@ -566,7 +569,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
init();
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -716,7 +720,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -881,9 +886,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
- .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
- .withRemoteServerPort(timelineServicePort).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -940,7 +945,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
HoodieWriteConfig cfg =
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).forTable("test-trip-table")
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -1146,8 +1152,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
// Test configs where metadata table has more aggressive archival configs than the compaction config
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .retainCommits(1).archiveCommitsWith(2, 4).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 4).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index b8545b0f63..8da877940b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -70,6 +70,7 @@ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
@@ -261,11 +262,15 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig writeConfig = getConfigBuilder()
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
.withEnableBackupForRemoteFileSystemView(false).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
- .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.allowMultipleCleans(false)
- .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+ .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+ .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+ .build())
.withEmbeddedTimelineServerEnabled(false).build();
int index = 0;
@@ -334,8 +339,9 @@ public class TestCleaner extends HoodieClientTestBase {
throws Exception {
int maxVersions = 2; // keep upto 2 versions for each file
HoodieWriteConfig cfg = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+ .retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.build();
@@ -503,7 +509,7 @@ public class TestCleaner extends HoodieClientTestBase {
throws Exception {
int maxCommits = 3; // keep upto 3 commits from the past
HoodieWriteConfig cfg = getConfigBuilder()
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -579,7 +585,7 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
@@ -732,8 +738,8 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
.build();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -787,8 +793,9 @@ public class TestCleaner extends HoodieClientTestBase {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1)
.withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(2).build())
.build();
HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
@@ -1140,7 +1147,7 @@ public class TestCleaner extends HoodieClientTestBase {
public void testCleaningWithZeroPartitionPaths() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
@@ -1164,7 +1171,7 @@ public class TestCleaner extends HoodieClientTestBase {
public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
// Deletions:
@@ -1188,7 +1195,7 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
.build();
// Deletions:
@@ -1212,8 +1219,8 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig.newBuilder()
.withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
String commitTime = makeNewCommitTime(1, "%09d");
@@ -1241,7 +1248,7 @@ public class TestCleaner extends HoodieClientTestBase {
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1)
.withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
@@ -1317,7 +1324,7 @@ public class TestCleaner extends HoodieClientTestBase {
HoodieWriteConfig cfg = getConfigBuilder()
.withAutoCommit(false)
.withHeartbeatIntervalInMs(3000)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
.withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index bd015baec9..9fcac64c00 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -37,7 +37,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -77,14 +77,14 @@ public class TestCleanPlanExecutor extends TestCleaner {
@Test
public void testInvalidCleaningTriggerStrategy() {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withIncrementalCleaningMode(true)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
- .withCleanBootstrapBaseFileEnabled(true)
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
- .withCleaningTriggerStrategy("invalid_strategy").build())
- .build();
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withIncrementalCleaningMode(true)
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+ .withCleanBootstrapBaseFileEnabled(true)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
+ .withCleaningTriggerStrategy("invalid_strategy").build())
+ .build();
Exception e = assertThrows(IllegalArgumentException.class, () -> runCleaner(config, true), "should fail when invalid trigger strategy is provided!");
assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy"));
}
@@ -108,18 +108,15 @@ public class TestCleanPlanExecutor extends TestCleaner {
boolean simulateFailureRetry, boolean simulateMetadataFailure,
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(
- HoodieMetadataConfig.newBuilder()
- .withAssumeDatePartitioning(true)
- .build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(2)
- .withMaxCommitsBeforeCleaning(2).build())
- .build();
+ .withMaxCommitsBeforeCleaning(2)
+ .build()).build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
String p0 = "2020/01/01";
@@ -274,7 +271,7 @@ public class TestCleanPlanExecutor extends TestCleaner {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
@@ -353,7 +350,7 @@ public class TestCleanPlanExecutor extends TestCleaner {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(true)
.withCleanerParallelism(1)
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
@@ -453,18 +450,15 @@ public class TestCleanPlanExecutor extends TestCleaner {
@Test
public void testKeepLatestFileVersionsMOR() throws Exception {
- HoodieWriteConfig config =
- HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(
- HoodieMetadataConfig.newBuilder()
- .withAssumeDatePartitioning(true)
- // Column Stats Index is disabled, since these tests construct tables which are
- // not valid (empty commit metadata, invalid parquet files)
- .withMetadataIndexColumnStats(false)
- .build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
- .build();
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true)
+ // Column Stats Index is disabled, since these tests construct tables which are
+ // not valid (empty commit metadata, invalid parquet files)
+ .withMetadataIndexColumnStats(false)
+ .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1)
+ .build()).build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -497,18 +491,14 @@ public class TestCleanPlanExecutor extends TestCleaner {
@Test
public void testKeepLatestCommitsMOR() throws Exception {
- HoodieWriteConfig config =
- HoodieWriteConfig.newBuilder().withPath(basePath)
- .withMetadataConfig(
- HoodieMetadataConfig.newBuilder()
- .withAssumeDatePartitioning(true)
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true)
// Column Stats Index is disabled, since these tests construct tables which are
// not valid (empty commit metadata, invalid parquet files)
- .withMetadataIndexColumnStats(false)
- .build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
- .build();
+ .withMetadataIndexColumnStats(false).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
+ .build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -552,11 +542,12 @@ public class TestCleanPlanExecutor extends TestCleaner {
boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
.withIncrementalCleaningMode(enableIncrementalClean)
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
.withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build())
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2)
+ .build())
.build();
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index b4d6aefa71..baff4ebac8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -27,7 +27,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
@@ -54,8 +55,9 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig writeConfig = getConfigBuilder(true)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index c33f0bcc2c..2f0e585ec9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -43,6 +43,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -556,7 +557,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
// trigger clean. Creating a new client with aggressive cleaner configs so that clean kicks in immediately.
cfgBuilder = getConfigBuilder(false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
// Timeline-server-based markers are not used for multi-rollback tests
.withMarkersType(MarkerType.DIRECT.name());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
@@ -977,10 +978,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsingMarkers) {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(autoCommit).withRollbackUsingMarkers(rollbackUsingMarkers)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024L)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(3)
- .withAutoClean(false)
- .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build());
+ .build());
return cfgBuilder.build();
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 16fd48af6c..900674a677 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -38,10 +38,11 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -146,8 +147,8 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
- .compactionSmallFileSize(1024 * 1024).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
diff --git a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index 4890a6529a..6e20ee1190 100644
--- a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
@@ -85,7 +85,7 @@ public class HoodieJavaWriteClientExample {
.withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
HoodieJavaWriteClient<HoodieAvroPayload> client =
new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index 1afc180531..299fe992fa 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
@@ -99,7 +99,7 @@ public class HoodieWriteClientExample {
.withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2).forTable(tableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
// inserts
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d292b3832a..0db960c32f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -38,12 +38,14 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -178,23 +180,25 @@ public class StreamerUtil {
.withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
.withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
.build())
- .withCompactionConfig(
- HoodieCompactionConfig.newBuilder()
- .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
- .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
- .withInlineCompactionTriggerStrategy(
- CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
- .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
- .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
- .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
- .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
- .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
- // override and hardcode to 20,
- // actually Flink cleaning is always with parallelism 1 now
- .withCleanerParallelism(20)
- .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
- .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
- .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
+ .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
+ .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
+ // override and hardcode to 20;
+ // Flink cleaning currently always runs with parallelism 1
+ .withCleanerParallelism(20)
+ .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
+ .withInlineCompactionTriggerStrategy(
+ CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
+ .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
+ .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
+ .build())
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(
@@ -214,8 +218,10 @@ public class StreamerUtil {
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
.build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
+ .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
.withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
.build())
.withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
.withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
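With one builder per concern, the Flink mapping above also factors cleanly into small helpers. A hypothetical extraction, shown only as a sketch (the helper method itself is not part of this commit; the calls inside it mirror the hunk above):

// Hypothetical helper; 'conf' is the Flink Configuration used throughout StreamerUtil.
private static HoodieCleanConfig cleanConfig(Configuration conf) {
  return HoodieCleanConfig.newBuilder()
      .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
      .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
      .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
      .withCleanerParallelism(20) // see the hardcoding note in the hunk above
      .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
      .build();
}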
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index 7a9e122e86..dc711818d7 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -25,7 +25,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -95,8 +94,9 @@ public abstract class HoodieTestSuiteWriter implements Serializable {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
.withAutoCommit(false)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
- .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
+ .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+ .withPayloadOrderingField(cfg.sourceOrderingField)
+ .withPayloadClass(cfg.payloadClassName)
.build())
.forTable(cfg.targetTableName)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
index 6ab0469738..1a33560dc8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -27,10 +27,12 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
import org.apache.hudi.connect.utils.KafkaConnectUtils;
import org.apache.hudi.exception.HoodieException;
@@ -87,13 +89,10 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
.withAutoCommit(false)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
// participants should not trigger table services, leaving them to the coordinator
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withAutoClean(false)
- .withAutoArchive(false)
- .withInlineCompaction(false).build())
- .withClusteringConfig(HoodieClusteringConfig.newBuilder()
- .withInlineClustering(false)
- .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().withAutoArchive(false).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
+ .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(false).build())
.build();
context = new HoodieJavaEngineContext(hadoopConf);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index ca1474fa88..80be97ebef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -180,9 +180,10 @@ public class DataSourceUtils {
return builder.forTable(tblName)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
.withInlineCompaction(inlineCompact).build())
- .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
+ .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+ .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
+ .withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
.build())
// override above with Hoodie configs specified as options.
.withProps(parameters).build();
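Across the Spark, Flink, and DeltaStreamer paths, the payload class now rides on HoodiePayloadConfig instead of HoodieCompactionConfig. A standalone sketch of the relocated block; the payload class shown is Hudi's stock OverwriteWithLatestAvroPayload, and the ordering field "ts" is a placeholder:

HoodiePayloadConfig payloadConfig = HoodiePayloadConfig.newBuilder()
    .withPayloadClass("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")
    .withPayloadOrderingField("ts") // placeholder precombine field
    .build();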
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 792d26b184..9eeef164a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.util.Option
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
import org.apache.spark.internal.Logging
@@ -82,7 +82,7 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
HoodieWriteConfig.newBuilder
.withPath(basePath)
.withRollbackUsingMarkers(true)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
+ .withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
.withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
.build
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5940ab1d46..badd3ab627 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -783,12 +783,12 @@ public class DeltaSync implements Serializable {
.combineInput(cfg.filterDupes, combineBeforeUpsert)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
- .withPayloadClass(cfg.payloadClassName)
.withInlineCompaction(cfg.isInlineCompactionEnabled())
.build()
)
.withPayloadConfig(
HoodiePayloadConfig.newBuilder()
+ .withPayloadClass(cfg.payloadClassName)
.withPayloadOrderingField(cfg.sourceOrderingField)
.build())
.forTable(cfg.targetTableName)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 45d8a427c0..dde0e5f73f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -46,6 +46,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -766,7 +768,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
}
cfg.tableType = tableType.name();
cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -946,16 +948,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
- configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
- configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
- configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
- configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
- configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+ configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+ configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+ configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+ configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+ configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean));
configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
if (asyncClean) {
configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
- configs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+ configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name()));
configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getName()));
@@ -987,7 +989,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
List<String> configs = new ArrayList<>();
configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
if (!StringUtils.isNullOrEmpty(autoClean)) {
- configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN.key(), autoClean));
+ configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), autoClean));
}
if (!StringUtils.isNullOrEmpty(inlineCluster)) {
configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster));
@@ -1462,7 +1464,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
cfg2.filterDupes = false;
cfg2.sourceLimit = 2000;
cfg2.operation = WriteOperationType.UPSERT;
- cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfg2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
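When the same settings are handed to DeltaStreamer as key=value overrides, the renamed constants are expected to resolve to the long-standing string keys; the mappings noted in the comments below are assumptions to verify against the new config classes, not part of this commit.

List<String> configs = new ArrayList<>();
configs.add("hoodie.cleaner.policy=KEEP_LATEST_COMMITS"); // HoodieCleanConfig.CLEANER_POLICY
configs.add("hoodie.cleaner.commits.retained=1");         // HoodieCleanConfig.CLEANER_COMMITS_RETAINED
configs.add("hoodie.keep.min.commits=2");                 // HoodieArchivalConfig.MIN_COMMITS_TO_KEEP
configs.add("hoodie.keep.max.commits=3");                 // HoodieArchivalConfig.MAX_COMMITS_TO_KEEP
configs.add("hoodie.clean.async=true");                   // HoodieCleanConfig.ASYNC_CLEAN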
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index 13f5ad97cf..abe1994a3d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -95,7 +95,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -115,7 +115,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgIngestionJob.continuousMode = true;
cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfgIngestionJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
// create a backfill job
HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -127,7 +127,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
// re-init ingestion job to start sync service
@@ -157,7 +157,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -188,13 +188,13 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
cfgIngestionJob2.continuousMode = true;
cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
// re-init ingestion job
HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
// re-init backfill job
@@ -225,7 +225,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
prepJobConfig.continuousMode = true;
prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
// Prepare base dataset with some commits
@@ -263,7 +263,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
// Set checkpoint to the last successful position
cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
- cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+ cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
backfillJob.sync();
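
The hunks above are a mechanical key relocation: the AUTO_CLEAN constant (hoodie.clean.automatic) now lives on HoodieCleanConfig instead of HoodieCompactionConfig. A minimal sketch of the post-patch usage, assuming a standalone Config object (the job wiring and the false value here are illustrative, not prescribed by this patch):

    import org.apache.hudi.config.HoodieCleanConfig;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

    // Disable automatic cleaning via the relocated config class;
    // the key string itself is unchanged, only its home class moved.
    HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
    cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));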
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index fa5cba446f..57270bdf81 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -64,8 +65,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
@Test
public void testHoodieIncrSource() throws IOException {
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .archiveCommitsWith(2, 3).retainCommits(1).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.build();
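
This hunk shows the builder split introduced by the change: archival bounds move to HoodieArchivalConfig and cleaner retention to HoodieCleanConfig, where previously both were set through HoodieCompactionConfig. A minimal sketch of the post-patch builder chain, assuming only these options matter (the base path is a placeholder; schema and other options are omitted):

    import org.apache.hudi.config.HoodieArchivalConfig;
    import org.apache.hudi.config.HoodieCleanConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")                    // placeholder base path
        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
            .archiveCommitsWith(2, 3)                   // min/max commits to keep before archival
            .build())
        .withCleanConfig(HoodieCleanConfig.newBuilder()
            .retainCommits(1)                           // commits retained by the cleaner
            .build())
        .build();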
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8e7bce944f..67a002c3ba 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -124,11 +124,14 @@ public class UtilitiesTestBase {
}
public static void initTestServices(boolean needsHive, boolean needsZookeeper) throws Exception {
- hdfsTestService = new HdfsTestService();
- dfsCluster = hdfsTestService.start(true);
- dfs = dfsCluster.getFileSystem();
- dfsBasePath = dfs.getWorkingDirectory().toString();
- dfs.mkdirs(new Path(dfsBasePath));
+
+ if (hdfsTestService == null) {
+ hdfsTestService = new HdfsTestService();
+ dfsCluster = hdfsTestService.start(true);
+ dfs = dfsCluster.getFileSystem();
+ dfsBasePath = dfs.getWorkingDirectory().toString();
+ dfs.mkdirs(new Path(dfsBasePath));
+ }
if (needsHive) {
hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
hiveServer = hiveTestService.start();