You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by co...@apache.org on 2022/07/09 14:30:56 UTC

[hudi] branch master updated: [HUDI-2150] Rename/Restructure configs for better modularity (#6061)

This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 126b88b48d [HUDI-2150] Rename/Restructure configs for better modularity (#6061)
126b88b48d is described below

commit 126b88b48ddf3af4ad6b48551cab09eea4c800c9
Author: liujinhui <96...@qq.com>
AuthorDate: Sat Jul 9 22:30:48 2022 +0800

    [HUDI-2150] Rename/Restructure configs for better modularity (#6061)
    
    - Move clean related configuration to HoodieCleanConfig
    - Move Archival related configuration to HoodieArchivalConfig
    - Move hoodie.compaction.payload.class to HoodiePayloadConfig
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |   4 +-
 .../cli/commands/TestArchivedCommitsCommand.java   |   6 +-
 .../hudi/cli/commands/TestCommitsCommand.java      |   9 +-
 .../hudi/cli/commands/TestCompactionCommand.java   |   6 +-
 .../apache/hudi/config/HoodieArchivalConfig.java   | 194 ++++++++++
 .../org/apache/hudi/config/HoodieCleanConfig.java  | 297 +++++++++++++++
 .../apache/hudi/config/HoodieCompactionConfig.java | 397 +--------------------
 .../apache/hudi/config/HoodiePayloadConfig.java    |  20 ++
 .../apache/hudi/config/HoodieStorageConfig.java    |  16 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  97 +++--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  16 +-
 .../client/transaction/TestTransactionManager.java |   8 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |  14 +-
 .../org/apache/hudi/client/TestClientRollback.java |  12 +-
 .../hudi/client/TestHoodieClientMultiWriter.java   |  30 +-
 .../functional/TestHoodieBackedMetadata.java       |  73 ++--
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  34 +-
 .../client/functional/TestHoodieMetadataBase.java  |  22 +-
 .../functional/TestHoodieMetadataBootstrap.java    |   8 +-
 .../index/hbase/TestSparkHoodieHBaseIndex.java     |   8 +-
 .../apache/hudi/io/TestHoodieTimelineArchiver.java |  30 +-
 .../java/org/apache/hudi/table/TestCleaner.java    |  43 ++-
 .../table/functional/TestCleanPlanExecutor.java    |  75 ++--
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |   8 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |  10 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   9 +-
 .../java/HoodieJavaWriteClientExample.java         |   4 +-
 .../examples/spark/HoodieWriteClientExample.java   |   4 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  44 ++-
 .../integ/testsuite/HoodieTestSuiteWriter.java     |   6 +-
 .../writers/KafkaConnectWriterProvider.java        |  19 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |   5 +-
 .../procedures/UpgradeOrDowngradeProcedure.scala   |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   2 +-
 .../functional/TestHoodieDeltaStreamer.java        |  20 +-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |  18 +-
 .../utilities/sources/TestHoodieIncrSource.java    |   7 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  13 +-
 38 files changed, 920 insertions(+), 672 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 4135745351..e94c38bd16 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieBootstrapConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSavepointException;
@@ -538,7 +538,7 @@ public class SparkMain {
   private static HoodieWriteConfig getWriteConfig(String basePath, Boolean rollbackUsingMarkers, boolean lazyCleanPolicy) {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withRollbackUsingMarkers(rollbackUsingMarkers)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(lazyCleanPolicy ? HoodieFailedWritesCleaningPolicy.LAZY :
             HoodieFailedWritesCleaningPolicy.EAGER).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
   }
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index d822ad6589..b936202bd0 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -30,7 +30,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -72,7 +73,8 @@ public class TestArchivedCommitsCommand extends CLIFunctionalTestHarness {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .forTable("test-trip-table").build();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index b23c6fd150..e03699f66e 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -37,7 +37,8 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.client.HoodieTimelineArchiver;
@@ -212,7 +213,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -266,7 +268,8 @@ public class TestCommitsCommand extends CLIFunctionalTestHarness {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
index 17c1002f6b..bc5ba168e3 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -39,7 +39,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -166,7 +167,8 @@ public class TestCompactionCommand extends CLIFunctionalTestHarness {
     // Generate archive
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .forTable("test-trip-table").build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
new file mode 100644
index 0000000000..32bccc3a3d
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieArchivalConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Archival related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "Archival Configs",
+        groupName = ConfigGroups.Names.WRITE_CLIENT,
+        description = "Configurations that control archival.")
+public class HoodieArchivalConfig extends HoodieConfig {
+
+  public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
+      .key("hoodie.archive.async")
+      .defaultValue("false")
+      .sinceVersion("0.11.0")
+      .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
+          + "When turned on runs archiver async with writing, which can speed up overall write performance.");
+
+  public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
+      .key("hoodie.keep.max.commits")
+      .defaultValue("30")
+      .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to"
+          + " keep the metadata overhead constant, even as the table size grows."
+          + " This config controls the maximum number of instants to retain in the active timeline.");
+
+  public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.archive.delete.parallelism")
+      .defaultValue(100)
+      .withDocumentation("Parallelism for deleting archived hoodie commits.");
+
+  public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
+      .key("hoodie.keep.min.commits")
+      .defaultValue("20")
+      .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
+          + " instants to retain in the active timeline.");
+
+  public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
+      .key("hoodie.commits.archival.batch")
+      .defaultValue(String.valueOf(10))
+      .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
+          + " archive log. This config controls such archival batch size.");
+
+  public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
+      .key("hoodie.archive.merge.files.batch.size")
+      .defaultValue(10)
+      .withDocumentation("The number of small archive files to be merged at once.");
+
+  public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.archive.merge.small.file.limit.bytes")
+      .defaultValue(20L * 1024 * 1024)
+      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
+
+  public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
+      .key("hoodie.archive.merge.enable")
+      .defaultValue(false)
+      .withDocumentation("When enabled, hoodie will auto merge several small archive files into a larger one. It's"
+          + " useful when storage scheme doesn't support append operation.");
+
+  /**
+   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
+   */
+  @Deprecated
+  public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
+  /**
+   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+   */
+  @Deprecated
+  public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
+  /**
+   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+   */
+  @Deprecated
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
+  /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
+  @Deprecated
+  private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
+  /**
+   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
+   */
+  @Deprecated
+  private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
+  /**
+   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
+   */
+  @Deprecated
+  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();
+
+  private HoodieArchivalConfig() {
+    super();
+  }
+
+  public static HoodieArchivalConfig.Builder newBuilder() {
+    return new HoodieArchivalConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final HoodieArchivalConfig archivalConfig = new HoodieArchivalConfig();
+
+    public HoodieArchivalConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.archivalConfig.getProps().load(reader);
+        return this;
+      }
+    }
+
+    public HoodieArchivalConfig.Builder fromProperties(Properties props) {
+      this.archivalConfig.getProps().putAll(props);
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withAutoArchive(Boolean autoArchive) {
+      archivalConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withAsyncArchive(Boolean asyncArchive) {
+      archivalConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
+      archivalConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+      archivalConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withArchiveMergeFilesBatchSize(int number) {
+      archivalConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withArchiveMergeSmallFileLimit(long size) {
+      archivalConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withArchiveMergeEnable(boolean enable) {
+      archivalConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
+      archivalConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
+      return this;
+    }
+
+    public HoodieArchivalConfig.Builder withCommitsArchivalBatchSize(int batchSize) {
+      archivalConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
+      return this;
+    }
+
+    public HoodieArchivalConfig build() {
+      archivalConfig.setDefaults(HoodieArchivalConfig.class.getName());
+      return archivalConfig;
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
new file mode 100644
index 0000000000..7b665f9b2b
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCleanConfig.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
+
+import javax.annotation.concurrent.Immutable;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Clean related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "Clean Configs",
+        groupName = ConfigGroups.Names.WRITE_CLIENT,
+        description = "Cleaning (reclamation of older/unused file groups/slices).")
+public class HoodieCleanConfig extends HoodieConfig {
+
+  public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
+      .key("hoodie.clean.automatic")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
+          + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
+          + " growth is bounded.");
+
+  public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
+      .key("hoodie.clean.async")
+      .defaultValue("false")
+      .withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
+          + "When turned on runs cleaner async with writing, which can speed up overall write performance.");
+
+  public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
+      .key("hoodie.cleaner.commits.retained")
+      .defaultValue("10")
+      .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
+          + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
+
+  public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
+      .defaultValue("24")
+      .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+          + " compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+          + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
+
+  public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
+      .key("hoodie.cleaner.policy")
+      .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
+      .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices to re-claim space."
+          + " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED.key()
+          + ". Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
+          + " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time.");
+
+  public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
+      .key("hoodie.clean.trigger.strategy")
+      .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
+      .withDocumentation("Controls how cleaning is scheduled. Valid options: "
+          + Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
+
+  public static final ConfigProperty<String> CLEAN_MAX_COMMITS = ConfigProperty
+      .key("hoodie.clean.max.commits")
+      .defaultValue("1")
+      .withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");
+
+  public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
+      .key("hoodie.cleaner.fileversions.retained")
+      .defaultValue("3")
+      .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used,"
+          + " the minimum number of file slices to retain in each file group, during cleaning.");
+
+  public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
+      .key("hoodie.cleaner.incremental.mode")
+      .defaultValue("true")
+      .withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events"
+          + " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
+          + " table for each planning (even with a metadata table).");
+
+  public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY = ConfigProperty
+      .key("hoodie.cleaner.policy.failed.writes")
+      .defaultValue(HoodieFailedWritesCleaningPolicy.EAGER.name())
+      .withDocumentation("Cleaning policy for failed writes to be used. Hudi will delete any files written by "
+          + "failed writes to re-claim space. Choose to perform this rollback of failed writes eagerly before "
+          + "every writer starts (only supported for single writer) or lazily by the cleaner (required for multi-writers)");
+
+  public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.cleaner.parallelism")
+      .defaultValue("200")
+      .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
+
+  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
+      .key("hoodie.clean.allow.multiple")
+      .defaultValue(true)
+      .sinceVersion("0.11.0")
+      .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
+          + "i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
+
+  public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLE = ConfigProperty
+      .key("hoodie.cleaner.delete.bootstrap.base.file")
+      .defaultValue("false")
+      .withDocumentation("When set to true, cleaner also deletes the bootstrap base file when its skeleton base file is"
+          + " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the"
+          + " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap"
+          + " base files are also physically deleted, to comply with data privacy enforcement processes.");
+
+
+  /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
+  @Deprecated
+  public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
+  /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
+  @Deprecated
+  public static final String AUTO_CLEAN_PROP = AUTO_CLEAN.key();
+  /** @deprecated Use {@link #ASYNC_CLEAN} and its methods instead */
+  @Deprecated
+  public static final String ASYNC_CLEAN_PROP = ASYNC_CLEAN.key();
+  /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
+  @Deprecated
+  public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = CLEANER_FILE_VERSIONS_RETAINED.key();
+  /**
+   * @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead
+   */
+  @Deprecated
+  public static final String CLEANER_COMMITS_RETAINED_PROP = CLEANER_COMMITS_RETAINED.key();
+  /**
+   * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
+   */
+  @Deprecated
+  public static final String CLEANER_INCREMENTAL_MODE = CLEANER_INCREMENTAL_MODE_ENABLE.key();
+  /**
+   * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
+   */
+  @Deprecated
+  public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.key();
+  /**
+   * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
+   */
+  @Deprecated
+  public static final String CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.key();
+  /**
+   * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
+   */
+  @Deprecated
+  public static final String DEFAULT_CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.defaultValue();
+  /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
+  @Deprecated
+  private static final String DEFAULT_CLEANER_POLICY = CLEANER_POLICY.defaultValue();
+  /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
+  @Deprecated
+  public static final String FAILED_WRITES_CLEANER_POLICY_PROP = FAILED_WRITES_CLEANER_POLICY.key();
+  /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
+  @Deprecated
+  private  static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY = FAILED_WRITES_CLEANER_POLICY.defaultValue();
+  /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
+  @Deprecated
+  private static final String DEFAULT_AUTO_CLEAN = AUTO_CLEAN.defaultValue();
+  /**
+   * @deprecated Use {@link #ASYNC_CLEAN} and its methods instead
+   */
+  @Deprecated
+  private static final String DEFAULT_ASYNC_CLEAN = ASYNC_CLEAN.defaultValue();
+  /**
+   * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
+   */
+  @Deprecated
+  private static final String DEFAULT_INCREMENTAL_CLEANER = CLEANER_INCREMENTAL_MODE_ENABLE.defaultValue();
+  /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
+  @Deprecated
+  private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = CLEANER_FILE_VERSIONS_RETAINED.defaultValue();
+  /** @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead */
+  @Deprecated
+  private static final String DEFAULT_CLEANER_COMMITS_RETAINED = CLEANER_COMMITS_RETAINED.defaultValue();
+  /**
+   * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
+   */
+  @Deprecated
+  private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.defaultValue();
+
+  private HoodieCleanConfig() {
+    super();
+  }
+
+  public static HoodieCleanConfig.Builder newBuilder() {
+    return new HoodieCleanConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final HoodieCleanConfig cleanConfig = new HoodieCleanConfig();
+
+    public HoodieCleanConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.cleanConfig.getProps().load(reader);
+        return this;
+      }
+    }
+
+    public HoodieCleanConfig.Builder fromProperties(Properties props) {
+      this.cleanConfig.getProps().putAll(props);
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withAutoClean(Boolean autoClean) {
+      cleanConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withAsyncClean(Boolean asyncClean) {
+      cleanConfig.setValue(ASYNC_CLEAN, String.valueOf(asyncClean));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
+      cleanConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) {
+      cleanConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy);
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) {
+      cleanConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
+      cleanConfig.setValue(CLEANER_POLICY, policy.name());
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder retainFileVersions(int fileVersionsRetained) {
+      cleanConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED, String.valueOf(fileVersionsRetained));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder retainCommits(int commitsRetained) {
+      cleanConfig.setValue(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
+      cleanConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
+      cleanConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withCleanerParallelism(int cleanerParallelism) {
+      cleanConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
+      cleanConfig.setValue(CLEANER_BOOTSTRAP_BASE_FILE_ENABLE, String.valueOf(cleanBootstrapSourceFileEnabled));
+      return this;
+    }
+
+    public HoodieCleanConfig.Builder withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy failedWritesPolicy) {
+      cleanConfig.setValue(FAILED_WRITES_CLEANER_POLICY, failedWritesPolicy.name());
+      return this;
+    }
+
+    public HoodieCleanConfig build() {
+      cleanConfig.setDefaults(HoodieCleanConfig.class.getName());
+      HoodieCleaningPolicy.valueOf(cleanConfig.getString(CLEANER_POLICY)); // validation only, result discarded; presumably throws on an unknown policy name (assumes an enum) - TODO confirm
+      return cleanConfig;
+    }
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 4003a07de7..d1d0e67261 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,11 +22,6 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -47,57 +42,9 @@ import java.util.stream.Collectors;
 @ConfigClassProperty(name = "Compaction Configs",
     groupName = ConfigGroups.Names.WRITE_CLIENT,
     description = "Configurations that control compaction "
-        + "(merging of log files onto a new base files) as well as  "
-        + "cleaning (reclamation of older/unused file groups/slices).")
+        + "(merging of log files onto a new base files).")
 public class HoodieCompactionConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> AUTO_ARCHIVE = ConfigProperty
-      .key("hoodie.archive.automatic")
-      .defaultValue("true")
-      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
-          + " to archive commits if we cross a maximum value of commits."
-          + " It's recommended to enable this, to ensure number of active commits is bounded.");
-
-  public static final ConfigProperty<String> ASYNC_ARCHIVE = ConfigProperty
-      .key("hoodie.archive.async")
-      .defaultValue("false")
-      .sinceVersion("0.11.0")
-      .withDocumentation("Only applies when " + AUTO_ARCHIVE.key() + " is turned on. "
-          + "When turned on runs archiver async with writing, which can speed up overall write performance.");
-
-  public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
-      .key("hoodie.clean.automatic")
-      .defaultValue("true")
-      .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
-          + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
-          + " growth is bounded.");
-
-  public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
-      .key("hoodie.clean.async")
-      .defaultValue("false")
-      .withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
-          + "When turned on runs cleaner async with writing, which can speed up overall write performance.");
-
-  public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
-      .key("hoodie.cleaner.commits.retained")
-      .defaultValue("10")
-      .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
-          + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
-
-  public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
-          .defaultValue("24")
-          .withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
-          + "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
-                  + " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
-
-  public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
-      .key("hoodie.cleaner.policy")
-      .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
-      .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
-          + " By default, cleaner spares the file slices written by the last N commits, determined by  " + CLEANER_COMMITS_RETAINED.key()
-          + " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
-          + "   a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
-
   public static final ConfigProperty<String> INLINE_COMPACT = ConfigProperty
       .key("hoodie.compact.inline")
       .defaultValue("false")
@@ -130,62 +77,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
           + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
 
-  public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
-          .key("hoodie.clean.trigger.strategy")
-          .defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
-          .withDocumentation("Controls how cleaning is scheduled. Valid options: "
-                  + Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
-
-  public static final ConfigProperty<String> CLEAN_MAX_COMMITS = ConfigProperty
-          .key("hoodie.clean.max.commits")
-          .defaultValue("1")
-          .withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");
-
-  public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
-      .key("hoodie.cleaner.fileversions.retained")
-      .defaultValue("3")
-      .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
-          + " the minimum number of file slices to retain in each file group, during cleaning.");
-
-  public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
-      .key("hoodie.cleaner.incremental.mode")
-      .defaultValue("true")
-      .withDocumentation("When enabled, the plans for each cleaner service run is computed incrementally off the events "
-          + " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
-          + " table for each planning (even with a metadata table).");
-
-  public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
-      .key("hoodie.keep.max.commits")
-      .defaultValue("30")
-      .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
-          + " keep the metadata overhead constant, even as the table size grows."
-          + "This config controls the maximum number of instants to retain in the active timeline. ");
-
-  public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
-      .key("hoodie.archive.delete.parallelism")
-      .defaultValue(100)
-      .withDocumentation("Parallelism for deleting archived hoodie commits.");
-
-  public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
-      .key("hoodie.keep.min.commits")
-      .defaultValue("20")
-      .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
-          + "instants to retain in the active timeline.");
-
-  public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
-      .key("hoodie.commits.archival.batch")
-      .defaultValue(String.valueOf(10))
-      .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
-          + " archive log. This config controls such archival batch size.");
-
-  public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLE = ConfigProperty
-      .key("hoodie.cleaner.delete.bootstrap.base.file")
-      .defaultValue("false")
-      .withDocumentation("When set to true, cleaner also deletes the bootstrap base file when it's skeleton base file is "
-          + " cleaned. Turn this to true, if you want to ensure the bootstrap dataset storage is reclaimed over time, as the"
-          + " table receives updates/deletes. Another reason to turn this on, would be to ensure data residing in bootstrap "
-          + " base files are also physically deleted, to comply with data privacy enforcement processes.");
-
   public static final ConfigProperty<String> PARQUET_SMALL_FILE_LIMIT = ConfigProperty
       .key("hoodie.parquet.small.file.limit")
       .defaultValue(String.valueOf(104857600))
