Posted to commits@hudi.apache.org by yi...@apache.org on 2023/04/05 23:14:45 UTC

[hudi] branch master updated: [HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8906b0dfeea [HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)
8906b0dfeea is described below

commit 8906b0dfeea3decfbfd6c0645c67fac729c24cbb
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Thu Apr 6 04:44:36 2023 +0530

    [HUDI-5782] Tweak defaults and remove unnecessary configs after config review (#8128)
    
    This commit tweaks a few defaults and removes unnecessary configs after config review:
    
    "hoodie.write.lock.dynamodb.table": default value of "hudi_locks" added
    "hoodie.write.lock.dynamodb.table_creation_timeout": default changed from 10 minutes to 2 minutes
    "hoodie.clustering.preserve.commit.metadata": removed, since this should always be true
    "hoodie.compaction.preserve.commit.metadata": removed, since this should always be true
    "hoodie.write.lock.max_wait_time_ms_between_retry": default changed from 5s to 16s
    "hoodie.write.commit.callback.http.timeout.seconds": default changed from 3s to 30s
    
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
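
For reference, a minimal sketch of overriding the defaults listed above via
writer properties. The keys are exactly those from the list; the class name
and the example values are illustrative assumptions, not part of this commit:

    import java.util.Properties;

    public class NewDefaultsExample {
      public static Properties overrideNewDefaults() {
        Properties props = new Properties();
        // DynamoDB lock table: now defaults to "hudi_locks" when unset.
        props.setProperty("hoodie.write.lock.dynamodb.table", "my_team_locks");
        // Table-creation timeout: default dropped from 10 minutes to 2 minutes (value in ms).
        props.setProperty("hoodie.write.lock.dynamodb.table_creation_timeout",
            String.valueOf(2 * 60 * 1000));
        // Max exponential-backoff wait between lock retries: default raised from 5s to 16s.
        props.setProperty("hoodie.write.lock.max_wait_time_ms_between_retry", "16000");
        // HTTP commit-callback timeout: default raised from 3s to 30s.
        props.setProperty("hoodie.write.commit.callback.http.timeout.seconds", "30");
        return props;
      }
    }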
---
 .../hudi/config/DynamoDbBasedLockConfig.java       |  4 +--
 .../apache/hudi/config/HoodieClusteringConfig.java | 11 -------
 .../apache/hudi/config/HoodieCompactionConfig.java | 11 -------
 .../org/apache/hudi/config/HoodieLockConfig.java   |  2 +-
 .../config/HoodieWriteCommitCallbackConfig.java    |  4 +--
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  9 +-----
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  4 ++-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  4 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  3 --
 .../PartitionAwareClusteringPlanStrategy.java      |  2 +-
 .../hudi/client/TestUpdateSchemaEvolution.java     | 13 ++++----
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 35 ++++++++--------------
 .../hudi/client/functional/TestHoodieIndex.java    |  2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  9 +++---
 ...HoodieSparkMergeOnReadTableIncrementalRead.java |  4 +--
 ...dieSparkMergeOnReadTableInsertUpdateDelete.java |  4 +--
 .../SparkClientFunctionalTestHarness.java          |  8 ++---
 .../apache/hudi/common/util/ClusteringUtils.java   |  1 +
 .../apache/hudi/configuration/FlinkOptions.java    | 12 ++------
 .../hudi/sink/clustering/ClusteringOperator.java   | 25 +++++-----------
 .../src/test/java/HoodieJavaStreamingApp.java      |  6 ++--
 .../org/apache/hudi/functional/TestBootstrap.java  |  4 +--
 .../TestHoodieSparkMergeOnReadTableClustering.java | 28 +++++++----------
 .../apache/hudi/functional/TestOrcBootstrap.java   |  2 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 13 ++------
 25 files changed, 73 insertions(+), 147 deletions(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
index 15e81bc90e3..4530340537b 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/config/DynamoDbBasedLockConfig.java
@@ -46,7 +46,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_NAME = ConfigProperty
       .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
-      .noDefaultValue()
+      .defaultValue("hudi_locks")
       .sinceVersion("0.10.0")
       .withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table");
 
@@ -98,7 +98,7 @@ public class DynamoDbBasedLockConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
       .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
-      .defaultValue(String.valueOf(10 * 60 * 1000))
+      .defaultValue(String.valueOf(2 * 60 * 1000))
       .sinceVersion("0.10.0")
       .withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table");
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e04ce341e46..124d2444705 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -218,12 +218,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.")
       .withAlternatives("hoodie.datasource.clustering.async.enable");
 
-  public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
-      .key("hoodie.clustering.preserve.commit.metadata")
-      .defaultValue(true)
-      .sinceVersion("0.9.0")
-      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
   /**
    * @deprecated this setting has no effect. Please refer to clustering configuration, as well as
    * {@link #LAYOUT_OPTIMIZE_STRATEGY} config to enable advanced record layout optimization strategies
@@ -581,11 +575,6 @@ public class HoodieClusteringConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMetadata) {
-      clusteringConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveHoodieCommitMetadata));
-      return this;
-    }
-
     public Builder withRollbackPendingClustering(Boolean rollbackPendingClustering) {
       clusteringConfig.setValue(ROLLBACK_PENDING_CLUSTERING_ON_CONFLICT, String.valueOf(rollbackPendingClustering));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e37ff3c46bf..02d86463d80 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -151,12 +151,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
 
-  public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
-      .key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(true)
-      .sinceVersion("0.11.0")
-      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
   /**
    * Configs related to specific table types.
    */
@@ -423,11 +417,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
-      compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
-      return this;
-    }
-
     public Builder withLogCompactionBlocksThreshold(String logCompactionBlocksThreshold) {
       compactionConfig.setValue(LOG_COMPACTION_BLOCKS_THRESHOLD, logCompactionBlocksThreshold);
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f91c5791a03..73ab98f7067 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -76,7 +76,7 @@ public class HoodieLockConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS = ConfigProperty
       .key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY)
-      .defaultValue(String.valueOf(5000L))
+      .defaultValue(String.valueOf(16000L))
       .sinceVersion("0.8.0")
       .withDocumentation("Maximum amount of time to wait between retries by lock provider client. This bounds"
           + " the maximum delay from the exponential backoff. Currently used by ZK based lock provider only.");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
index 8be57c05d1c..d9e0193dc72 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -67,9 +67,9 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig {
 
   public static final ConfigProperty<Integer> CALLBACK_HTTP_TIMEOUT_IN_SECONDS = ConfigProperty
       .key(CALLBACK_PREFIX + "http.timeout.seconds")
-      .defaultValue(3)
+      .defaultValue(30)
       .sinceVersion("0.6.0")
-      .withDocumentation("Callback timeout in seconds. 3 by default");
+      .withDocumentation("Callback timeout in seconds.");
 
   /**
    * @deprecated Use {@link #TURN_CALLBACK_ON} and its methods instead
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c83792c22de..51539f585d9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1537,14 +1537,6 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
   }
 
-  public boolean isPreserveHoodieCommitMetadataForClustering() {
-    return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
-  }
-
-  public boolean isPreserveHoodieCommitMetadataForCompaction() {
-    return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
-  }
-
   public boolean isClusteringEnabled() {
     // TODO: future support async clustering
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
@@ -2308,6 +2300,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   /**
    * Hoodie Client Lock Configs.
+   *
    * @return
    */
   public boolean isAutoAdjustLockConfigs() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 23edb15a780..8b20df3f1a5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -35,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -115,7 +116,8 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
       String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
       TaskContextSupplier taskContextSupplier) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
+    // preserveMetadata is disabled by default for MDT but enabled otherwise
+    this(config, instantTime, hoodieTable, partitionPath, fileId, taskContextSupplier, !HoodieTableMetadata.isMetadataTable(config.getBasePath()));
     this.recordMap = recordMap;
     this.useWriterSchema = true;
   }
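
The same rule appears in HoodieMergeHandle below: instead of consulting the
removed compaction config, both handles now derive preserveMetadata from
whether they are writing to the metadata table (MDT). A minimal sketch of that
decision, assuming the MDT lives under "<base>/.hoodie/metadata" as in Hudi's
standard layout (the class and method names here are illustrative):

    public class PreserveMetadataSketch {
      // Sketch only: approximates HoodieTableMetadata.isMetadataTable(basePath);
      // the path-suffix check is an assumption about the table layout.
      static boolean shouldPreserveMetadata(String basePath) {
        boolean isMetadataTable = basePath != null && basePath.endsWith(".hoodie/metadata");
        // MDT writes must take the new record's metadata, so do not preserve;
        // regular tables keep the original _hoodie_commit_time when rewriting.
        return !isMetadataTable;
      }
    }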
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4202e2c590b..073a0e0aad1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -47,6 +47,7 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
@@ -140,7 +141,8 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchemaForCompaction = true;
-    this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
+    // preserveMetadata is disabled by default for MDT but enabled otherwise
+    this.preserveMetadata = !HoodieTableMetadata.isMetadataTable(config.getBasePath());
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index e5065baeb82..9ba192d4a42 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -291,9 +291,6 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withInlineCompaction(false)
             .withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
-            // by default, the HFile does not keep the metadata fields, set up as false
-            // to always use the metadata of the new record.
-            .withPreserveCommitMetadata(false)
             .withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
             .build())
         .withParallelism(parallelism, parallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
index 8c0bd0cab1f..5b6ee9075bd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/PartitionAwareClusteringPlanStrategy.java
@@ -122,7 +122,7 @@ public abstract class PartitionAwareClusteringPlanStrategy<T,I,K,O> extends Clus
         .setInputGroups(clusteringGroups)
         .setExtraMetadata(getExtraMetadata())
         .setVersion(getPlanVersion())
-        .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
+        .setPreserveHoodieMetadata(true)
         .build());
   }
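
With the preserve flag removed, clustering plans unconditionally set
setPreserveHoodieMetadata(true), so file groups rewritten by clustering keep
each record's original _hoodie_commit_time. A hedged illustration of the
observable effect (generic Spark usage, not part of this commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ClusteringCommitTimeCheck {
      // After clustering completes, rewritten rows still show their original
      // commit time in the meta column rather than the clustering instant.
      static void showCommitTimes(SparkSession spark, String basePath) {
        Dataset<Row> rows = spark.read().format("hudi").load(basePath);
        rows.select("_hoodie_commit_time", "_hoodie_record_key").show();
      }
    }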
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 8e3252a8462..12106e011fd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -30,11 +30,11 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
-import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.HoodieMergeHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
@@ -80,7 +80,6 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implement
   private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) throws IOException {
     // Create a bunch of records with an old version of schema
     final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.avsc");
-    config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
       List<HoodieRecord> insertRecords = new ArrayList<>();
@@ -91,9 +90,11 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness implement
       }
       Map<String, HoodieRecord> insertRecordMap = insertRecords.stream()
           .collect(Collectors.toMap(r -> r.getRecordKey(), Function.identity()));
-      HoodieCreateHandle<?,?,?,?> createHandle =
-          new HoodieCreateHandle(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", insertRecordMap, supplier);
-      createHandle.write();
+      HoodieWriteHandle<?,?,?,?> createHandle = new CreateHandleFactory<>(false)
+          .create(config, "100", table, insertRecords.get(0).getPartitionPath(), "f1-0", supplier);
+      for (HoodieRecord record : insertRecordMap.values()) {
+        createHandle.write(record, createHandle.getWriterSchemaWithMetaFields(), createHandle.getConfig().getProps());
+      }
       return createHandle.close().get(0);
     }).collect();
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7f552b6da2d..222816ff772 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -199,15 +199,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     return Arrays.stream(new Boolean[][] {{true}, {false}}).map(Arguments::of);
   }
 
-  private static Stream<Arguments> populateMetaFieldsAndPreserveMetadataParams() {
-    return Arrays.stream(new Boolean[][] {
-        {true, true},
-        {false, true},
-        {true, false},
-        {false, false}
-    }).map(Arguments::of);
-  }
-
   private static Stream<Arguments> rollbackFailedCommitsParams() {
     return Stream.of(
         Arguments.of(HoodieFailedWritesCleaningPolicy.LAZY, true),
@@ -1040,8 +1031,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         .withCompactionConfig(
             HoodieCompactionConfig.newBuilder().compactionSmallFileSize(10000).build())
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType)
-            .withBloomIndexUpdatePartitionPath(true)
-            .withGlobalSimpleIndexUpdatePartitionPath(true)
             .build()).withTimelineLayoutVersion(VERSION_0).build();
 
     HoodieTableMetaClient.withPropertyBuilder()
@@ -1518,12 +1507,12 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   @ParameterizedTest
-  @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
-  public void testSimpleClustering(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+  @MethodSource("populateMetaFieldsParams")
+  public void testSimpleClustering(boolean populateMetaFields) throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
-        .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+        .build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
@@ -1567,7 +1556,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   public void testRolblackOfRegularCommitWithPendingReplaceCommitInTimeline() throws Exception {
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
-        .withPreserveHoodieCommitMetadata(true).build();
+        .build();
     // trigger clustering, but do not complete
     testInsertAndClustering(clusteringConfig, true, false, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
 
@@ -1606,7 +1595,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
-        .withPreserveHoodieCommitMetadata(true).build();
+        .build();
 
     HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
         .withClusteringConfig(clusteringConfig)
@@ -1633,26 +1622,26 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   @ParameterizedTest
-  @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
-  public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+  @MethodSource("populateMetaFieldsParams")
+  public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(true)
-        .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+        .build();
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
   @ParameterizedTest
-  @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
-  public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+  @MethodSource("populateMetaFieldsParams")
+  public void testClusteringWithSortOneFilePerGroup(boolean populateMetaFields) throws Exception {
     // setup clustering config.
     HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
         .withClusteringSortColumns("begin_lat,begin_lon")
         .withClusteringPlanStrategyClass(SparkSingleFileSortPlanStrategy.class.getName())
         .withClusteringExecutionStrategyClass(SparkSingleFileSortExecutionStrategy.class.getName())
         .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1)
-        .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build();
+        .build();
     // note that assertSameFileIds is true for this test because of the plan and execution strategy
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
@@ -1923,7 +1912,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     SparkRDDWriteClient client = getHoodieWriteClient(config);
     String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
-    if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) {
+    if (config.populateMetaFields()) {
       verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
     } else {
       verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index d6d5b2495a3..7745998fd97 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.functional;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieLayoutConfig;
-import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 26f2705a4ab..184c7880c9e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -217,13 +217,12 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
   }
 
   // TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
-  @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
+  @Test
+  public void testLogFileCountsAfterCompaction() throws Exception {
     boolean populateMetaFields = true;
     // insert 100 records
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
-        1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta);
+        1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
 
@@ -304,7 +303,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       List<Row> rows = actual.collectAsList();
       assertEquals(updatedRecords.size(), rows.size());
       for (Row row : rows) {
-        assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+        assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), newCommitTime);
       }
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index 275fd32ca7d..ddf458f9505 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -170,13 +170,13 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
 
       // verify new write shows up in snapshot mode after compaction is complete
       snapshotROFiles = getROSnapshotFiles(partitionPath);
-      validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, compactionCommitTime,
+      validateFiles(partitionPath,2, snapshotROFiles, false, roSnapshotJobConf,400, commitTime1, updateTime,
           insertsTime);
 
       incrementalROFiles = getROIncrementalFiles(partitionPath, "002", -1, true);
       assertTrue(incrementalROFiles.length == 2);
       // verify 006 shows up because of pending compaction
-      validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, compactionCommitTime,
+      validateFiles(partitionPath, 2, incrementalROFiles, false, roJobConf, 400, commitTime1, updateTime,
           insertsTime);
     }
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 73b1da95648..f061c152104 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -149,7 +149,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
 
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
@@ -191,7 +191,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
 
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 88d108879b4..511613d9044 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -360,20 +360,20 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
-    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
+    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
-    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
-      long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
+      long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 5d724ff9621..3de4f796e9b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -181,6 +181,7 @@ public class ClusteringUtils {
         .setInputGroups(clusteringGroups)
         .setExtraMetadata(extraMetadata)
         .setStrategy(strategy)
+        .setPreserveHoodieMetadata(true)
         .build();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 2d9243ab5b4..54ee03451cd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -31,8 +31,6 @@ import org.apache.hudi.common.model.HoodieSyncTableStrategy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -303,10 +301,7 @@ public class FlinkOptions extends HoodieConfig {
       .booleanType()
       .defaultValue(false)// default read as batch
       .withDescription("Whether to skip compaction instants and avoid reading compacted base files for streaming read to improve read performance.\n"
-          + "There are two cases that this option can be used to avoid reading duplicates:\n"
-          + "1) you are definitely sure that the consumer reads [faster than/completes before] any compaction instants "
-          + "when " + HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n"
-          + "2) changelog mode is enabled, this option is a solution to keep data integrity");
+          + "This option can be used to avoid reading duplicates when changelog mode is enabled, it is a solution to keep data integrity\n");
 
   // this option is experimental
   public static final ConfigOption<Boolean> READ_STREAMING_SKIP_CLUSTERING = ConfigOptions
@@ -314,10 +309,7 @@ public class FlinkOptions extends HoodieConfig {
           .booleanType()
           .defaultValue(false)
           .withDescription("Whether to skip clustering instants to avoid reading base files of clustering operations for streaming read "
-              + "to improve read performance.\n"
-              + "This option toggled to true to avoid duplicates when: \n"
-              + "1) you are definitely sure that the consumer reads [faster than/completes before] any clustering instants "
-              + "when " + HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key() + " is set to false.\n");
+              + "to improve read performance.");
 
   public static final String START_COMMIT_EARLIEST = "earliest";
   public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
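
With the removed configs gone from the descriptions above, these two options
are now purely read-side knobs for changelog-mode integrity and read
performance. A hedged example of enabling them programmatically; the option
keys are recalled from FlinkOptions and are not shown in this hunk, so treat
them as assumptions:

    import org.apache.flink.configuration.Configuration;

    public class StreamingSkipOptionsExample {
      static Configuration build() {
        Configuration flinkConf = new Configuration();
        // Skip base files written by compaction during streaming read.
        flinkConf.setBoolean("read.streaming.skip_compaction", true);
        // Skip base files written by clustering during streaming read.
        flinkConf.setBoolean("read.streaming.skip_clustering", true);
        return flinkConf;
      }
    }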
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index bf05e54117e..85a5626e3b8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -18,23 +18,21 @@
 
 package org.apache.hudi.sink.clustering;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.ClusteringGroupInfo;
 import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieFileSliceReader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -54,7 +52,6 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -63,8 +60,8 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
@@ -92,8 +89,6 @@ import java.util.Spliterators;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
-
 /**
  * Operator to execute the actual clustering task assigned by the clustering plan task.
  * In order to execute scalable, the input should shuffle by the clustering event {@link ClusteringPlanEvent}.
@@ -103,7 +98,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   private static final Logger LOG = LoggerFactory.getLogger(ClusteringOperator.class);
 
   private final Configuration conf;
-  private final boolean preserveHoodieMetadata;
   private final RowType rowType;
   private int taskID;
   private transient HoodieWriteConfig writeConfig;
@@ -133,10 +127,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
 
   public ClusteringOperator(Configuration conf, RowType rowType) {
     this.conf = conf;
-    this.preserveHoodieMetadata = conf.getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.defaultValue());
-    this.rowType = this.preserveHoodieMetadata
-        ? BulkInsertWriterHelper.addMetadataFields(rowType, false)
-        : rowType;
+    this.rowType = BulkInsertWriterHelper.addMetadataFields(rowType, false);
     this.asyncClustering = OptionsResolver.needsAsyncClustering(conf);
     this.sortClusteringEnabled = OptionsResolver.sortClusteringEnabled(conf);
 
@@ -161,7 +152,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     this.table = writeClient.getHoodieTable();
 
     this.schema = AvroSchemaConverter.convertToSchema(rowType);
-    this.readerSchema = this.preserveHoodieMetadata ? this.schema : HoodieAvroUtils.addMetadataFields(this.schema);
+    this.readerSchema = this.schema;
     this.requiredPos = getRequiredPositions();
 
     this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
@@ -228,7 +219,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
 
     BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
         instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
-        this.rowType, this.preserveHoodieMetadata);
+        this.rowType, true);
 
     List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
     boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
@@ -348,9 +339,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
    * Transform IndexedRecord into HoodieRecord.
    */
   private RowData transform(IndexedRecord indexedRecord) {
-    GenericRecord record = this.preserveHoodieMetadata
-        ? (GenericRecord) indexedRecord
-        : buildAvroRecordBySchema(indexedRecord, schema, requiredPos, new GenericRecordBuilder(schema));
+    GenericRecord record = (GenericRecord) indexedRecord;
     return (RowData) avroToRowDataConverter.convert(record);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index ce5b7e739d2..8e5897d14e1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -302,7 +302,7 @@ public class HoodieJavaStreamingApp {
       // wait for spark streaming to process one microbatch
       Thread.sleep(3000);
       waitTillNCommits(fs, numExpCommits, 180, 3);
-      commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+      commitInstantTime2 = HoodieDataSourceHelpers.listCommitsSince(fs, tablePath, commitInstantTime1).stream().sorted().findFirst().get();
       LOG.info("Second commit at instant time :" + commitInstantTime2);
     }
 
@@ -312,8 +312,7 @@ public class HoodieJavaStreamingApp {
       }
       // Wait for compaction to also finish and track latest timestamp as commit timestamp
       waitTillNCommits(fs, numExpCommits, 180, 3);
-      commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-      LOG.info("Compaction commit at instant time :" + commitInstantTime2);
+      LOG.info("Compaction commit at instant time :" + HoodieDataSourceHelpers.latestCommit(fs, tablePath));
     }
 
     /**
@@ -377,7 +376,6 @@ public class HoodieJavaStreamingApp {
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
-        .option(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA.key(), "false")
         .option(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH().key(),"true")
         .option(HoodieWriteConfig.TBL_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
         .outputMode(OutputMode.Append());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index be93171adc2..63e2f7f58c1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -58,8 +58,8 @@ import org.apache.hudi.io.storage.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.table.action.bootstrap.BootstrapUtils;
-import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -305,7 +305,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase {
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
+          Arrays.asList(compactionInstant.get()), false);
     }
     client.close();
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index 3a84755248e..c6b0560b87e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -61,28 +61,20 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
   private static Stream<Arguments> testClustering() {
     // enableClusteringAsRow, doUpdates, populateMetaFields, preserveCommitMetadata
     return Stream.of(
-        Arguments.of(true, true, true, true),
-        Arguments.of(true, true, true, false),
-        Arguments.of(true, true, false, true),
-        Arguments.of(true, true, false, false),
-        Arguments.of(true, false, true, true),
-        Arguments.of(true, false, true, false),
-        Arguments.of(true, false, false, true),
-        Arguments.of(true, false, false, false),
-        Arguments.of(false, true, true, true),
-        Arguments.of(false, true, true, false),
-        Arguments.of(false, true, false, true),
-        Arguments.of(false, true, false, false),
-        Arguments.of(false, false, true, true),
-        Arguments.of(false, false, true, false),
-        Arguments.of(false, false, false, true),
-        Arguments.of(false, false, false, false)
+        Arguments.of(true, true, true),
+        Arguments.of(true, true, false),
+        Arguments.of(true, false, true),
+        Arguments.of(true, false, false),
+        Arguments.of(false, true, true),
+        Arguments.of(false, true, false),
+        Arguments.of(false, false, true),
+        Arguments.of(false, false, false)
     );
   }
 
   @ParameterizedTest
   @MethodSource
-  void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
+  void testClustering(boolean clusteringAsRow, boolean doUpdates, boolean populateMetaFields) throws Exception {
     // set low compaction small File Size to generate more file groups.
     HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
         .forTable("test-trip-table")
@@ -106,7 +98,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
             .withClusteringTargetPartitions(0)
             .withInlineClustering(true)
             .withInlineClusteringNumCommits(1)
-            .withPreserveHoodieCommitMetadata(preserveCommitMetadata).build())
+            .build())
         .withRollbackUsingMarkers(false);
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig cfg = cfgBuilder.build();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 77a980f01e5..d442069d50f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -297,7 +297,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase {
       client.compact(compactionInstant.get());
       checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
           numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
-          Arrays.asList(compactionInstant.get()), !config.isPreserveHoodieCommitMetadataForCompaction());
+          Arrays.asList(compactionInstant.get()), false);
     }
     client.close();
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 879c220faa2..fe2e1702993 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -974,8 +974,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   }
 
   @ParameterizedTest
-  @CsvSource(value = {"true, AVRO", "true, SPARK", "false, AVRO", "false, SPARK"})
-  public void testInlineClustering(String preserveCommitMetadata, HoodieRecordType recordType) throws Exception {
+  @CsvSource(value = {"AVRO", "SPARK"})
+  public void testInlineClustering(HoodieRecordType recordType) throws Exception {
     String tableBasePath = basePath + "/inlineClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
@@ -985,7 +985,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     TestHelpers.addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", "", preserveCommitMetadata));
+    cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", "2", "", ""));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
@@ -1122,13 +1122,6 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
-  private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit,
-                                               String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) {
-    List<String> configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit);
-    configs.add(String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata));
-    return configs;
-  }
-
   private List<String> getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster,
                                                String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) {
     List<String> configs = new ArrayList<>();