@@ -202,11 +93,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten "
           + " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)");
 
-  public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
-      .key("hoodie.cleaner.parallelism")
-      .defaultValue("200")
-      .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
-
   // 500GB of target IO per compaction (both read and write
   public static final ConfigProperty<String> TARGET_IO_PER_COMPACTION_IN_MB = ConfigProperty
       .key("hoodie.compaction.target.io")
@@ -227,13 +113,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + "compaction during each compaction run. By default. Hudi picks the log file "
           + "with most accumulated unmerged data");
 
-  public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
-      .key("hoodie.compaction.payload.class")
-      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
-      .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
-          + "the record payload class to merge records in the log against each other, merge again with the base file and "
-          + "produce the final record to be written after compaction.");
-
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("true")
@@ -247,13 +126,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
           + "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
 
-  public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY = ConfigProperty
-      .key("hoodie.cleaner.policy.failed.writes")
-      .defaultValue(HoodieFailedWritesCleaningPolicy.EAGER.name())
-      .withDocumentation("Cleaning policy for failed writes to be used. Hudi will delete any files written by "
-          + "failed writes to re-claim space. Choose to perform this rollback of failed writes eagerly before "
-          + "every writer starts (only supported for single writer) or lazily by the cleaner (required for multi-writers)");
-
   public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
       .key("hoodie.compaction.daybased.target.partitions")
       .defaultValue("10")
@@ -290,39 +162,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("The average record size. If not explicitly specified, hudi will compute the "
           + "record size estimate compute dynamically based on commit metadata. "
           + " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
-  
-  public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
-      .key("hoodie.clean.allow.multiple")
-      .defaultValue(true)
-      .sinceVersion("0.11.0")
-      .withDocumentation("Allows scheduling/executing multiple cleans by enabling this config. If users prefer to strictly ensure clean requests should be mutually exclusive, "
-          + ".i.e. a 2nd clean will not be scheduled if another clean is not yet completed to avoid repeat cleaning of same files, they might want to disable this config.");
-
-  public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
-      .key("hoodie.archive.merge.files.batch.size")
-      .defaultValue(10)
-      .withDocumentation("The number of small archive files to be merged at once.");
-
-  public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
-      .key("hoodie.archive.merge.small.file.limit.bytes")
-      .defaultValue(20L * 1024 * 1024)
-      .withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
-
-  public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
-      .key("hoodie.archive.merge.enable")
-      .defaultValue(false)
-      .withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
-          + " useful when storage scheme doesn't support append operation.");
-
-  /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
-  @Deprecated
-  public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
-  /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
-  @Deprecated
-  public static final String AUTO_CLEAN_PROP = AUTO_CLEAN.key();
-  /** @deprecated Use {@link #ASYNC_CLEAN} and its methods instead */
-  @Deprecated
-  public static final String ASYNC_CLEAN_PROP = ASYNC_CLEAN.key();
+
+
   /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
   @Deprecated
   public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
@@ -335,39 +176,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
   /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */
   @Deprecated
   public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = INLINE_COMPACT_TRIGGER_STRATEGY.key();
-  /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
-  @Deprecated
-  public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = CLEANER_FILE_VERSIONS_RETAINED.key();
-  /**
-   * @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead
-   */
-  @Deprecated
-  public static final String CLEANER_COMMITS_RETAINED_PROP = CLEANER_COMMITS_RETAINED.key();
-  /**
-   * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
-   */
-  @Deprecated
-  public static final String CLEANER_INCREMENTAL_MODE = CLEANER_INCREMENTAL_MODE_ENABLE.key();
-  /**
-   * @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead
-   */
-  @Deprecated
-  public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
-  /**
-   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
-   */
-  @Deprecated
-  public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
-  /**
-   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
-   */
-  @Deprecated
-  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
-  /**
-   * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
-   */
-  @Deprecated
-  public static final String CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.key();
   /**
    * @deprecated Use {@link #PARQUET_SMALL_FILE_LIMIT} and its methods instead
    */
@@ -418,16 +226,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
    */
   @Deprecated
   public static final String DEFAULT_COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE = COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.defaultValue();
-  /**
-   * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
-   */
-  @Deprecated
-  public static final String CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.key();
-  /**
-   * @deprecated Use {@link #CLEANER_PARALLELISM_VALUE} and its methods instead
-   */
-  @Deprecated
-  public static final String DEFAULT_CLEANER_PARALLELISM = CLEANER_PARALLELISM_VALUE.defaultValue();
   /**
    * @deprecated Use {@link #TARGET_IO_PER_COMPACTION_IN_MB} and its methods instead
    */
@@ -446,12 +244,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
   /** @deprecated Use {@link #COMPACTION_STRATEGY} and its methods instead */
   @Deprecated
   public static final String DEFAULT_COMPACTION_STRATEGY = COMPACTION_STRATEGY.defaultValue();
-  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
-  @Deprecated
-  public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
-  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
-  @Deprecated
-  public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
   /** @deprecated Use {@link #COMPACTION_LAZY_BLOCK_READ_ENABLE} and its methods instead */
   @Deprecated
   public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = COMPACTION_LAZY_BLOCK_READ_ENABLE.key();
@@ -464,33 +256,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
   /** @deprecated Use {@link #COMPACTION_REVERSE_LOG_READ_ENABLE} and its methods instead */
   @Deprecated
   public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue();
-  /** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
-  @Deprecated
-  private static final String DEFAULT_CLEANER_POLICY = CLEANER_POLICY.defaultValue();
-  /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
-  @Deprecated
-  public static final String FAILED_WRITES_CLEANER_POLICY_PROP = FAILED_WRITES_CLEANER_POLICY.key();
-  /** @deprecated Use {@link #FAILED_WRITES_CLEANER_POLICY} and its methods instead */
-  @Deprecated
-  private  static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY = FAILED_WRITES_CLEANER_POLICY.defaultValue();
-  /** @deprecated Use {@link #AUTO_CLEAN} and its methods instead */
-  @Deprecated
-  private static final String DEFAULT_AUTO_CLEAN = AUTO_CLEAN.defaultValue();
-  /**
-   * @deprecated Use {@link #ASYNC_CLEAN} and its methods instead
-   */
-  @Deprecated
-  private static final String DEFAULT_ASYNC_CLEAN = ASYNC_CLEAN.defaultValue();
   /**
    * @deprecated Use {@link #INLINE_COMPACT} and its methods instead
    */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT = INLINE_COMPACT.defaultValue();
-  /**
-   * @deprecated Use {@link #CLEANER_INCREMENTAL_MODE_ENABLE} and its methods instead
-   */
-  @Deprecated
-  private static final String DEFAULT_INCREMENTAL_CLEANER = CLEANER_INCREMENTAL_MODE_ENABLE.defaultValue();
   /** @deprecated Use {@link #INLINE_COMPACT_NUM_DELTA_COMMITS} and its methods instead */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT_NUM_DELTA_COMMITS = INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue();
@@ -500,30 +270,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
   /** @deprecated Use {@link #INLINE_COMPACT_TRIGGER_STRATEGY} and its methods instead */
   @Deprecated
   private static final String DEFAULT_INLINE_COMPACT_TRIGGER_STRATEGY = INLINE_COMPACT_TRIGGER_STRATEGY.defaultValue();
-  /** @deprecated Use {@link #CLEANER_FILE_VERSIONS_RETAINED} and its methods instead */
-  @Deprecated
-  private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = CLEANER_FILE_VERSIONS_RETAINED.defaultValue();
-  /** @deprecated Use {@link #CLEANER_COMMITS_RETAINED} and its methods instead */
-  @Deprecated
-  private static final String DEFAULT_CLEANER_COMMITS_RETAINED = CLEANER_COMMITS_RETAINED.defaultValue();
-  /** @deprecated Use {@link #MAX_COMMITS_TO_KEEP} and its methods instead */
-  @Deprecated
-  private static final String DEFAULT_MAX_COMMITS_TO_KEEP = MAX_COMMITS_TO_KEEP.defaultValue();
-  /**
-   * @deprecated Use {@link #MIN_COMMITS_TO_KEEP} and its methods instead
-   */
-  @Deprecated
-  private static final String DEFAULT_MIN_COMMITS_TO_KEEP = MIN_COMMITS_TO_KEEP.defaultValue();
-  /**
-   * @deprecated Use {@link #COMMITS_ARCHIVAL_BATCH_SIZE} and its methods instead
-   */
-  @Deprecated
-  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = COMMITS_ARCHIVAL_BATCH_SIZE.defaultValue();
-  /**
-   * @deprecated Use {@link #CLEANER_BOOTSTRAP_BASE_FILE_ENABLE} and its methods instead
-   */
-  @Deprecated
-  private static final String DEFAULT_CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = CLEANER_BOOTSTRAP_BASE_FILE_ENABLE.defaultValue();
   /** @deprecated Use {@link #TARGET_PARTITIONS_PER_DAYBASED_COMPACTION} and its methods instead */
   @Deprecated
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key();
@@ -555,31 +301,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withAutoArchive(Boolean autoArchive) {
-      compactionConfig.setValue(AUTO_ARCHIVE, String.valueOf(autoArchive));
-      return this;
-    }
-
-    public Builder withAsyncArchive(Boolean asyncArchive) {
-      compactionConfig.setValue(ASYNC_ARCHIVE, String.valueOf(asyncArchive));
-      return this;
-    }
-
-    public Builder withAutoClean(Boolean autoClean) {
-      compactionConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
-      return this;
-    }
-
-    public Builder withAsyncClean(Boolean asyncClean) {
-      compactionConfig.setValue(ASYNC_CLEAN, String.valueOf(asyncClean));
-      return this;
-    }
-
-    public Builder withIncrementalCleaningMode(Boolean incrementalCleaningMode) {
-      compactionConfig.setValue(CLEANER_INCREMENTAL_MODE_ENABLE, String.valueOf(incrementalCleaningMode));
-      return this;
-    }
-
     public Builder withInlineCompaction(Boolean inlineCompaction) {
       compactionConfig.setValue(INLINE_COMPACT, String.valueOf(inlineCompaction));
       return this;
@@ -595,57 +316,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) {
-      compactionConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy);
-      return this;
-    }
-
-    public Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) {
-      compactionConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning));
-      return this;
-    }
-
-    public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
-      compactionConfig.setValue(CLEANER_POLICY, policy.name());
-      return this;
-    }
-
-    public Builder retainFileVersions(int fileVersionsRetained) {
-      compactionConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED, String.valueOf(fileVersionsRetained));
-      return this;
-    }
-
-    public Builder retainCommits(int commitsRetained) {
-      compactionConfig.setValue(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
-      return this;
-    }
-
-    public Builder cleanerNumHoursRetained(int cleanerHoursRetained) {
-      compactionConfig.setValue(CLEANER_HOURS_RETAINED, String.valueOf(cleanerHoursRetained));
-      return this;
-    }
-
-    public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
-      compactionConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
-      compactionConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
-      return this;
-    }
-
-    public Builder withArchiveMergeFilesBatchSize(int number) {
-      compactionConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
-      return this;
-    }
-
-    public Builder withArchiveMergeSmallFileLimit(long size) {
-      compactionConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
-      return this;
-    }
-
-    public Builder withArchiveMergeEnable(boolean enable) {
-      compactionConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
-      return this;
-    }
-
     public Builder compactionSmallFileSize(long smallFileLimitBytes) {
       compactionConfig.setValue(PARQUET_SMALL_FILE_LIMIT, String.valueOf(smallFileLimitBytes));
       return this;
@@ -671,26 +341,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder allowMultipleCleans(boolean allowMultipleCleanSchedules) {
-      compactionConfig.setValue(ALLOW_MULTIPLE_CLEANS, String.valueOf(allowMultipleCleanSchedules));
-      return this;
-    }
-
-    public Builder withCleanerParallelism(int cleanerParallelism) {
-      compactionConfig.setValue(CLEANER_PARALLELISM_VALUE, String.valueOf(cleanerParallelism));
-      return this;
-    }
-
     public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
       compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName());
       return this;
     }
 
-    public Builder withPayloadClass(String payloadClassName) {
-      compactionConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName);
-      return this;
-    }
-
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
       compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
       return this;
@@ -701,11 +356,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
-      compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
-      return this;
-    }
-
     public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
       compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
       return this;
@@ -736,49 +386,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withCommitsArchivalBatchSize(int batchSize) {
-      compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
-      return this;
-    }
-
-    public Builder withCleanBootstrapBaseFileEnabled(Boolean cleanBootstrapSourceFileEnabled) {
-      compactionConfig.setValue(CLEANER_BOOTSTRAP_BASE_FILE_ENABLE, String.valueOf(cleanBootstrapSourceFileEnabled));
-      return this;
-    }
-
-    public Builder withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy failedWritesPolicy) {
-      compactionConfig.setValue(FAILED_WRITES_CLEANER_POLICY, failedWritesPolicy.name());
-      return this;
-    }
-
     public HoodieCompactionConfig build() {
       compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
-      // validation
-      HoodieCleaningPolicy.valueOf(compactionConfig.getString(CLEANER_POLICY));
-
-      // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
-      // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
-      int minInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP));
-      int maxInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP));
-      int cleanerCommitsRetained =
-          Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED));
-      ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
-          String.format(
-              "Increase %s=%d to be greater than %s=%d.",
-              HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
-              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
-      ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
-          String.format(
-              "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
-                  + "missing data from few instants.",
-              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
-              HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
-
-      boolean inlineCompact = compactionConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
-      boolean inlineCompactSchedule = compactionConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
-      ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or "
-              + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
-          HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
       return compactionConfig;
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index 2989d8c2b3..2a05752aa6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 
 import java.io.File;
 import java.io.FileReader;
@@ -52,6 +53,20 @@ public class HoodiePayloadConfig extends HoodieConfig {
       .withDocumentation("Table column/field name to derive timestamp associated with the records. This can"
           + "be useful for e.g, determining the freshness of the table.");
 
+  public static final ConfigProperty<String> PAYLOAD_CLASS_NAME = ConfigProperty
+      .key("hoodie.compaction.payload.class")
+      .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
+      .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
+          + "the record payload class to merge records in the log against each other, merge again with the base file and "
+          + "produce the final record to be written after compaction.");
+
+  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
+  @Deprecated
+  public static final String DEFAULT_PAYLOAD_CLASS = PAYLOAD_CLASS_NAME.defaultValue();
+  /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
+  @Deprecated
+  public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS_NAME.key();
+
   private HoodiePayloadConfig() {
     super();
   }
@@ -86,6 +101,11 @@ public class HoodiePayloadConfig extends HoodieConfig {
       return this;
     }
 
+    public HoodiePayloadConfig.Builder withPayloadClass(String payloadClassName) {
+      payloadConfig.setValue(PAYLOAD_CLASS_NAME, payloadClassName); // the class name is stored as given; nothing is checked in this method
+      return this;
+    }
+
     public HoodiePayloadConfig build() {
       payloadConfig.setDefaults(HoodiePayloadConfig.class.getName());
       return payloadConfig;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
index ba3888863d..fc1798f206 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
@@ -119,16 +119,16 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Whether to use dictionary encoding");
 
   public static final ConfigProperty<String> PARQUET_WRITE_LEGACY_FORMAT_ENABLED = ConfigProperty
-          .key("hoodie.parquet.writelegacyformat.enabled")
-          .defaultValue("false")
-          .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, data will be written in a way of Spark 1.4 and earlier. "
-                  + "For example, decimal values will be written in Parquet's fixed-length byte array format which other systems such as Apache Hive and Apache Impala use. "
-                  + "If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format.");
+      .key("hoodie.parquet.writelegacyformat.enabled")
+      .defaultValue("false")
+      .withDocumentation("Sets spark.sql.parquet.writeLegacyFormat. If true, data will be written in a way of Spark 1.4 and earlier. "
+          + "For example, decimal values will be written in Parquet's fixed-length byte array format which other systems such as Apache Hive and Apache Impala use. "
+          + "If false, the newer format in Parquet will be used. For example, decimals will be written in int-based format.");
 
   public static final ConfigProperty<String> PARQUET_OUTPUT_TIMESTAMP_TYPE = ConfigProperty
-          .key("hoodie.parquet.outputtimestamptype")
-          .defaultValue("TIMESTAMP_MICROS")
-          .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
+      .key("hoodie.parquet.outputtimestamptype")
+      .defaultValue("TIMESTAMP_MICROS")
+      .withDocumentation("Sets spark.sql.parquet.outputTimestampType. Parquet timestamp type to use when Spark writes data to Parquet files.");
 
   public static final ConfigProperty<String> HFILE_COMPRESSION_ALGORITHM_NAME = ConfigProperty
       .key("hoodie.hfile.compression.algorithm")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d18238fa4b..4d07097c07 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -89,6 +89,8 @@ import java.util.Properties;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;
+
 /**
  * Class storing configs for the HoodieWriteClient.
  */
@@ -1148,31 +1150,31 @@ public class HoodieWriteConfig extends HoodieConfig {
    * compaction properties.
    */
   public HoodieCleaningPolicy getCleanerPolicy() {
-    return HoodieCleaningPolicy.valueOf(getString(HoodieCompactionConfig.CLEANER_POLICY));
+    return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
   }
 
   public int getCleanerFileVersionsRetained() {
-    return getInt(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED);
+    return getInt(HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED);
   }
 
   public int getCleanerCommitsRetained() {
-    return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED);
+    return getInt(HoodieCleanConfig.CLEANER_COMMITS_RETAINED);
   }
 
   public int getCleanerHoursRetained() {
-    return getInt(HoodieCompactionConfig.CLEANER_HOURS_RETAINED);
+    return getInt(HoodieCleanConfig.CLEANER_HOURS_RETAINED);
   }
 
   public int getMaxCommitsToKeep() {
-    return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP);
+    return getInt(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP);
   }
 
   public int getMinCommitsToKeep() {
-    return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP);
+    return getInt(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP);
   }
 
   public int getArchiveMergeFilesBatchSize() {
-    return getInt(HoodieCompactionConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
+    return getInt(HoodieArchivalConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
   }
 
   public int getParquetSmallFileLimit() {
@@ -1192,7 +1194,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean allowMultipleCleans() {
-    return getBoolean(HoodieCompactionConfig.ALLOW_MULTIPLE_CLEANS);
+    return getBoolean(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS);
   }
 
   public boolean shouldAutoTuneInsertSplits() {
@@ -1200,43 +1202,43 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getCleanerParallelism() {
-    return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE);
+    return getInt(HoodieCleanConfig.CLEANER_PARALLELISM_VALUE);
   }
 
   public int getCleaningMaxCommits() {
-    return getInt(HoodieCompactionConfig.CLEAN_MAX_COMMITS);
+    return getInt(HoodieCleanConfig.CLEAN_MAX_COMMITS);
   }
 
   public CleaningTriggerStrategy getCleaningTriggerStrategy() {
-    return CleaningTriggerStrategy.valueOf(getString(HoodieCompactionConfig.CLEAN_TRIGGER_STRATEGY));
+    return CleaningTriggerStrategy.valueOf(getString(HoodieCleanConfig.CLEAN_TRIGGER_STRATEGY));
   }
 
   public boolean isAutoClean() {
-    return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
+    return getBoolean(HoodieCleanConfig.AUTO_CLEAN);
   }
 
   public boolean getArchiveMergeEnable() {
-    return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
+    return getBoolean(HoodieArchivalConfig.ARCHIVE_MERGE_ENABLE);
   }
 
   public long getArchiveMergeSmallFileLimitBytes() {
-    return getLong(HoodieCompactionConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
+    return getLong(HoodieArchivalConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
   }
 
   public boolean isAutoArchive() {
-    return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
+    return getBoolean(HoodieArchivalConfig.AUTO_ARCHIVE);
   }
 
   public boolean isAsyncArchive() {
-    return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE);
+    return getBoolean(HoodieArchivalConfig.ASYNC_ARCHIVE);
   }
 
   public boolean isAsyncClean() {
-    return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
+    return getBoolean(HoodieCleanConfig.ASYNC_CLEAN);
   }
 
   public boolean incrementalCleanerModeEnabled() {
-    return getBoolean(HoodieCompactionConfig.CLEANER_INCREMENTAL_MODE_ENABLE);
+    return getBoolean(HoodieCleanConfig.CLEANER_INCREMENTAL_MODE_ENABLE);
   }
 
   public boolean inlineCompactionEnabled() {
@@ -1280,7 +1282,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getArchiveDeleteParallelism() {
-    return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
+    return getInt(HoodieArchivalConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
   }
 
   public boolean inlineClusteringEnabled() {
@@ -1321,7 +1323,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getPayloadClass() {
-    return getString(HoodieCompactionConfig.PAYLOAD_CLASS_NAME);
+    return getString(HoodiePayloadConfig.PAYLOAD_CLASS_NAME);
   }
 
   public int getTargetPartitionsPerDayBasedCompaction() {
@@ -1329,11 +1331,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getCommitArchivalBatchSize() {
-    return getInt(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
+    return getInt(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
   }
 
   public Boolean shouldCleanBootstrapBaseFile() {
-    return getBoolean(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
+    return getBoolean(HoodieCleanConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLE);
   }
 
   public String getClusteringUpdatesStrategyClass() {
@@ -1342,7 +1344,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public HoodieFailedWritesCleaningPolicy getFailedWritesCleanPolicy() {
     return HoodieFailedWritesCleaningPolicy
-        .valueOf(getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY));
+        .valueOf(getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY));
   }
 
   /**
@@ -2117,6 +2119,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     private boolean isIndexConfigSet = false;
     private boolean isStorageConfigSet = false;
     private boolean isCompactionConfigSet = false;
+    private boolean isCleanConfigSet = false;
+    private boolean isArchivalConfigSet = false;
     private boolean isClusteringConfigSet = false;
     private boolean isOptimizeConfigSet = false;
     private boolean isMetricsConfigSet = false;
@@ -2284,6 +2288,18 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withCleanConfig(HoodieCleanConfig cleanConfig) {
+      writeConfig.getProps().putAll(cleanConfig.getProps());
+      isCleanConfigSet = true;
+      return this;
+    }
+
+    public Builder withArchivalConfig(HoodieArchivalConfig archivalConfig) {
+      writeConfig.getProps().putAll(archivalConfig.getProps());
+      isArchivalConfigSet = true; // flag checked later to decide whether HoodieArchivalConfig defaults are applied
+      return this;
+    }
+
     public Builder withClusteringConfig(HoodieClusteringConfig clusteringConfig) {
       writeConfig.getProps().putAll(clusteringConfig.getProps());
       isClusteringConfigSet = true;
@@ -2517,6 +2533,10 @@ public class HoodieWriteConfig extends HoodieConfig {
           writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isCompactionConfigSet,
           HoodieCompactionConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+      writeConfig.setDefaultOnCondition(!isCleanConfigSet,
+          HoodieCleanConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
+      writeConfig.setDefaultOnCondition(!isArchivalConfigSet,
+          HoodieArchivalConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
       writeConfig.setDefaultOnCondition(!isClusteringConfigSet,
           HoodieClusteringConfig.newBuilder().withEngineType(engineType)
               .fromProperties(writeConfig.getProps()).build());
@@ -2587,10 +2607,10 @@ public class HoodieWriteConfig extends HoodieConfig {
       if (WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value()
           .equalsIgnoreCase(writeConcurrencyMode)) {
         // In this case, we assume that the user takes care of setting the lock provider used
-        writeConfig.setValue(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+        writeConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
             HoodieFailedWritesCleaningPolicy.LAZY.name());
         LOG.info(String.format("Automatically set %s=%s since optimistic concurrency control is used",
-            HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+            HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
             HoodieFailedWritesCleaningPolicy.LAZY.name()));
       }
     }
@@ -2602,9 +2622,34 @@ public class HoodieWriteConfig extends HoodieConfig {
       Objects.requireNonNull(writeConfig.getString(BASE_PATH));
       if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
           .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
-        ValidationUtils.checkArgument(!writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
+        ValidationUtils.checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
             .equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
       }
+
+      HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
+      // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
+      // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
+      int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
+      int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
+      int cleanerCommitsRetained =
+          Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
+      ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
+          String.format(
+              "Increase %s=%d to be greater than %s=%d.",
+              HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
+              HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
+      ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+          String.format(
+              "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+                  + "missing data from few instants.",
+              HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+              HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
+
+      boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
+      boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
+      ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or "
+              + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
+          HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
     }
 
     public HoodieWriteConfig build() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e36adf6be5..f4ee3fc9f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -58,6 +58,8 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -255,20 +257,24 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
         .withSchema(HoodieMetadataRecord.getClassSchema().toString())
         .forTable(tableName)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        // we will trigger cleaning manually, to control the instant times
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withAsyncClean(writeConfig.isMetadataAsyncClean())
-            // we will trigger cleaning manually, to control the instant times
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
+            .build())
+        // we will trigger archive manually, to ensure only regular writer invokes it
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
             .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
-            // we will trigger compaction manually, to control the instant times
+            .withAutoArchive(false)
+            .build())
+        // we will trigger compaction manually, to control the instant times
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(false)
             .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
-            // we will trigger archive manually, to ensure only regular writer invokes it
-            .withAutoArchive(false)
             // by default, the HFile does not keep the metadata fields, set up as false
             // to always use the metadata of the new record.
             .withPreserveCommitMetadata(false)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
index 6573560e75..afbedc0de3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestTransactionManager.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
@@ -57,9 +57,9 @@ public class TestTransactionManager extends HoodieCommonTestHarness {
   private HoodieWriteConfig getWriteConfig() {
     return HoodieWriteConfig.newBuilder()
         .withPath(basePath)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
-            .build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder()
             .withLockProvider(InProcessLockProvider.class)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 3d0da3e49a..0adbb998a0 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -47,10 +47,10 @@ import java.util.Properties;
 import java.util.function.Function;
 
 import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
-import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_ARCHIVE;
-import static org.apache.hudi.config.HoodieCompactionConfig.ASYNC_CLEAN;
-import static org.apache.hudi.config.HoodieCompactionConfig.AUTO_CLEAN;
-import static org.apache.hudi.config.HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY;
+import static org.apache.hudi.config.HoodieArchivalConfig.ASYNC_ARCHIVE;
+import static org.apache.hudi.config.HoodieCleanConfig.ASYNC_CLEAN;
+import static org.apache.hudi.config.HoodieCleanConfig.AUTO_CLEAN;
+import static org.apache.hudi.config.HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY;
 import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
 import static org.apache.hudi.config.HoodieWriteConfig.TABLE_SERVICES_ENABLED;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE;
@@ -64,9 +64,9 @@ public class TestHoodieWriteConfig {
   public void testPropertyLoading(boolean withAlternative) throws IOException {
     Builder builder = HoodieWriteConfig.newBuilder().withPath("/tmp");
     Map<String, String> params = new HashMap<>(3);
-    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
-    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "5");
-    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
+    params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+    params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5");
+    params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2");
     if (withAlternative) {
       params.put("hoodie.avro.schema.externalTransformation", "true");
     } else {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index f6315eec7d..629b16cdb8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -38,7 +38,7 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
@@ -80,7 +80,7 @@ public class TestClientRollback extends HoodieClientTestBase {
    */
   @Test
   public void testSavepointAndRollback() throws Exception {
-    HoodieWriteConfig cfg = getConfigBuilder().withCompactionConfig(HoodieCompactionConfig.newBuilder()
+    HoodieWriteConfig cfg = getConfigBuilder().withCleanConfig(HoodieCleanConfig.newBuilder()
         .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build()).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
       HoodieTestDataGenerator.writePartitionMetadataDeprecated(fs, HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, basePath);
@@ -214,7 +214,7 @@ public class TestClientRollback extends HoodieClientTestBase {
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withRollbackUsingMarkers(false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
 
@@ -329,7 +329,7 @@ public class TestClientRollback extends HoodieClientTestBase {
                 .enable(enableMetadataTable)
                 .build()
         )
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
 
@@ -436,7 +436,7 @@ public class TestClientRollback extends HoodieClientTestBase {
     // Set Failed Writes rollback to LAZY
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build()).build();
 
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
@@ -530,7 +530,7 @@ public class TestClientRollback extends HoodieClientTestBase {
                 .enable(enableMetadataTable)
                 .build()
         )
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 3aeca0f275..268674e78d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -100,9 +102,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
-            .withAutoArchive(false).withAutoClean(false).build())
+            .withAutoClean(false).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
         .withMarkersType(MarkerType.DIRECT.name())
@@ -192,9 +196,11 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
 
     HoodieWriteConfig cfg = getConfigBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(false)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withMaxNumDeltaCommitsBeforeCompaction(2)
             .build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -265,9 +271,12 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     // Disabling embedded timeline server, it doesn't work with multiwriter
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
-            .withInlineCompaction(false).withAsyncClean(true)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withAsyncClean(true)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
             .withMaxNumDeltaCommitsBeforeCompaction(2).build())
         .withEmbeddedTimelineServerEnabled(false)
         // Timeline-server-based markers are not used for multi-writer tests
@@ -402,7 +411,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
@@ -453,7 +463,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
@@ -536,7 +547,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100");
     HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         // Timeline-server-based markers are not used for multi-writer tests
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 1a618a01df..6d410ded1c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -82,11 +82,13 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
 import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLockConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.io.storage.HoodieHFileReader;
@@ -476,7 +478,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
             .archiveCommitsWith(3, 4)
             .retainCommits(1)
             .build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build()).build();
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainCommits(1)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .archiveCommitsWith(2, 3)
+            .build())
+        .build();
     initWriteConfigAndMetatableWriter(writeConfig, true);
 
     AtomicInteger commitTime = new AtomicInteger(1);
@@ -637,8 +645,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     initPath();
     int maxCommits = 1;
     HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits)
+            .build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
@@ -1172,8 +1181,15 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction)
             .withPopulateMetaFields(populateMateFields)
             .build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
-            .retainCommits(1).retainFileVersions(1).withAutoClean(false).withAsyncClean(true).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainCommits(1)
+            .retainFileVersions(1)
+            .withAutoClean(false)
+            .withAsyncClean(true)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .archiveCommitsWith(minArchiveCommitsDataset, minArchiveCommitsDataset + 1)
+            .build())
         .build();
 
     initWriteConfigAndMetatableWriter(writeConfig, true);
@@ -1399,10 +1415,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     // disable small file handling so that every insert goes to a new file group.
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
         .withRollbackUsingMarkers(false)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+            .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
-            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+            .build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .enable(true)
             .withMetadataIndexColumnStats(true)
@@ -1612,7 +1631,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1676,8 +1695,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
 
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4)
+        .build())
         .withAutoCommit(false)
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
@@ -1853,9 +1873,12 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .archiveCommitsWith(40, 60).retainCommits(1)
             .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommitsBeforeCompaction).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 4)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER)
-            .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false).build())
+            .retainCommits(1).retainFileVersions(1).withAutoClean(true).withAsyncClean(false)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .archiveCommitsWith(2, 4).build())
         .build();
 
     List<HoodieRecord> records;
@@ -2006,8 +2029,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000");
     HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)
@@ -2034,10 +2057,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     // set hoodie.table.version to 2 in hoodie.properties file
     changeTableVersion(HoodieTableVersion.TWO);
-    writeConfig = getWriteConfigBuilder(true, true, false)
-        .withRollbackUsingMarkers(false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
+    writeConfig = getWriteConfigBuilder(true, true, false).withRollbackUsingMarkers(false).withCleanConfig(HoodieCleanConfig.newBuilder()
+        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
         .withProperties(properties)
@@ -2119,7 +2140,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
     int maxCommits = 1;
     HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -2285,13 +2306,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
 
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, String schemaStr, long smallFileSize, boolean mergeAllowDuplicateInserts) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER);
-    return builder
-        .withCompactionConfig(
+    return builder.withCompactionConfig(
             HoodieCompactionConfig.newBuilder()
                 .compactionSmallFileSize(smallFileSize)
                 // Set rollback to LAZY so no inflights are deleted
-                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
                 .insertSplitSize(insertSplitSize).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
         .withStorageConfig(
             HoodieStorageConfig.newBuilder()
                 .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
@@ -2307,8 +2328,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
-            .compactionSmallFileSize(1024 * 1024).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index cebf3145bf..8ba459b772 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -71,12 +71,13 @@ import org.apache.hudi.common.util.MarkerUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
-import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
@@ -360,7 +361,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
         .build();
     HoodieWriteConfig config = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
         .withPreCommitValidatorConfig(validatorConfig)
         .build();
 
@@ -386,7 +387,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
         .build();
     config = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
         .withPreCommitValidatorConfig(validatorConfig)
         .build();
     String instant2 = HoodieActiveTimeline.createNewInstantTime();
@@ -921,7 +922,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
       .setTimelineLayoutVersion(VERSION_0)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
     // Set rollback to LAZY so no inflights are deleted
-    hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+    hoodieWriteConfig.getProps().put(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
         HoodieFailedWritesCleaningPolicy.LAZY.name());
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
@@ -2606,17 +2607,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     if (!populateMetaFields) {
       builder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.SIMPLE).build());
     }
-    return builder
-        .withCompactionConfig(
-            HoodieCompactionConfig.newBuilder()
-                .compactionSmallFileSize(smallFileSize)
-                // Set rollback to LAZY so no inflights are deleted
-                .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
-                .insertSplitSize(insertSplitSize).build())
-        .withStorageConfig(
-            HoodieStorageConfig.newBuilder()
-                .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
-                .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+    return builder.withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .compactionSmallFileSize(smallFileSize)
+            // Rollback policy is set to LAZY via the clean config below so no inflights are deleted
+            .insertSplitSize(insertSplitSize).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
+            .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
         .withMergeAllowDuplicateOnInserts(mergeAllowDuplicateInserts)
         .withProps(props)
         .build();
@@ -2636,7 +2636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   private HoodieWriteConfig getParallelWritingWriteConfig(HoodieFailedWritesCleaningPolicy cleaningPolicy, boolean populateMetaFields) {
     return getConfigBuilder()
         .withEmbeddedTimelineServerEnabled(false)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(cleaningPolicy)
             .withAutoClean(false).build())
         .withTimelineLayoutVersion(1)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 2e387be544..29c653daee 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -36,9 +36,11 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
@@ -338,9 +340,11 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
         .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(policy)
-            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+            .build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
@@ -390,16 +394,20 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
         .withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
         .withSchema(HoodieMetadataRecord.getClassSchema().toString())
         .forTable(writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        // we will trigger cleaning manually, to control the instant times
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withAsyncClean(writeConfig.isMetadataAsyncClean())
-            // we will trigger cleaning manually, to control the instant times
             .withAutoClean(false)
             .withCleanerParallelism(parallelism)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .retainCommits(writeConfig.getMetadataCleanerCommitsRetained())
-            .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep)
-            // we will trigger compaction manually, to control the instant times
+            .build())
+        // we will trigger archival manually, to control the instant times
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+           .archiveCommitsWith(minCommitsToKeep, maxCommitsToKeep).build())
+        // we will trigger compaction manually, to control the instant times
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(false)
             .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax()).build())
         .withParallelism(parallelism, parallelism)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
index bdbc9e72d3..8531030a5c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBootstrap.java
@@ -26,7 +26,8 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.apache.log4j.LogManager;
@@ -275,7 +276,10 @@ public class TestHoodieMetadataBootstrap extends TestHoodieMetadataBase {
   private HoodieWriteConfig getWriteConfig(int minArchivalCommits, int maxArchivalCommits) throws Exception {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
index 87bcad04bc..407fb8de0e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -477,9 +479,9 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
   public void testHbaseTagLocationForArchivedCommits() throws Exception {
     // Load to memory
     Map<String, String> params = new HashMap<String, String>();
-    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
-    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3");
-    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
+    params.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+    params.put(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3");
+    params.put(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2");
     HoodieWriteConfig config = getConfigBuilder(100, false, false).withProps(params).build();
 
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index d412052c2d..4f41c4a44d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -45,6 +45,8 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -181,13 +183,14 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     init(tableType);
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
-            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits)
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).withFailedWritesCleaningPolicy(failedWritesCleaningPolicy).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
             .withArchiveMergeEnable(enableArchiveMerge)
             .withArchiveMergeFilesBatchSize(archiveFilesBatch)
             .withArchiveMergeSmallFileLimit(size)
-            .withFailedWritesCleaningPolicy(failedWritesCleaningPolicy)
-            .build())
+            .archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(maxDeltaCommits).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
@@ -566,7 +569,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     init();
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable("test-trip-table")
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 5).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -716,7 +720,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     HoodieWriteConfig cfg =
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table")
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+            .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
             .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                 .withRemoteServerPort(timelineServicePort).build())
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -881,9 +886,9 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     HoodieWriteConfig cfg =
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table")
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
-            .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
-                .withRemoteServerPort(timelineServicePort).build())
+          .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+              .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minInstantsToKeep, maxInstantsToKeep).build())
+              .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
             .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -940,7 +945,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     HoodieWriteConfig cfg =
         HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
             .withParallelism(2, 2).forTable("test-trip-table")
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+            .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+                .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
             .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
                 .withRemoteServerPort(timelineServicePort).build())
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
@@ -1146,8 +1152,8 @@ public class TestHoodieTimelineArchiver extends HoodieClientTestHarness {
     // Test configs where metadata table has more aggressive archival configs than the compaction config
     HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .retainCommits(1).archiveCommitsWith(2, 4).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 4).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
             .withRemoteServerPort(timelineServicePort).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index b8545b0f63..8da877940b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -70,6 +70,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -261,11 +262,15 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieWriteConfig writeConfig = getConfigBuilder()
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
             .withEnableBackupForRemoteFileSystemView(false).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .allowMultipleCleans(false)
-            .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
+            .withAutoClean(false).retainCommits(1).retainFileVersions(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
+            .build())
         .withEmbeddedTimelineServerEnabled(false).build();
 
     int index = 0;
@@ -334,8 +339,9 @@ public class TestCleaner extends HoodieClientTestBase {
       throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
     HoodieWriteConfig cfg = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
+            .retainFileVersions(maxVersions).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
@@ -503,7 +509,7 @@ public class TestCleaner extends HoodieClientTestBase {
       throws Exception {
     int maxCommits = 3; // keep upto 3 commits from the past
     HoodieWriteConfig cfg = getConfigBuilder()
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -579,7 +585,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieWriteConfig cfg = getConfigBuilder()
         .withAutoCommit(false)
         .withHeartbeatIntervalInMs(3000)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
@@ -732,8 +738,8 @@ public class TestCleaner extends HoodieClientTestBase {
             HoodieWriteConfig.newBuilder()
                     .withPath(basePath)
                     .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-                    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
+                    .withCleanConfig(HoodieCleanConfig.newBuilder()
+                        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).build())
                     .build();
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
@@ -787,8 +793,9 @@ public class TestCleaner extends HoodieClientTestBase {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .withAssumeDatePartitioning(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(2).build())
         .build();
 
     HoodieTableMetadataWriter metadataWriter = SparkHoodieBackedTableMetadataWriter.create(hadoopConf, config, context);
@@ -1140,7 +1147,7 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testCleaningWithZeroPartitionPaths() throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
@@ -1164,7 +1171,7 @@ public class TestCleaner extends HoodieClientTestBase {
   public void testKeepLatestCommitsWithPendingCompactions() throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
     // Deletions:
@@ -1188,7 +1195,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(2).build())
             .build();
     // Deletions:
@@ -1212,8 +1219,8 @@ public class TestCleaner extends HoodieClientTestBase {
         HoodieWriteConfig.newBuilder()
             .withPath(basePath)
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
     String commitTime = makeNewCommitTime(1, "%09d");
@@ -1241,7 +1248,7 @@ public class TestCleaner extends HoodieClientTestBase {
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .withAssumeDatePartitioning(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
@@ -1317,7 +1324,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieWriteConfig cfg = getConfigBuilder()
         .withAutoCommit(false)
         .withHeartbeatIntervalInMs(3000)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(maxVersions).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index bd015baec9..9fcac64c00 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -37,7 +37,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -77,14 +77,14 @@ public class TestCleanPlanExecutor extends TestCleaner {
   @Test
   public void testInvalidCleaningTriggerStrategy() {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-                .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
-                .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                        .withIncrementalCleaningMode(true)
-                        .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
-                        .withCleanBootstrapBaseFileEnabled(true)
-                        .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
-                        .withCleaningTriggerStrategy("invalid_strategy").build())
-                .build();
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).enable(false).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withIncrementalCleaningMode(true)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
+            .withCleanBootstrapBaseFileEnabled(true)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2)
+            .withCleaningTriggerStrategy("invalid_strategy").build()) // deliberately not a CleaningTriggerStrategy constant
+        .build();
     Exception e = assertThrows(IllegalArgumentException.class, () -> runCleaner(config, true), "should fail when invalid trigger strategy is provided!");
     assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.table.action.clean.CleaningTriggerStrategy.invalid_strategy"));
   }
@@ -108,18 +108,15 @@ public class TestCleanPlanExecutor extends TestCleaner {
       boolean simulateFailureRetry, boolean simulateMetadataFailure,
       boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withMetadataConfig(
-            HoodieMetadataConfig.newBuilder()
-                .withAssumeDatePartitioning(true)
-                .build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .retainCommits(2)
-            .withMaxCommitsBeforeCleaning(2).build())
-        .build();
+            .withMaxCommitsBeforeCleaning(2)
+          .build()).build();
 
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
     String p0 = "2020/01/01";
@@ -274,7 +271,7 @@ public class TestCleanPlanExecutor extends TestCleaner {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
@@ -353,7 +350,7 @@ public class TestCleanPlanExecutor extends TestCleaner {
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder().withPath(basePath)
             .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
                 .withCleanBootstrapBaseFileEnabled(true)
                 .withCleanerParallelism(1)
                 .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
@@ -453,18 +450,15 @@ public class TestCleanPlanExecutor extends TestCleaner {
   @Test
   public void testKeepLatestFileVersionsMOR() throws Exception {
 
-    HoodieWriteConfig config =
-                HoodieWriteConfig.newBuilder().withPath(basePath)
-                        .withMetadataConfig(
-                            HoodieMetadataConfig.newBuilder()
-                                .withAssumeDatePartitioning(true)
-                                // Column Stats Index is disabled, since these tests construct tables which are
-                                // not valid (empty commit metadata, invalid parquet files)
-                                .withMetadataIndexColumnStats(false)
-                                .build())
-                        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                                .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
-                        .build();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true)
+            // Column Stats Index is disabled, since these tests construct tables which are
+            // not valid (empty commit metadata, invalid parquet files)
+            .withMetadataIndexColumnStats(false)
+            .build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1)
+            .build()).build();
 
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -497,18 +491,14 @@ public class TestCleanPlanExecutor extends TestCleaner {
   @Test
   public void testKeepLatestCommitsMOR() throws Exception {
 
-    HoodieWriteConfig config =
-            HoodieWriteConfig.newBuilder().withPath(basePath)
-                    .withMetadataConfig(
-                        HoodieMetadataConfig.newBuilder()
-                            .withAssumeDatePartitioning(true)
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true)
                             // Column Stats Index is disabled, since these tests construct tables which are
                             // not valid (empty commit metadata, invalid parquet files)
-                            .withMetadataIndexColumnStats(false)
-                            .build())
-                    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
-                    .build();
+                            .withMetadataIndexColumnStats(false).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(1).build())
+        .build();
 
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
@@ -552,11 +542,12 @@ public class TestCleanPlanExecutor extends TestCleaner {
       boolean enableIncrementalClean, boolean enableBootstrapSourceClean) throws Exception {
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withIncrementalCleaningMode(enableIncrementalClean)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
             .withCleanBootstrapBaseFileEnabled(enableBootstrapSourceClean)
-            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2).build())
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS).cleanerNumHoursRetained(2)
+            .build())
         .build();
 
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index b4d6aefa71..baff4ebac8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -27,7 +27,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
@@ -54,8 +55,9 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
   public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOException {
     HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
     HoodieWriteConfig writeConfig = getConfigBuilder(true)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
          HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index c33f0bcc2c..2f0e585ec9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -43,6 +43,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -556,7 +557,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
       // trigger clean. creating a new client with aggresive cleaner configs so that clean will kick in immediately.
       cfgBuilder = getConfigBuilder(false)
-          .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).build())
+          .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
           // Timeline-server-based markers are not used for multi-rollback tests
           .withMarkersType(MarkerType.DIRECT.name());
       addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
@@ -977,10 +978,13 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
 
   private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean rollbackUsingMarkers) {
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(autoCommit).withRollbackUsingMarkers(rollbackUsingMarkers)
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024L)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(3)
-            .withAutoClean(false)
-            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build());
+            .build());
     return cfgBuilder.build();
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 16fd48af6c..900674a677 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -38,10 +38,11 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.index.SparkHoodieIndexFactory;
@@ -146,8 +147,8 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
         .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy)
-            .compactionSmallFileSize(1024 * 1024).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(cleaningPolicy).build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).orcMaxFileSize(1024 * 1024).build())
         .forTable("test-trip-table")
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
diff --git a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index 4890a6529a..6e20ee1190 100644
--- a/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ b/hudi-examples/hudi-examples-java/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
@@ -85,7 +85,7 @@ public class HoodieJavaWriteClientExample {
         .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2).forTable(tableName)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
     HoodieJavaWriteClient<HoodieAvroPayload> client =
         new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
 
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index 1afc180531..299fe992fa 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -29,7 +29,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
@@ -99,7 +99,7 @@ public class HoodieWriteClientExample {
               .withSchema(HoodieExampleDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
               .withDeleteParallelism(2).forTable(tableName)
               .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-              .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
+              .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(20, 30).build()).build();
       SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
 
       // inserts
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d292b3832a..0db960c32f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -38,12 +38,14 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
-import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -178,23 +180,25 @@ public class StreamerUtil {
                     .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST))
                     .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS))
                     .build())
-            .withCompactionConfig(
-                HoodieCompactionConfig.newBuilder()
-                    .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
-                    .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
-                    .withInlineCompactionTriggerStrategy(
-                        CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
-                    .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
-                    .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
-                    .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
-                    .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
-                    .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
-                    // override and hardcode to 20,
-                    // actually Flink cleaning is always with parallelism 1 now
-                    .withCleanerParallelism(20)
-                    .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
-                    .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
-                    .build())
+            .withCleanConfig(HoodieCleanConfig.newBuilder()
+                .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED))
+                .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS))
+                .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS))
+                // override and hardcode to 20,
+                // actually Flink cleaning is always with parallelism 1 now
+                .withCleanerParallelism(20)
+                .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY)))
+                .build())
+            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+                .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS))
+                .build())
+            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO))
+                .withInlineCompactionTriggerStrategy(
+                    CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT)))
+                .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS))
+                .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS))
+                .build())
             .withMemoryConfig(
                 HoodieMemoryConfig.newBuilder()
                     .withMaxMemoryMaxSize(
@@ -214,8 +218,9 @@ public class StreamerUtil {
                 .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
                 .build())
             .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+                .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME))
                 .withPayloadOrderingField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
                 .withPayloadEventTimeField(conf.getString(FlinkOptions.PRECOMBINE_FIELD))
                 .build())
             .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService)
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
index 7a9e122e86..dc711818d7 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java
@@ -25,7 +25,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -95,8 +94,9 @@ public abstract class HoodieTestSuiteWriter implements Serializable {
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
             .withAutoCommit(false)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder().withPayloadClass(cfg.payloadClassName).build())
-            .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(cfg.sourceOrderingField)
+            .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+                .withPayloadOrderingField(cfg.sourceOrderingField)
+                .withPayloadClass(cfg.payloadClassName)
                 .build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
index 6ab0469738..1a33560dc8 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectWriterProvider.java
@@ -27,10 +27,12 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.connect.KafkaConnectFileIdPrefixProvider;
 import org.apache.hudi.connect.utils.KafkaConnectUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -87,13 +89,10 @@ public class KafkaConnectWriterProvider implements ConnectWriterProvider<WriteSt
           .withAutoCommit(false)
           .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
           // participants should not trigger table services, and leave it to the coordinator
-          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-              .withAutoClean(false)
-              .withAutoArchive(false)
-              .withInlineCompaction(false).build())
-          .withClusteringConfig(HoodieClusteringConfig.newBuilder()
-              .withInlineClustering(false)
-              .build())
+          .withArchivalConfig(HoodieArchivalConfig.newBuilder().withAutoArchive(false).build())
+          .withCleanConfig(HoodieCleanConfig.newBuilder().withAutoClean(false).build())
+          .withCompactionConfig(HoodieCompactionConfig.newBuilder().withInlineCompaction(false).build())
+          .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(false).build())
           .build();
 
       context = new HoodieJavaEngineContext(hadoopConf);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index ca1474fa88..80be97ebef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -180,9 +180,10 @@ public class DataSourceUtils {
 
     return builder.forTable(tblName)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
             .withInlineCompaction(inlineCompact).build())
-        .withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
+        .withPayloadConfig(HoodiePayloadConfig.newBuilder()
+            .withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
+            .withPayloadOrderingField(parameters.get(DataSourceWriteOptions.PRECOMBINE_FIELD().key()))
             .build())
         // override above with Hoodie configs specified as options.
         .withProps(parameters).build();
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 792d26b184..9eeef164a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.util.Option
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig}
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
 import org.apache.spark.internal.Logging
@@ -82,7 +82,7 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
     HoodieWriteConfig.newBuilder
       .withPath(basePath)
       .withRollbackUsingMarkers(true)
-      .withCompactionConfig(HoodieCompactionConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
+      .withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
       .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
       .build
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5940ab1d46..badd3ab627 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -783,12 +783,12 @@ public class DeltaSync implements Serializable {
             .combineInput(cfg.filterDupes, combineBeforeUpsert)
             .withCompactionConfig(
                 HoodieCompactionConfig.newBuilder()
-                    .withPayloadClass(cfg.payloadClassName)
                     .withInlineCompaction(cfg.isInlineCompactionEnabled())
                     .build()
             )
             .withPayloadConfig(
                 HoodiePayloadConfig.newBuilder()
+                    .withPayloadClass(cfg.payloadClassName)
                     .withPayloadOrderingField(cfg.sourceOrderingField)
                     .build())
             .forTable(cfg.targetTableName)
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 45d8a427c0..dde0e5f73f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -46,6 +46,8 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -766,7 +768,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfg.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -946,16 +948,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Step 4 : Insert 1 record and trigger sync/async cleaner and archive.
     List<String> configs = getAsyncServicesConfigs(1, "true", "true", "2", "", "");
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
-    configs.add(String.format("%s=%s", HoodieCompactionConfig.ASYNC_CLEAN.key(), asyncClean));
+    configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
+    configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+    configs.add(String.format("%s=%s", HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "2"));
+    configs.add(String.format("%s=%s", HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "3"));
+    configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), asyncClean));
     configs.add(String.format("%s=%s", HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
     if (asyncClean) {
       configs.add(String.format("%s=%s", HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
           WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
-      configs.add(String.format("%s=%s", HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+      configs.add(String.format("%s=%s", HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
           HoodieFailedWritesCleaningPolicy.LAZY.name()));
       configs.add(String.format("%s=%s", HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
           InProcessLockProvider.class.getName()));
@@ -987,7 +989,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     if (!StringUtils.isNullOrEmpty(autoClean)) {
-      configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN.key(), autoClean));
+      configs.add(String.format("%s=%s", HoodieCleanConfig.AUTO_CLEAN.key(), autoClean));
     }
     if (!StringUtils.isNullOrEmpty(inlineCluster)) {
       configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster));
@@ -1462,7 +1464,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = WriteOperationType.UPSERT;
-    cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfg2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
     ds2.sync();
     mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
index 13f5ad97cf..abe1994a3d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -95,7 +95,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
 
     // Prepare base dataset with some commits
@@ -115,7 +115,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
 
     // create a backfill job
     HoodieDeltaStreamer.Config cfgBackfillJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
@@ -127,7 +127,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
 
     // re-init ingestion job to start sync service
@@ -157,7 +157,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
 
     // Prepare base dataset with some commits
@@ -188,13 +188,13 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
     cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgBackfillJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
 
     HoodieDeltaStreamer.Config cfgIngestionJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
     cfgIngestionJob2.continuousMode = true;
     cfgIngestionJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgIngestionJob2.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     // re-init ingestion job
     HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob2, jsc());
     // re-init backfill job
@@ -225,7 +225,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
         propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    prepJobConfig.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    prepJobConfig.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc());
 
     // Prepare base dataset with some commits
@@ -263,7 +263,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
     // Set checkpoint to the last successful position
     cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
+    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCleanConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
     backfillJob.sync();
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index fa5cba446f..57270bdf81 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -64,8 +65,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
   @Test
   public void testHoodieIncrSource() throws IOException {
     HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .build();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 8e7bce944f..67a002c3ba 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -124,11 +124,14 @@ public class UtilitiesTestBase {
   }
 
   public static void initTestServices(boolean needsHive, boolean needsZookeeper) throws Exception {
-    hdfsTestService = new HdfsTestService();
-    dfsCluster = hdfsTestService.start(true);
-    dfs = dfsCluster.getFileSystem();
-    dfsBasePath = dfs.getWorkingDirectory().toString();
-    dfs.mkdirs(new Path(dfsBasePath));
+
+    if (hdfsTestService == null) {
+      hdfsTestService = new HdfsTestService();
+      dfsCluster = hdfsTestService.start(true);
+      dfs = dfsCluster.getFileSystem();
+      dfsBasePath = dfs.getWorkingDirectory().toString();
+      dfs.mkdirs(new Path(dfsBasePath));
+    }
     if (needsHive) {
       hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
       hiveServer = hiveTestService.start();