Posted to commits@hudi.apache.org by vi...@apache.org on 2021/08/13 03:31:16 UTC

[hudi] branch master updated: [MINOR] Deprecate older configs (#3464)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0544d70  [MINOR] Deprecate older configs (#3464)
0544d70 is described below

commit 0544d70d8f4204f4e5edfe9144c17f1ed221eb7c
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Fri Aug 13 09:01:04 2021 +0530

    [MINOR] Deprecate older configs (#3464)
    
    Rename and deprecate props in HoodieWriteConfig
    
    Rename and deprecate older props
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   8 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |   2 +-
 .../cli/commands/TestHoodieLogFileCommand.java     |   8 +-
 .../http/HoodieWriteCommitHttpCallbackClient.java  |   2 +-
 .../callback/util/HoodieCommitCallbackFactory.java |   2 +-
 .../hudi/client/AbstractHoodieWriteClient.java     |   8 +-
 .../apache/hudi/config/HoodieBootstrapConfig.java  |  12 +-
 .../apache/hudi/config/HoodieClusteringConfig.java |  20 +-
 .../apache/hudi/config/HoodieCompactionConfig.java | 142 +++++++----
 .../apache/hudi/config/HoodieHBaseIndexConfig.java |  72 ++++--
 .../org/apache/hudi/config/HoodieIndexConfig.java  |  92 +++++--
 .../org/apache/hudi/config/HoodieLockConfig.java   | 106 ++++++---
 .../org/apache/hudi/config/HoodieMemoryConfig.java |  40 +++-
 .../apache/hudi/config/HoodiePayloadConfig.java    |  12 +-
 .../config/HoodieWriteCommitCallbackConfig.java    |  12 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 264 ++++++++++++---------
 .../src/main/java/org/apache/hudi/io/IOUtils.java  |  20 +-
 .../java/org/apache/hudi/keygen/KeyGenUtils.java   |   2 +-
 .../factory/HoodieAvroKeyGeneratorFactory.java     |   6 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |   6 +-
 ...estCreateAvroKeyGeneratorByTypeWithFactory.java |   2 +-
 .../factory/TestHoodieAvroKeyGeneratorFactory.java |  10 +-
 .../SparkSortAndSizeExecutionStrategy.java         |   2 +-
 .../hudi/index/hbase/SparkHoodieHBaseIndex.java    |   2 +-
 .../factory/HoodieSparkKeyGeneratorFactory.java    |   6 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   2 +-
 .../apache/hudi/index/hbase/TestHBaseIndex.java    |   6 +-
 .../java/org/apache/hudi/io/TestSparkIOUtils.java  |   8 +-
 .../apache/hudi/keygen/TestCustomKeyGenerator.java |   8 +-
 .../TestCreateKeyGeneratorByTypeWithFactory.java   |   2 +-
 .../TestHoodieSparkKeyGeneratorFactory.java        |  10 +-
 .../apache/hudi/configuration/FlinkOptions.java    |   4 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   4 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  84 ++++++-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  18 +-
 .../command/InsertIntoHoodieTableCommand.scala     |   2 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   4 +-
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |   2 +-
 .../hudi/command/payload/ExpressionPayload.scala   |   6 +-
 .../hudi-spark/src/test/java/HoodieJavaApp.java    |   4 +-
 .../src/test/java/HoodieJavaStreamingApp.java      |   2 +-
 .../apache/hudi/HoodieSparkSqlWriterSuite.scala    |   4 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   2 +-
 .../functional/TestDataSourceForBootstrap.scala    |   8 +-
 .../apache/hudi/functional/TestMORDataSource.scala |   2 +-
 .../hudi/functional/TestStructuredStreaming.scala  |   4 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  20 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |   2 +-
 .../functional/TestHoodieDeltaStreamer.java        |  22 +-
 49 files changed, 688 insertions(+), 400 deletions(-)
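
Every touched config class follows the same pattern in the diff below: the ConfigProperty constant drops its *_PROP suffix, and the old *_PROP name is retained as a @Deprecated String constant holding the same config key. A minimal sketch of the pattern, taken from the HoodieClusteringConfig hunk further below; the commented-out caller lines are illustrative assumptions, not part of the change:

    // In HoodieClusteringConfig (mirrors the hunk later in this diff):
    public static final ConfigProperty<String> INLINE_CLUSTERING = ConfigProperty
        .key("hoodie.clustering.inline")
        .defaultValue("false")
        .sinceVersion("0.7.0")
        .withDocumentation("Turn on inline clustering - clustering will be run after each write operation is complete");

    // The old constant survives as a deprecated alias holding the same key string.
    @Deprecated
    public static final String INLINE_CLUSTERING_PROP = INLINE_CLUSTERING.key();

    // Hypothetical caller: both names resolve to the key "hoodie.clustering.inline".
    // props.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true");
    // props.setProperty(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");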

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index b2319d6..1c747ab 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -204,14 +204,14 @@ public class HoodieLogFileCommand implements CommandMarker {
                       .getCommitTimeline().lastInstant().get().getTimestamp())
               .withReadBlocksLazily(
                   Boolean.parseBoolean(
-                      HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP.defaultValue()))
+                      HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED.defaultValue()))
               .withReverseReader(
                   Boolean.parseBoolean(
-                      HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
+                      HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED.defaultValue()))
+              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
               .withMaxMemorySizeInBytes(
                   HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
+              .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
               .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
               .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
               .build();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index fdc6d9d..ccf0327 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -361,7 +361,7 @@ public class SparkMain {
     TypedProperties properties = propsFilePath == null ? UtilHelpers.buildProperties(configs)
         : UtilHelpers.readConfig(FSUtils.getFs(propsFilePath, jsc.hadoopConfiguration()), new Path(propsFilePath), configs).getConfig();
 
-    properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key(), sourcePath);
+    properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key(), sourcePath);
 
     if (!StringUtils.isNullOrEmpty(keyGenerator) &&  KeyGeneratorType.getNames().contains(keyGenerator.toUpperCase(Locale.ROOT))) {
       properties.setProperty(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_TYPE.key(), keyGenerator.toUpperCase(Locale.ROOT));
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index e0640ed..a44601d 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -208,12 +208,12 @@ public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
             HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
         .withReadBlocksLazily(
             Boolean.parseBoolean(
-                HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP.defaultValue()))
+                HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED.defaultValue()))
         .withReverseReader(
             Boolean.parseBoolean(
-                HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue()))
-        .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
-        .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
+                HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED.defaultValue()))
+        .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+        .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
         .build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
index 2245761..08ed283 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
@@ -83,7 +83,7 @@ public class HoodieWriteCommitHttpCallbackClient implements Closeable {
   }
 
   private String getUrl() {
-    return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
+    return writeConfig.getString(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL);
   }
 
   private CloseableHttpClient getClient() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
index 00610de..66166d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
@@ -39,7 +39,7 @@ public class HoodieCommitCallbackFactory {
       return (HoodieWriteCommitCallback) instance;
     } else {
       throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or "
-          + "empty", HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP.key()));
+          + "empty", HoodieWriteCommitCallbackConfig.CALLBACK_CLASS.key()));
     }
   }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 3680d5f..dfb2fc8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -453,19 +453,19 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP.key(), "true");
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
         inlineCompact(extraMetadata);
       } else {
-        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP.key(), "false");
+        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
       }
 
       // Do an inline clustering if enabled
       if (config.inlineClusteringEnabled()) {
         runAnyPendingClustering(table);
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "true");
+        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
         inlineCluster(extraMetadata);
       } else {
-        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), "false");
+        metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false");
       }
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
index d008a1a..836b0c1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
@@ -44,11 +44,13 @@ import java.util.Properties;
         + " writers and new hudi writers in parallel, to validate the migration.")
 public class HoodieBootstrapConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> BOOTSTRAP_BASE_PATH_PROP = ConfigProperty
+  public static final ConfigProperty<String> BOOTSTRAP_BASE_PATH = ConfigProperty
       .key("hoodie.bootstrap.base.path")
       .noDefaultValue()
       .sinceVersion("0.6.0")
       .withDocumentation("Base path of the dataset that needs to be bootstrapped as a Hudi table");
+  @Deprecated
+  public static final String BOOTSTRAP_BASE_PATH_PROP = BOOTSTRAP_BASE_PATH.key();
 
   public static final ConfigProperty<String> BOOTSTRAP_MODE_SELECTOR = ConfigProperty
       .key("hoodie.bootstrap.mode.selector")
@@ -100,11 +102,13 @@ public class HoodieBootstrapConfig extends HoodieConfig {
           + "METADATA_ONLY will generate just skeleton base files with keys/footers, avoiding full cost of rewriting the dataset. "
           + "FULL_RECORD will perform a full copy/rewrite of the data as a Hudi table.");
 
-  public static final ConfigProperty<String> BOOTSTRAP_INDEX_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> BOOTSTRAP_INDEX_CLASS = ConfigProperty
       .key("hoodie.bootstrap.index.class")
       .defaultValue(HFileBootstrapIndex.class.getName())
       .sinceVersion("0.6.0")
       .withDocumentation("Implementation to use, for mapping a skeleton base file to a boostrap base file.");
+  @Deprecated
+  public static final String BOOTSTRAP_INDEX_CLASS_PROP = BOOTSTRAP_INDEX_CLASS.key();
 
   private HoodieBootstrapConfig() {
     super();
@@ -126,7 +130,7 @@ public class HoodieBootstrapConfig extends HoodieConfig {
     }
 
     public Builder withBootstrapBasePath(String basePath) {
-      bootstrapConfig.setValue(BOOTSTRAP_BASE_PATH_PROP, basePath);
+      bootstrapConfig.setValue(BOOTSTRAP_BASE_PATH, basePath);
       return this;
     }
 
@@ -178,7 +182,7 @@ public class HoodieBootstrapConfig extends HoodieConfig {
 
     public HoodieBootstrapConfig build() {
       // TODO: use infer function instead
-      bootstrapConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS_PROP, HoodieTableConfig.getDefaultBootstrapIndexClass(
+      bootstrapConfig.setDefaultValue(BOOTSTRAP_INDEX_CLASS, HoodieTableConfig.getDefaultBootstrapIndexClass(
           bootstrapConfig.getProps()));
       bootstrapConfig.setDefaults(HoodieBootstrapConfig.class.getName());
       return bootstrapConfig;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9e04611..4c68ec4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -62,17 +62,21 @@ public class HoodieClusteringConfig extends HoodieConfig {
           + " clustering plan is executed. By default, we sort the file groups in th plan by the specified columns, while "
           + " meeting the configured target file sizes.");
 
-  public static final ConfigProperty<String> INLINE_CLUSTERING_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_CLUSTERING = ConfigProperty
       .key("hoodie.clustering.inline")
       .defaultValue("false")
       .sinceVersion("0.7.0")
       .withDocumentation("Turn on inline clustering - clustering will be run after each write operation is complete");
+  @Deprecated
+  public static final String INLINE_CLUSTERING_PROP = INLINE_CLUSTERING.key();
 
-  public static final ConfigProperty<String> INLINE_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_CLUSTERING_MAX_COMMIT = ConfigProperty
       .key("hoodie.clustering.inline.max.commits")
       .defaultValue("4")
       .sinceVersion("0.7.0")
       .withDocumentation("Config to control frequency of clustering planning");
+  @Deprecated
+  public static final String INLINE_CLUSTERING_MAX_COMMIT_PROP = INLINE_CLUSTERING_MAX_COMMIT.key();
 
   public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
       .key("hoodie.clustering.async.max.commits")
@@ -118,18 +122,22 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Columns to sort the data by when clustering");
 
-  public static final ConfigProperty<String> CLUSTERING_UPDATES_STRATEGY_PROP = ConfigProperty
+  public static final ConfigProperty<String> CLUSTERING_UPDATES_STRATEGY = ConfigProperty
       .key("hoodie.clustering.updates.strategy")
       .defaultValue("org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy")
       .sinceVersion("0.7.0")
       .withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering."
           + " Default strategy just rejects the update");
+  @Deprecated
+  public static final String CLUSTERING_UPDATES_STRATEGY_PROP = CLUSTERING_UPDATES_STRATEGY.key();
 
   public static final ConfigProperty<String> ASYNC_CLUSTERING_ENABLE = ConfigProperty
       .key("hoodie.clustering.async.enabled")
       .defaultValue("false")
       .sinceVersion("0.7.0")
       .withDocumentation("Enable running of clustering service, asynchronously as inserts happen on the table.");
+  @Deprecated
+  public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY = "hoodie.clustering.async.enabled";
 
   public static final ConfigProperty<Boolean> CLUSTERING_PRESERVE_HOODIE_COMMIT_METADATA = ConfigProperty
       .key("hoodie.clustering.preserve.commit.metadata")
@@ -202,12 +210,12 @@ public class HoodieClusteringConfig extends HoodieConfig {
     }
 
     public Builder withInlineClustering(Boolean inlineClustering) {
-      clusteringConfig.setValue(INLINE_CLUSTERING_PROP, String.valueOf(inlineClustering));
+      clusteringConfig.setValue(INLINE_CLUSTERING, String.valueOf(inlineClustering));
       return this;
     }
 
     public Builder withInlineClusteringNumCommits(int numCommits) {
-      clusteringConfig.setValue(INLINE_CLUSTERING_MAX_COMMIT_PROP, String.valueOf(numCommits));
+      clusteringConfig.setValue(INLINE_CLUSTERING_MAX_COMMIT, String.valueOf(numCommits));
       return this;
     }
 
@@ -222,7 +230,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
     }
 
     public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
-      clusteringConfig.setValue(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategyClass);
+      clusteringConfig.setValue(CLUSTERING_UPDATES_STRATEGY, updatesStrategyClass);
       return this;
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 5bc3a7c..1ae8495 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -50,60 +50,78 @@ import java.util.stream.Collectors;
         + "cleaning (reclamation of older/unused file groups/slices).")
 public class HoodieCompactionConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> AUTO_CLEAN_PROP = ConfigProperty
+  public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
       .key("hoodie.clean.automatic")
       .defaultValue("true")
       .withDocumentation("When enabled, the cleaner table service is invoked immediately after each commit,"
           + " to delete older file slices. It's recommended to enable this, to ensure metadata and data storage"
           + " growth is bounded.");
+  @Deprecated
+  public static final String AUTO_CLEAN_PROP = AUTO_CLEAN.key();
 
-  public static final ConfigProperty<String> ASYNC_CLEAN_PROP = ConfigProperty
+  public static final ConfigProperty<String> ASYNC_CLEAN = ConfigProperty
       .key("hoodie.clean.async")
       .defaultValue("false")
-      .withDocumentation("Only applies when " + AUTO_CLEAN_PROP.key() + " is turned on. "
+      .withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
           + "When turned on runs cleaner async with writing, which can speed up overall write performance.");
+  @Deprecated
+  public static final String ASYNC_CLEAN_PROP = ASYNC_CLEAN.key();
 
-  public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED_PROP = ConfigProperty
+  public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
       .key("hoodie.cleaner.commits.retained")
       .defaultValue("10")
       .withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
           + "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
+  @Deprecated
+  public static final String CLEANER_COMMITS_RETAINED_PROP = CLEANER_COMMITS_RETAINED.key();
 
-  public static final ConfigProperty<String> CLEANER_POLICY_PROP = ConfigProperty
+  public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
       .key("hoodie.cleaner.policy")
       .defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
       .withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
-          + " By default, cleaner spares the file slices written by the last N commits, determined by  " + CLEANER_COMMITS_RETAINED_PROP.key()
+          + " By default, cleaner spares the file slices written by the last N commits, determined by  " + CLEANER_COMMITS_RETAINED.key()
           + " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
           + "   a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
+  @Deprecated
+  public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
 
-  public static final ConfigProperty<String> INLINE_COMPACT_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_COMPACT = ConfigProperty
       .key("hoodie.compact.inline")
       .defaultValue("false")
       .withDocumentation("When set to true, compaction service is triggered after each write. While being "
           + " simpler operationally, this adds extra latency on the write path.");
+  @Deprecated
+  public static final String INLINE_COMPACT_PROP = INLINE_COMPACT.key();
 
-  public static final ConfigProperty<String> INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_COMPACT_NUM_DELTA_COMMITS = ConfigProperty
       .key("hoodie.compact.inline.max.delta.commits")
       .defaultValue("5")
       .withDocumentation("Number of delta commits after the last compaction, before scheduling of a new compaction is attempted.");
+  @Deprecated
+  public static final String INLINE_COMPACT_NUM_DELTA_COMMITS_PROP = INLINE_COMPACT_NUM_DELTA_COMMITS.key();
 
-  public static final ConfigProperty<String> INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_COMPACT_TIME_DELTA_SECONDS = ConfigProperty
       .key("hoodie.compact.inline.max.delta.seconds")
       .defaultValue(String.valueOf(60 * 60))
       .withDocumentation("Number of elapsed seconds after the last compaction, before scheduling a new one.");
+  @Deprecated
+  public static final String INLINE_COMPACT_TIME_DELTA_SECONDS_PROP = INLINE_COMPACT_TIME_DELTA_SECONDS.key();
 
-  public static final ConfigProperty<String> INLINE_COMPACT_TRIGGER_STRATEGY_PROP = ConfigProperty
+  public static final ConfigProperty<String> INLINE_COMPACT_TRIGGER_STRATEGY = ConfigProperty
       .key("hoodie.compact.inline.trigger.strategy")
       .defaultValue(CompactionTriggerStrategy.NUM_COMMITS.name())
       .withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
           + "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));
+  @Deprecated
+  public static final String INLINE_COMPACT_TRIGGER_STRATEGY_PROP = INLINE_COMPACT_TRIGGER_STRATEGY.key();
 
-  public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED_PROP = ConfigProperty
+  public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
       .key("hoodie.cleaner.fileversions.retained")
       .defaultValue("3")
       .withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
           + " the minimum number of file slices to retain in each file group, during cleaning.");
+  @Deprecated
+  public static final String CLEANER_FILE_VERSIONS_RETAINED_PROP = CLEANER_FILE_VERSIONS_RETAINED.key();
 
   public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE = ConfigProperty
       .key("hoodie.cleaner.incremental.mode")
@@ -112,24 +130,30 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " in the timeline, since the last cleaner run. This is much more efficient than obtaining listings for the full"
           + " table for each planning (even with a metadata table).");
 
-  public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP_PROP = ConfigProperty
+  public static final ConfigProperty<String> MAX_COMMITS_TO_KEEP = ConfigProperty
       .key("hoodie.keep.max.commits")
       .defaultValue("30")
       .withDocumentation("Archiving service moves older entries from timeline into an archived log after each write, to "
           + " keep the metadata overhead constant, even as the table size grows."
           + "This config controls the maximum number of instants to retain in the active timeline. ");
+  @Deprecated
+  public static final String MAX_COMMITS_TO_KEEP_PROP = MAX_COMMITS_TO_KEEP.key();
 
-  public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP_PROP = ConfigProperty
+  public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
       .key("hoodie.keep.min.commits")
       .defaultValue("20")
-      .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP_PROP.key() + ", but controls the minimum number of"
+      .withDocumentation("Similar to " + MAX_COMMITS_TO_KEEP.key() + ", but controls the minimum number of"
           + "instants to retain in the active timeline.");
+  @Deprecated
+  public static final String MIN_COMMITS_TO_KEEP_PROP = MIN_COMMITS_TO_KEEP.key();
 
-  public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMMITS_ARCHIVAL_BATCH_SIZE = ConfigProperty
       .key("hoodie.commits.archival.batch")
       .defaultValue(String.valueOf(10))
       .withDocumentation("Archiving of instants is batched in best-effort manner, to pack more instants into a single"
           + " archive log. This config controls such archival batch size.");
+  @Deprecated
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = COMMITS_ARCHIVAL_BATCH_SIZE.key();
 
   public static final ConfigProperty<String> CLEANER_BOOTSTRAP_BASE_FILE_ENABLED = ConfigProperty
       .key("hoodie.cleaner.delete.bootstrap.base.file")
@@ -146,13 +170,15 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " new files, to keep number of files to an optimum. This config sets the file size limit below which a file on storage "
           + " becomes a candidate to be selected as such a `small file`. By default, treat any file <= 100MB as a small file.");
 
-  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = ConfigProperty
+  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATION_THRESHOLD = ConfigProperty
       .key("hoodie.record.size.estimation.threshold")
       .defaultValue("1.0")
       .withDocumentation("We use the previous commits' metadata to calculate the estimated record size and use it "
           + " to bin pack records into partitions. If the previous commit is too small to make an accurate estimation, "
           + " Hudi will search commits in the reverse order, until we find a commit that has totalBytesWritten "
           + " larger than (PARQUET_SMALL_FILE_LIMIT_BYTES * this_threshold)");
+  @Deprecated
+  public static final String RECORD_SIZE_ESTIMATION_THRESHOLD_PROP = RECORD_SIZE_ESTIMATION_THRESHOLD.key();
 
   public static final ConfigProperty<String> CLEANER_PARALLELISM = ConfigProperty
       .key("hoodie.cleaner.parallelism")
@@ -160,51 +186,65 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
 
   // 500GB of target IO per compaction (both read and write
-  public static final ConfigProperty<String> TARGET_IO_PER_COMPACTION_IN_MB_PROP = ConfigProperty
+  public static final ConfigProperty<String> TARGET_IO_PER_COMPACTION_IN_MB = ConfigProperty
       .key("hoodie.compaction.target.io")
       .defaultValue(String.valueOf(500 * 1024))
       .withDocumentation("Amount of MBs to spend during compaction run for the LogFileSizeBasedCompactionStrategy. "
           + "This value helps bound ingestion latency while compaction is run inline mode.");
+  @Deprecated
+  public static final String TARGET_IO_PER_COMPACTION_IN_MB_PROP = TARGET_IO_PER_COMPACTION_IN_MB.key();
 
-  public static final ConfigProperty<String> COMPACTION_STRATEGY_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMPACTION_STRATEGY = ConfigProperty
       .key("hoodie.compaction.strategy")
       .defaultValue(LogFileSizeBasedCompactionStrategy.class.getName())
       .withDocumentation("Compaction strategy decides which file groups are picked up for "
           + "compaction during each compaction run. By default. Hudi picks the log file "
           + "with most accumulated unmerged data");
+  @Deprecated
+  public static final String COMPACTION_STRATEGY_PROP = COMPACTION_STRATEGY.key();
 
-  public static final ConfigProperty<String> PAYLOAD_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> PAYLOAD_CLASS = ConfigProperty
       .key("hoodie.compaction.payload.class")
       .defaultValue(OverwriteWithLatestAvroPayload.class.getName())
       .withDocumentation("This needs to be same as class used during insert/upserts. Just like writing, compaction also uses "
           + "the record payload class to merge records in the log against each other, merge again with the base file and "
           + "produce the final record to be written after compaction.");
+  @Deprecated
+  public static final String PAYLOAD_CLASS_PROP = PAYLOAD_CLASS.key();
 
-  public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLED = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
       .defaultValue("false")
       .withDocumentation("When merging the delta log files, this config helps to choose whether the log blocks "
           + "should be read lazily or not. Choose true to use lazy block reading (low memory usage, but incurs seeks to each block"
           + " header) or false for immediate block read (higher memory usage)");
+  @Deprecated
+  public static final String COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP = COMPACTION_LAZY_BLOCK_READ_ENABLED.key();
 
-  public static final ConfigProperty<String> COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMPACTION_REVERSE_LOG_READ_ENABLED = ConfigProperty
       .key("hoodie.compaction.reverse.log.read")
       .defaultValue("false")
       .withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
           + "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
+  @Deprecated
+  public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = COMPACTION_REVERSE_LOG_READ_ENABLED.key();
 
-  public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY_PROP = ConfigProperty
+  public static final ConfigProperty<String> FAILED_WRITES_CLEANER_POLICY = ConfigProperty
       .key("hoodie.cleaner.policy.failed.writes")
       .defaultValue(HoodieFailedWritesCleaningPolicy.EAGER.name())
       .withDocumentation("Cleaning policy for failed writes to be used. Hudi will delete any files written by "
           + "failed writes to re-claim space. Choose to perform this rollback of failed writes eagerly before "
           + "every writer starts (only supported for single writer) or lazily by the cleaner (required for multi-writers)");
+  @Deprecated
+  public static final String FAILED_WRITES_CLEANER_POLICY_PROP = FAILED_WRITES_CLEANER_POLICY.key();
 
-  public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = ConfigProperty
+  public static final ConfigProperty<String> TARGET_PARTITIONS_PER_DAYBASED_COMPACTION = ConfigProperty
       .key("hoodie.compaction.daybased.target.partitions")
       .defaultValue("10")
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
+  @Deprecated
+  public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = TARGET_PARTITIONS_PER_DAYBASED_COMPACTION.key();
 
   /**
    * Configs related to specific table types.
@@ -256,12 +296,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder withAutoClean(Boolean autoClean) {
-      compactionConfig.setValue(AUTO_CLEAN_PROP, String.valueOf(autoClean));
+      compactionConfig.setValue(AUTO_CLEAN, String.valueOf(autoClean));
       return this;
     }
 
     public Builder withAsyncClean(Boolean asyncClean) {
-      compactionConfig.setValue(ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
+      compactionConfig.setValue(ASYNC_CLEAN, String.valueOf(asyncClean));
       return this;
     }
 
@@ -271,33 +311,33 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder withInlineCompaction(Boolean inlineCompaction) {
-      compactionConfig.setValue(INLINE_COMPACT_PROP, String.valueOf(inlineCompaction));
+      compactionConfig.setValue(INLINE_COMPACT, String.valueOf(inlineCompaction));
       return this;
     }
 
     public Builder withInlineCompactionTriggerStrategy(CompactionTriggerStrategy compactionTriggerStrategy) {
-      compactionConfig.setValue(INLINE_COMPACT_TRIGGER_STRATEGY_PROP, compactionTriggerStrategy.name());
+      compactionConfig.setValue(INLINE_COMPACT_TRIGGER_STRATEGY, compactionTriggerStrategy.name());
       return this;
     }
 
     public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
-      compactionConfig.setValue(CLEANER_POLICY_PROP, policy.name());
+      compactionConfig.setValue(CLEANER_POLICY, policy.name());
       return this;
     }
 
     public Builder retainFileVersions(int fileVersionsRetained) {
-      compactionConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED_PROP, String.valueOf(fileVersionsRetained));
+      compactionConfig.setValue(CLEANER_FILE_VERSIONS_RETAINED, String.valueOf(fileVersionsRetained));
       return this;
     }
 
     public Builder retainCommits(int commitsRetained) {
-      compactionConfig.setValue(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained));
+      compactionConfig.setValue(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
       return this;
     }
 
     public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
-      compactionConfig.setValue(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
-      compactionConfig.setValue(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
+      compactionConfig.setValue(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
+      compactionConfig.setValue(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
       return this;
     }
 
@@ -307,7 +347,7 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder compactionRecordSizeEstimateThreshold(double threshold) {
-      compactionConfig.setValue(RECORD_SIZE_ESTIMATION_THRESHOLD_PROP, String.valueOf(threshold));
+      compactionConfig.setValue(RECORD_SIZE_ESTIMATION_THRESHOLD, String.valueOf(threshold));
       return this;
     }
 
@@ -332,47 +372,47 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder withCompactionStrategy(CompactionStrategy compactionStrategy) {
-      compactionConfig.setValue(COMPACTION_STRATEGY_PROP, compactionStrategy.getClass().getName());
+      compactionConfig.setValue(COMPACTION_STRATEGY, compactionStrategy.getClass().getName());
       return this;
     }
 
     public Builder withPayloadClass(String payloadClassName) {
-      compactionConfig.setValue(PAYLOAD_CLASS_PROP, payloadClassName);
+      compactionConfig.setValue(PAYLOAD_CLASS, payloadClassName);
       return this;
     }
 
     public Builder withTargetIOPerCompactionInMB(long targetIOPerCompactionInMB) {
-      compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB_PROP, String.valueOf(targetIOPerCompactionInMB));
+      compactionConfig.setValue(TARGET_IO_PER_COMPACTION_IN_MB, String.valueOf(targetIOPerCompactionInMB));
       return this;
     }
 
     public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
-      compactionConfig.setValue(INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      compactionConfig.setValue(INLINE_COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
       return this;
     }
 
     public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
-      compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS_PROP, String.valueOf(maxDeltaSecondsBeforeCompaction));
+      compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
       return this;
     }
 
     public Builder withCompactionLazyBlockReadEnabled(Boolean compactionLazyBlockReadEnabled) {
-      compactionConfig.setValue(COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, String.valueOf(compactionLazyBlockReadEnabled));
+      compactionConfig.setValue(COMPACTION_LAZY_BLOCK_READ_ENABLED, String.valueOf(compactionLazyBlockReadEnabled));
       return this;
     }
 
     public Builder withCompactionReverseLogReadEnabled(Boolean compactionReverseLogReadEnabled) {
-      compactionConfig.setValue(COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, String.valueOf(compactionReverseLogReadEnabled));
+      compactionConfig.setValue(COMPACTION_REVERSE_LOG_READ_ENABLED, String.valueOf(compactionReverseLogReadEnabled));
       return this;
     }
 
     public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPerCompaction) {
-      compactionConfig.setValue(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, String.valueOf(targetPartitionsPerCompaction));
+      compactionConfig.setValue(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION, String.valueOf(targetPartitionsPerCompaction));
       return this;
     }
 
     public Builder withCommitsArchivalBatchSize(int batchSize) {
-      compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
+      compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
     }
 
@@ -382,32 +422,32 @@ public class HoodieCompactionConfig extends HoodieConfig {
     }
 
     public Builder withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy failedWritesPolicy) {
-      compactionConfig.setValue(FAILED_WRITES_CLEANER_POLICY_PROP, failedWritesPolicy.name());
+      compactionConfig.setValue(FAILED_WRITES_CLEANER_POLICY, failedWritesPolicy.name());
       return this;
     }
 
     public HoodieCompactionConfig build() {
       compactionConfig.setDefaults(HoodieCompactionConfig.class.getName());
       // validation
-      HoodieCleaningPolicy.valueOf(compactionConfig.getString(CLEANER_POLICY_PROP));
+      HoodieCleaningPolicy.valueOf(compactionConfig.getString(CLEANER_POLICY));
 
       // Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
       // commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
-      int minInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP));
-      int maxInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP));
+      int minInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP));
+      int maxInstantsToKeep = Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP));
       int cleanerCommitsRetained =
-          Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP));
+          Integer.parseInt(compactionConfig.getStringOrDefault(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED));
       ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
           String.format(
               "Increase %s=%d to be greater than %s=%d.",
-              HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP.key(), maxInstantsToKeep,
-              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP.key(), minInstantsToKeep));
+              HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
+              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
       ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
           String.format(
               "Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
                   + "missing data from few instants.",
-              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP.key(), minInstantsToKeep,
-              HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP.key(), cleanerCommitsRetained));
+              HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
+              HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));
       return compactionConfig;
     }
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 4e30046..2c8da8b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -36,27 +36,35 @@ import java.util.Properties;
         + "records as either inserts or updates to older records.")
 public class HoodieHBaseIndexConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> HBASE_ZKQUORUM_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_ZKQUORUM = ConfigProperty
       .key("hoodie.index.hbase.zkquorum")
       .noDefaultValue()
       .withDocumentation("Only applies if index type is HBASE. HBase ZK Quorum url to connect to");
+  @Deprecated
+  public static final String HBASE_ZKQUORUM_PROP = HBASE_ZKQUORUM.key();
 
-  public static final ConfigProperty<String> HBASE_ZKPORT_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_ZKPORT = ConfigProperty
       .key("hoodie.index.hbase.zkport")
       .noDefaultValue()
       .withDocumentation("Only applies if index type is HBASE. HBase ZK Quorum port to connect to");
+  @Deprecated
+  public static final String HBASE_ZKPORT_PROP = HBASE_ZKPORT.key();
 
-  public static final ConfigProperty<String> HBASE_TABLENAME_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_TABLENAME = ConfigProperty
       .key("hoodie.index.hbase.table")
       .noDefaultValue()
       .withDocumentation("Only applies if index type is HBASE. HBase Table name to use as the index. "
           + "Hudi stores the row_key and [partition_path, fileID, commitTime] mapping in the table");
+  @Deprecated
+  public static final String HBASE_TABLENAME_PROP = HBASE_TABLENAME.key();
 
-  public static final ConfigProperty<Integer> HBASE_GET_BATCH_SIZE_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> HBASE_GET_BATCH_SIZE = ConfigProperty
       .key("hoodie.index.hbase.get.batch.size")
       .defaultValue(100)
       .withDocumentation("Controls the batch size for performing gets against HBase. "
           + "Batching improves throughput, by saving round trips.");
+  @Deprecated
+  public static final String HBASE_GET_BATCH_SIZE_PROP = HBASE_GET_BATCH_SIZE.key();
 
   public static final ConfigProperty<String> HBASE_ZK_ZNODEPARENT = ConfigProperty
       .key("hoodie.index.hbase.zknode.path")
@@ -64,11 +72,13 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .withDocumentation("Only applies if index type is HBASE. This is the root znode that will contain "
           + "all the znodes created/used by HBase");
 
-  public static final ConfigProperty<Integer> HBASE_PUT_BATCH_SIZE_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> HBASE_PUT_BATCH_SIZE = ConfigProperty
       .key("hoodie.index.hbase.put.batch.size")
       .defaultValue(100)
       .withDocumentation("Controls the batch size for performing puts against HBase. "
           + "Batching improves throughput, by saving round trips.");
+  @Deprecated
+  public static final String HBASE_PUT_BATCH_SIZE_PROP = HBASE_PUT_BATCH_SIZE.key();
 
   public static final ConfigProperty<String> HBASE_INDEX_QPS_ALLOCATOR_CLASS = ConfigProperty
       .key("hoodie.index.hbase.qps.allocator.class")
@@ -76,56 +86,70 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
       .withDocumentation("Property to set which implementation of HBase QPS resource allocator to be used, which"
           + "controls the batching rate dynamically.");
 
-  public static final ConfigProperty<String> HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE = ConfigProperty
       .key("hoodie.index.hbase.put.batch.size.autocompute")
       .defaultValue("false")
       .withDocumentation("Property to set to enable auto computation of put batch size");
+  @Deprecated
+  public static final String HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP = HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE.key();
 
-  public static final ConfigProperty<Float> HBASE_QPS_FRACTION_PROP = ConfigProperty
+  public static final ConfigProperty<Float> HBASE_QPS_FRACTION = ConfigProperty
       .key("hoodie.index.hbase.qps.fraction")
       .defaultValue(0.5f)
       .withDocumentation("Property to set the fraction of the global share of QPS that should be allocated to this job. Let's say there are 3"
           + " jobs which have input size in terms of number of rows required for HbaseIndexing as x, 2x, 3x respectively. Then"
           + " this fraction for the jobs would be (0.17) 1/6, 0.33 (2/6) and 0.5 (3/6) respectively."
           + " Default is 50%, which means a total of 2 jobs can run using HbaseIndex without overwhelming Region Servers.");
+  @Deprecated
+  public static final String HBASE_QPS_FRACTION_PROP = HBASE_QPS_FRACTION.key();
 
-  public static final ConfigProperty<Integer> HBASE_MAX_QPS_PER_REGION_SERVER_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> HBASE_MAX_QPS_PER_REGION_SERVER = ConfigProperty
       .key("hoodie.index.hbase.max.qps.per.region.server")
       .defaultValue(1000)
       .withDocumentation("Property to set maximum QPS allowed per Region Server. This should be same across various jobs. This is intended to\n"
           + " limit the aggregate QPS generated across various jobs to an Hbase Region Server. It is recommended to set this\n"
           + " value based on global indexing throughput needs and most importantly, how much the HBase installation in use is\n"
           + " able to tolerate without Region Servers going down.");
+  @Deprecated
+  public static final String HBASE_MAX_QPS_PER_REGION_SERVER_PROP = HBASE_MAX_QPS_PER_REGION_SERVER.key();
 
   public static final ConfigProperty<Boolean> HOODIE_INDEX_COMPUTE_QPS_DYNAMICALLY = ConfigProperty
       .key("hoodie.index.hbase.dynamic_qps")
       .defaultValue(false)
       .withDocumentation("Property to decide if HBASE_QPS_FRACTION_PROP is dynamically calculated based on write volume.");
 
-  public static final ConfigProperty<String> HBASE_MIN_QPS_FRACTION_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_MIN_QPS_FRACTION = ConfigProperty
       .key("hoodie.index.hbase.min.qps.fraction")
       .noDefaultValue()
       .withDocumentation("Minimum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads");
+  @Deprecated
+  public static final String HBASE_MIN_QPS_FRACTION_PROP = HBASE_MIN_QPS_FRACTION.key();
 
-  public static final ConfigProperty<String> HBASE_MAX_QPS_FRACTION_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_MAX_QPS_FRACTION = ConfigProperty
       .key("hoodie.index.hbase.max.qps.fraction")
       .noDefaultValue()
       .withDocumentation("Maximum for HBASE_QPS_FRACTION_PROP to stabilize skewed write workloads");
+  @Deprecated
+  public static final String HBASE_MAX_QPS_FRACTION_PROP = HBASE_MAX_QPS_FRACTION.key();
 
   public static final ConfigProperty<Integer> HOODIE_INDEX_DESIRED_PUTS_TIME_IN_SECS = ConfigProperty
       .key("hoodie.index.hbase.desired_puts_time_in_secs")
       .defaultValue(600)
       .withDocumentation("");
 
-  public static final ConfigProperty<String> HBASE_SLEEP_MS_PUT_BATCH_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_SLEEP_MS_PUT_BATCH = ConfigProperty
       .key("hoodie.index.hbase.sleep.ms.for.put.batch")
       .noDefaultValue()
       .withDocumentation("");
+  @Deprecated
+  public static final String HBASE_SLEEP_MS_PUT_BATCH_PROP = HBASE_SLEEP_MS_PUT_BATCH.key();
 
-  public static final ConfigProperty<String> HBASE_SLEEP_MS_GET_BATCH_PROP = ConfigProperty
+  public static final ConfigProperty<String> HBASE_SLEEP_MS_GET_BATCH = ConfigProperty
       .key("hoodie.index.hbase.sleep.ms.for.get.batch")
       .noDefaultValue()
       .withDocumentation("");
+  @Deprecated
+  public static final String HBASE_SLEEP_MS_GET_BATCH_PROP = HBASE_SLEEP_MS_GET_BATCH.key();
 
   public static final ConfigProperty<Integer> HOODIE_INDEX_HBASE_ZK_SESSION_TIMEOUT_MS = ConfigProperty
       .key("hoodie.index.hbase.zk.session_timeout_ms")
@@ -182,17 +206,17 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
     }
 
     public HoodieHBaseIndexConfig.Builder hbaseZkQuorum(String zkString) {
-      hBaseIndexConfig.setValue(HBASE_ZKQUORUM_PROP, zkString);
+      hBaseIndexConfig.setValue(HBASE_ZKQUORUM, zkString);
       return this;
     }
 
     public HoodieHBaseIndexConfig.Builder hbaseZkPort(int port) {
-      hBaseIndexConfig.setValue(HBASE_ZKPORT_PROP, String.valueOf(port));
+      hBaseIndexConfig.setValue(HBASE_ZKPORT, String.valueOf(port));
       return this;
     }
 
     public HoodieHBaseIndexConfig.Builder hbaseTableName(String tableName) {
-      hBaseIndexConfig.setValue(HBASE_TABLENAME_PROP, tableName);
+      hBaseIndexConfig.setValue(HBASE_TABLENAME, tableName);
       return this;
     }
 
@@ -202,17 +226,17 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
     }
 
     public Builder hbaseIndexGetBatchSize(int getBatchSize) {
-      hBaseIndexConfig.setValue(HBASE_GET_BATCH_SIZE_PROP, String.valueOf(getBatchSize));
+      hBaseIndexConfig.setValue(HBASE_GET_BATCH_SIZE, String.valueOf(getBatchSize));
       return this;
     }
 
     public Builder hbaseIndexPutBatchSize(int putBatchSize) {
-      hBaseIndexConfig.setValue(HBASE_PUT_BATCH_SIZE_PROP, String.valueOf(putBatchSize));
+      hBaseIndexConfig.setValue(HBASE_PUT_BATCH_SIZE, String.valueOf(putBatchSize));
       return this;
     }
 
     public Builder hbaseIndexPutBatchSizeAutoCompute(boolean putBatchSizeAutoCompute) {
-      hBaseIndexConfig.setValue(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, String.valueOf(putBatchSizeAutoCompute));
+      hBaseIndexConfig.setValue(HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE, String.valueOf(putBatchSizeAutoCompute));
       return this;
     }
 
@@ -227,27 +251,27 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
     }
 
     public Builder hbaseIndexQPSFraction(float qpsFraction) {
-      hBaseIndexConfig.setValue(HBASE_QPS_FRACTION_PROP, String.valueOf(qpsFraction));
+      hBaseIndexConfig.setValue(HBASE_QPS_FRACTION, String.valueOf(qpsFraction));
       return this;
     }
 
     public Builder hbaseIndexMinQPSFraction(float minQPSFraction) {
-      hBaseIndexConfig.setValue(HBASE_MIN_QPS_FRACTION_PROP, String.valueOf(minQPSFraction));
+      hBaseIndexConfig.setValue(HBASE_MIN_QPS_FRACTION, String.valueOf(minQPSFraction));
       return this;
     }
 
     public Builder hbaseIndexMaxQPSFraction(float maxQPSFraction) {
-      hBaseIndexConfig.setValue(HBASE_MAX_QPS_FRACTION_PROP, String.valueOf(maxQPSFraction));
+      hBaseIndexConfig.setValue(HBASE_MAX_QPS_FRACTION, String.valueOf(maxQPSFraction));
       return this;
     }
 
     public Builder hbaseIndexSleepMsBetweenPutBatch(int sleepMsBetweenPutBatch) {
-      hBaseIndexConfig.setValue(HBASE_SLEEP_MS_PUT_BATCH_PROP, String.valueOf(sleepMsBetweenPutBatch));
+      hBaseIndexConfig.setValue(HBASE_SLEEP_MS_PUT_BATCH, String.valueOf(sleepMsBetweenPutBatch));
       return this;
     }
 
     public Builder hbaseIndexSleepMsBetweenGetBatch(int sleepMsBetweenGetBatch) {
-      hBaseIndexConfig.setValue(HBASE_SLEEP_MS_GET_BATCH_PROP, String.valueOf(sleepMsBetweenGetBatch));
+      hBaseIndexConfig.setValue(HBASE_SLEEP_MS_GET_BATCH, String.valueOf(sleepMsBetweenGetBatch));
       return this;
     }
 
@@ -293,7 +317,7 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
      */
     public HoodieHBaseIndexConfig.Builder hbaseIndexMaxQPSPerRegionServer(int maxQPSPerRegionServer) {
       // This should be same across various jobs
-      hBaseIndexConfig.setValue(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP,
+      hBaseIndexConfig.setValue(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER,
           String.valueOf(maxQPSPerRegionServer));
       return this;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 91524f1..7f74fbd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -34,6 +34,12 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
+import static org.apache.hudi.config.HoodieHBaseIndexConfig.HBASE_ZKQUORUM;
+import static org.apache.hudi.config.HoodieHBaseIndexConfig.HBASE_ZKPORT;
+import static org.apache.hudi.config.HoodieHBaseIndexConfig.HBASE_TABLENAME;
+import static org.apache.hudi.config.HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE;
+import static org.apache.hudi.config.HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE;
+
 /**
  * Indexing related config.
  */
@@ -44,19 +50,23 @@ import java.util.Properties;
         + "which tags incoming records as either inserts or updates to older records.")
 public class HoodieIndexConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> INDEX_TYPE_PROP = ConfigProperty
+  public static final ConfigProperty<String> INDEX_TYPE = ConfigProperty
       .key("hoodie.index.type")
       .noDefaultValue()
       .withDocumentation("Type of index to use. Default is Bloom filter. "
           + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE]. "
           + "Bloom filters removes the dependency on a external system "
           + "and is stored in the footer of the Parquet Data Files");
+  @Deprecated
+  public static final String INDEX_TYPE_PROP = INDEX_TYPE.key();
 
-  public static final ConfigProperty<String> INDEX_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> INDEX_CLASS = ConfigProperty
       .key("hoodie.index.class")
       .defaultValue("")
       .withDocumentation("Full path of user-defined index class and must be a subclass of HoodieIndex class. "
           + "It will take precedence over the hoodie.index.type configuration if specified");
+  @Deprecated
+  public static final String INDEX_CLASS_PROP = INDEX_CLASS.key();
 
   // ***** Bloom Index configs *****
   public static final ConfigProperty<String> BLOOM_FILTER_NUM_ENTRIES = ConfigProperty
@@ -81,14 +91,16 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "If the number of entries added to bloom filter exceeds the configured value (hoodie.index.bloom.num_entries), "
           + "then this fpp may not be honored.");
 
-  public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.bloom.index.parallelism")
       .defaultValue("0")
       .withDocumentation("Only applies if index type is BLOOM. "
           + "This is the amount of parallelism for index lookup, which involves a shuffle. "
           + "By default, this is auto computed based on input workload characteristics.");
+  @Deprecated
+  public static final String BLOOM_INDEX_PARALLELISM_PROP = BLOOM_INDEX_PARALLELISM.key();
 
-  public static final ConfigProperty<String> BLOOM_INDEX_PRUNE_BY_RANGES_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_PRUNE_BY_RANGES = ConfigProperty
       .key("hoodie.bloom.index.prune.by.ranges")
       .defaultValue("true")
       .withDocumentation("Only applies if index type is BLOOM. "
@@ -96,28 +108,36 @@ public class HoodieIndexConfig extends HoodieConfig {
           + "if the key has a monotonously increasing prefix, such as timestamp. "
           + "If the record key is completely random, it is better to turn this off, since range pruning will only "
           + " add extra overhead to the index lookup.");
+  @Deprecated
+  public static final String BLOOM_INDEX_PRUNE_BY_RANGES_PROP = BLOOM_INDEX_PRUNE_BY_RANGES.key();
 
-  public static final ConfigProperty<String> BLOOM_INDEX_USE_CACHING_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_USE_CACHING = ConfigProperty
       .key("hoodie.bloom.index.use.caching")
       .defaultValue("true")
       .withDocumentation("Only applies if index type is BLOOM."
           + "When true, the input RDD will cached to speed up index lookup by reducing IO "
           + "for computing parallelism or affected partitions");
+  @Deprecated
+  public static final String BLOOM_INDEX_USE_CACHING_PROP = BLOOM_INDEX_USE_CACHING.key();
 
-  public static final ConfigProperty<String> BLOOM_INDEX_TREE_BASED_FILTER_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_TREE_BASED_FILTER = ConfigProperty
       .key("hoodie.bloom.index.use.treebased.filter")
       .defaultValue("true")
       .withDocumentation("Only applies if index type is BLOOM. "
           + "When true, interval tree based file pruning optimization is enabled. "
           + "This mode speeds-up file-pruning based on key ranges when compared with the brute-force mode");
+  @Deprecated
+  public static final String BLOOM_INDEX_TREE_BASED_FILTER_PROP = BLOOM_INDEX_TREE_BASED_FILTER.key();
 
   // TODO: On by default. Once stable, we will remove the other mode.
-  public static final ConfigProperty<String> BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_BUCKETIZED_CHECKING = ConfigProperty
       .key("hoodie.bloom.index.bucketized.checking")
       .defaultValue("true")
       .withDocumentation("Only applies if index type is BLOOM. "
           + "When true, bucketized bloom filtering is enabled. "
           + "This reduces skew seen in sort based bloom index lookup");
+  @Deprecated
+  public static final String BLOOM_INDEX_BUCKETIZED_CHECKING_PROP = BLOOM_INDEX_BUCKETIZED_CHECKING.key();
 
   public static final ConfigProperty<String> BLOOM_INDEX_FILTER_TYPE = ConfigProperty
       .key("hoodie.bloom.index.filter.type")
@@ -132,34 +152,42 @@ public class HoodieIndexConfig extends HoodieConfig {
       .withDocumentation("The threshold for the maximum number of keys to record in a dynamic Bloom filter row. "
           + "Only applies if filter type is BloomFilterTypeCode.DYNAMIC_V0.");
 
-  public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING_PROP = ConfigProperty
+  public static final ConfigProperty<String> SIMPLE_INDEX_USE_CACHING = ConfigProperty
       .key("hoodie.simple.index.use.caching")
       .defaultValue("true")
       .withDocumentation("Only applies if index type is SIMPLE. "
           + "When true, the incoming writes will cached to speed up index lookup by reducing IO "
           + "for computing parallelism or affected partitions");
+  @Deprecated
+  public static final String SIMPLE_INDEX_USE_CACHING_PROP = SIMPLE_INDEX_USE_CACHING.key();
 
-  public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM_PROP = ConfigProperty
+  public static final ConfigProperty<String> SIMPLE_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.simple.index.parallelism")
       .defaultValue("50")
       .withDocumentation("Only applies if index type is SIMPLE. "
           + "This is the amount of parallelism for index lookup, which involves a Spark Shuffle");
+  @Deprecated
+  public static final String SIMPLE_INDEX_PARALLELISM_PROP = SIMPLE_INDEX_PARALLELISM.key();
 
-  public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP = ConfigProperty
+  public static final ConfigProperty<String> GLOBAL_SIMPLE_INDEX_PARALLELISM = ConfigProperty
       .key("hoodie.global.simple.index.parallelism")
       .defaultValue("100")
       .withDocumentation("Only applies if index type is GLOBAL_SIMPLE. "
           + "This is the amount of parallelism for index lookup, which involves a Spark Shuffle");
+  @Deprecated
+  public static final String GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP = GLOBAL_SIMPLE_INDEX_PARALLELISM.key();
 
   // 1B bloom filter checks happen in 250 seconds. 500ms to read a bloom filter.
   // 10M checks in 2500ms, thus amortizing the cost of reading bloom filter across partitions.
-  public static final ConfigProperty<String> BLOOM_INDEX_KEYS_PER_BUCKET_PROP = ConfigProperty
+  public static final ConfigProperty<String> BLOOM_INDEX_KEYS_PER_BUCKET = ConfigProperty
       .key("hoodie.bloom.index.keys.per.bucket")
       .defaultValue("10000000")
       .withDocumentation("Only applies if bloomIndexBucketizedChecking is enabled and index type is bloom. "
           + "This configuration controls the “bucket” size which tracks the number of record-key checks made against "
           + "a single file and is the unit of work allocated to each partition performing bloom filter lookup. "
           + "A higher value would amortize the fixed cost of reading a bloom filter to memory.");
+  @Deprecated
+  public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = BLOOM_INDEX_KEYS_PER_BUCKET.key();
 
   public static final ConfigProperty<String> BLOOM_INDEX_INPUT_STORAGE_LEVEL = ConfigProperty
       .key("hoodie.bloom.index.input.storage.level")
@@ -194,6 +222,22 @@ public class HoodieIndexConfig extends HoodieConfig {
       .defaultValue("false")
       .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH + ", but for simple index.");
 
+  /**
+   * Deprecated configs. These are now part of {@link HoodieHBaseIndexConfig}.
+   */
+  @Deprecated
+  public static final String HBASE_ZKQUORUM_PROP = HBASE_ZKQUORUM.key();
+  @Deprecated
+  public static final String HBASE_ZKPORT_PROP = HBASE_ZKPORT.key();
+  @Deprecated
+  public static final String HBASE_ZK_ZNODEPARENT = HoodieHBaseIndexConfig.HBASE_ZK_ZNODEPARENT.key();
+  @Deprecated
+  public static final String HBASE_TABLENAME_PROP = HBASE_TABLENAME.key();
+  @Deprecated
+  public static final String HBASE_GET_BATCH_SIZE_PROP = HBASE_GET_BATCH_SIZE.key();
+  @Deprecated
+  public static final String HBASE_PUT_BATCH_SIZE_PROP = HBASE_PUT_BATCH_SIZE.key();
+
   private EngineType engineType;
 
   /**
@@ -231,12 +275,12 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public Builder withIndexType(HoodieIndex.IndexType indexType) {
-      hoodieIndexConfig.setValue(INDEX_TYPE_PROP, indexType.name());
+      hoodieIndexConfig.setValue(INDEX_TYPE, indexType.name());
       return this;
     }
 
     public Builder withIndexClass(String indexClass) {
-      hoodieIndexConfig.setValue(INDEX_CLASS_PROP, indexClass);
+      hoodieIndexConfig.setValue(INDEX_CLASS, indexClass);
       return this;
     }
 
@@ -256,32 +300,32 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public Builder bloomIndexParallelism(int parallelism) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_PARALLELISM, String.valueOf(parallelism));
       return this;
     }
 
     public Builder bloomIndexPruneByRanges(boolean pruneRanges) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_PRUNE_BY_RANGES_PROP, String.valueOf(pruneRanges));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_PRUNE_BY_RANGES, String.valueOf(pruneRanges));
       return this;
     }
 
     public Builder bloomIndexUseCaching(boolean useCaching) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_USE_CACHING_PROP, String.valueOf(useCaching));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_USE_CACHING, String.valueOf(useCaching));
       return this;
     }
 
     public Builder bloomIndexTreebasedFilter(boolean useTreeFilter) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER_PROP, String.valueOf(useTreeFilter));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_TREE_BASED_FILTER, String.valueOf(useTreeFilter));
       return this;
     }
 
     public Builder bloomIndexBucketizedChecking(boolean bucketizedChecking) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP, String.valueOf(bucketizedChecking));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_BUCKETIZED_CHECKING, String.valueOf(bucketizedChecking));
       return this;
     }
 
     public Builder bloomIndexKeysPerBucket(int keysPerBucket) {
-      hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET_PROP, String.valueOf(keysPerBucket));
+      hoodieIndexConfig.setValue(BLOOM_INDEX_KEYS_PER_BUCKET, String.valueOf(keysPerBucket));
       return this;
     }
 
@@ -296,12 +340,12 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public Builder withSimpleIndexParallelism(int parallelism) {
-      hoodieIndexConfig.setValue(SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+      hoodieIndexConfig.setValue(SIMPLE_INDEX_PARALLELISM, String.valueOf(parallelism));
       return this;
     }
 
     public Builder simpleIndexUseCaching(boolean useCaching) {
-      hoodieIndexConfig.setValue(SIMPLE_INDEX_USE_CACHING_PROP, String.valueOf(useCaching));
+      hoodieIndexConfig.setValue(SIMPLE_INDEX_USE_CACHING, String.valueOf(useCaching));
       return this;
     }
 
@@ -311,7 +355,7 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public Builder withGlobalSimpleIndexParallelism(int parallelism) {
-      hoodieIndexConfig.setValue(GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP, String.valueOf(parallelism));
+      hoodieIndexConfig.setValue(GLOBAL_SIMPLE_INDEX_PARALLELISM, String.valueOf(parallelism));
       return this;
     }
 
@@ -326,11 +370,11 @@ public class HoodieIndexConfig extends HoodieConfig {
     }
 
     public HoodieIndexConfig build() {
-      hoodieIndexConfig.setDefaultValue(INDEX_TYPE_PROP, getDefaultIndexType(engineType));
+      hoodieIndexConfig.setDefaultValue(INDEX_TYPE, getDefaultIndexType(engineType));
       hoodieIndexConfig.setDefaults(HoodieIndexConfig.class.getName());
 
       // Throws IllegalArgumentException if the value set is not a known Hoodie Index Type
-      HoodieIndex.IndexType.valueOf(hoodieIndexConfig.getString(INDEX_TYPE_PROP));
+      HoodieIndex.IndexType.valueOf(hoodieIndexConfig.getString(INDEX_TYPE));
       return hoodieIndexConfig;
     }
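
Because build() still resolves an engine-specific default and then validates the configured type via HoodieIndex.IndexType.valueOf(...), a misspelled index type fails fast at config-build time. A short sketch of the builder methods touched above (the newBuilder() entry point and the HoodieIndex.IndexType enum are the existing APIs assumed here):

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.index.HoodieIndex;

    public class IndexConfigExample {
      public static void main(String[] args) {
        HoodieIndexConfig indexConfig = HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BLOOM)  // sets INDEX_TYPE ("hoodie.index.type")
            .bloomIndexParallelism(200)                  // sets BLOOM_INDEX_PARALLELISM
            .bloomIndexUseCaching(true)                  // sets BLOOM_INDEX_USE_CACHING
            .build();                                    // validates the index type via valueOf()
        System.out.println(indexConfig.getString(HoodieIndexConfig.INDEX_TYPE));
      }
    }
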
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index 162dc61..41ad4e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -64,121 +64,157 @@ import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT
         + " are auto managed internally.")
 public class HoodieLockConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = ConfigProperty
       .key(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY)
       .defaultValue(DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS)
       .sinceVersion("0.8.0")
       .withDocumentation("Initial amount of time to wait between retries to acquire locks, "
           + " subsequent retries will exponentially backoff.");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key();
 
-  public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS = ConfigProperty
       .key(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY)
       .defaultValue(String.valueOf(5000L))
       .sinceVersion("0.8.0")
       .withDocumentation("Maximum amount of time to wait between retries by lock provider client. This bounds"
           + " the maximum delay from the exponential backoff. Currently used by ZK based lock provider only.");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key();
 
-  public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = ConfigProperty
       .key(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY)
       .defaultValue(String.valueOf(10000L))
       .sinceVersion("0.8.0")
       .withDocumentation("Amount of time to wait between retries on the lock provider by the lock manager");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key();
 
-  public static final ConfigProperty<String> LOCK_ACQUIRE_NUM_RETRIES_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_ACQUIRE_NUM_RETRIES = ConfigProperty
       .key(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY)
       .defaultValue(DEFAULT_LOCK_ACQUIRE_NUM_RETRIES)
       .sinceVersion("0.8.0")
       .withDocumentation("Maximum number of times to retry lock acquire, at each lock provider");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_ACQUIRE_NUM_RETRIES.key();
 
-  public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_ACQUIRE_CLIENT_NUM_RETRIES = ConfigProperty
       .key(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY)
       .defaultValue(String.valueOf(0))
       .sinceVersion("0.8.0")
       .withDocumentation("Maximum number of times to retry to acquire lock additionally from the lock manager.");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP = LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key();
 
-  public static final ConfigProperty<Integer> LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> LOCK_ACQUIRE_WAIT_TIMEOUT_MS = ConfigProperty
       .key(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY)
       .defaultValue(60 * 1000)
       .sinceVersion("0.8.0")
       .withDocumentation("Timeout in ms, to wait on an individual lock acquire() call, at the lock provider.");
+  @Deprecated
+  public static final String LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP = LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key();
 
-  public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH_PROP = ConfigProperty
+  public static final ConfigProperty<String> FILESYSTEM_LOCK_PATH = ConfigProperty
       .key(FILESYSTEM_LOCK_PATH_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("For DFS based lock providers, path to store the locks under.");
+  @Deprecated
+  public static final String FILESYSTEM_LOCK_PATH_PROP = FILESYSTEM_LOCK_PATH.key();
 
-  public static final ConfigProperty<String> HIVE_DATABASE_NAME_PROP = ConfigProperty
+  public static final ConfigProperty<String> HIVE_DATABASE_NAME = ConfigProperty
       .key(HIVE_DATABASE_NAME_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("For Hive based lock provider, the Hive database to acquire lock against");
+  @Deprecated
+  public static final String HIVE_DATABASE_NAME_PROP = HIVE_DATABASE_NAME.key();
 
-  public static final ConfigProperty<String> HIVE_TABLE_NAME_PROP = ConfigProperty
+  public static final ConfigProperty<String> HIVE_TABLE_NAME = ConfigProperty
       .key(HIVE_TABLE_NAME_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("For Hive based lock provider, the Hive table to acquire lock against");
+  @Deprecated
+  public static final String HIVE_TABLE_NAME_PROP = HIVE_TABLE_NAME.key();
 
-  public static final ConfigProperty<String> HIVE_METASTORE_URI_PROP = ConfigProperty
+  public static final ConfigProperty<String> HIVE_METASTORE_URI = ConfigProperty
       .key(HIVE_METASTORE_URI_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("For Hive based lock provider, the Hive metastore URI to acquire locks against.");
+  @Deprecated
+  public static final String HIVE_METASTORE_URI_PROP = HIVE_METASTORE_URI.key();
 
-  public static final ConfigProperty<String> ZK_BASE_PATH_PROP = ConfigProperty
+  public static final ConfigProperty<String> ZK_BASE_PATH = ConfigProperty
       .key(ZK_BASE_PATH_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("The base path on Zookeeper under which to create lock related ZNodes. "
           + "This should be same for all concurrent writers to the same table");
+  @Deprecated
+  public static final String ZK_BASE_PATH_PROP = ZK_BASE_PATH.key();
 
-  public static final ConfigProperty<Integer> ZK_SESSION_TIMEOUT_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> ZK_SESSION_TIMEOUT_MS = ConfigProperty
       .key(ZK_SESSION_TIMEOUT_MS_PROP_KEY)
       .defaultValue(DEFAULT_ZK_SESSION_TIMEOUT_MS)
       .sinceVersion("0.8.0")
       .withDocumentation("Timeout in ms, to wait after losing connection to ZooKeeper, before the session is expired");
+  @Deprecated
+  public static final String ZK_SESSION_TIMEOUT_MS_PROP = ZK_SESSION_TIMEOUT_MS.key();
 
-  public static final ConfigProperty<Integer> ZK_CONNECTION_TIMEOUT_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> ZK_CONNECTION_TIMEOUT_MS = ConfigProperty
       .key(ZK_CONNECTION_TIMEOUT_MS_PROP_KEY)
       .defaultValue(DEFAULT_ZK_CONNECTION_TIMEOUT_MS)
       .sinceVersion("0.8.0")
       .withDocumentation("Timeout in ms, to wait for establishing connection with Zookeeper.");
+  @Deprecated
+  public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = ZK_CONNECTION_TIMEOUT_MS.key();
 
-  public static final ConfigProperty<String> ZK_CONNECT_URL_PROP = ConfigProperty
+  public static final ConfigProperty<String> ZK_CONNECT_URL = ConfigProperty
       .key(ZK_CONNECT_URL_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("Zookeeper URL to connect to.");
+  @Deprecated
+  public static final String ZK_CONNECT_URL_PROP = ZK_CONNECT_URL.key();
 
-  public static final ConfigProperty<String> ZK_PORT_PROP = ConfigProperty
+  public static final ConfigProperty<String> ZK_PORT = ConfigProperty
       .key(ZK_PORT_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("Zookeeper port to connect to.");
+  @Deprecated
+  public static final String ZK_PORT_PROP = ZK_PORT.key();
 
-  public static final ConfigProperty<String> ZK_LOCK_KEY_PROP = ConfigProperty
+  public static final ConfigProperty<String> ZK_LOCK_KEY = ConfigProperty
       .key(ZK_LOCK_KEY_PROP_KEY)
       .noDefaultValue()
       .sinceVersion("0.8.0")
       .withDocumentation("Key name under base_path at which to create a ZNode and acquire lock. "
           + "Final path on zk will look like base_path/lock_key. We recommend setting this to the table name");
+  @Deprecated
+  public static final String ZK_LOCK_KEY_PROP = ZK_LOCK_KEY.key();
 
   // Pluggable type of lock provider
-  public static final ConfigProperty<String> LOCK_PROVIDER_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> LOCK_PROVIDER_CLASS = ConfigProperty
       .key(LOCK_PREFIX + "provider")
       .defaultValue(ZookeeperBasedLockProvider.class.getName())
       .sinceVersion("0.8.0")
       .withDocumentation("Lock provider class name, user can provide their own implementation of LockProvider "
           + "which should be subclass of org.apache.hudi.common.lock.LockProvider");
+  @Deprecated
+  public static final String LOCK_PROVIDER_CLASS_PROP = LOCK_PROVIDER_CLASS.key();
 
   // Pluggable strategies to use when resolving conflicts
-  public static final ConfigProperty<String> WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS = ConfigProperty
       .key(LOCK_PREFIX + "conflict.resolution.strategy")
       .defaultValue(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName())
       .sinceVersion("0.8.0")
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
+  @Deprecated
+  public static final String WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP = WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS.key();
 
   private HoodieLockConfig() {
     super();
@@ -205,87 +241,87 @@ public class HoodieLockConfig extends HoodieConfig {
     }
 
     public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> lockProvider) {
-      lockConfig.setValue(LOCK_PROVIDER_CLASS_PROP, lockProvider.getName());
+      lockConfig.setValue(LOCK_PROVIDER_CLASS, lockProvider.getName());
       return this;
     }
 
     public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
-      lockConfig.setValue(HIVE_DATABASE_NAME_PROP, databaseName);
+      lockConfig.setValue(HIVE_DATABASE_NAME, databaseName);
       return this;
     }
 
     public HoodieLockConfig.Builder withHiveTableName(String tableName) {
-      lockConfig.setValue(HIVE_TABLE_NAME_PROP, tableName);
+      lockConfig.setValue(HIVE_TABLE_NAME, tableName);
       return this;
     }
 
     public HoodieLockConfig.Builder withHiveMetastoreURIs(String hiveMetastoreURIs) {
-      lockConfig.setValue(HIVE_METASTORE_URI_PROP, hiveMetastoreURIs);
+      lockConfig.setValue(HIVE_METASTORE_URI, hiveMetastoreURIs);
       return this;
     }
 
     public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
-      lockConfig.setValue(ZK_CONNECT_URL_PROP, zkQuorum);
+      lockConfig.setValue(ZK_CONNECT_URL, zkQuorum);
       return this;
     }
 
     public HoodieLockConfig.Builder withZkBasePath(String zkBasePath) {
-      lockConfig.setValue(ZK_BASE_PATH_PROP, zkBasePath);
+      lockConfig.setValue(ZK_BASE_PATH, zkBasePath);
       return this;
     }
 
     public HoodieLockConfig.Builder withZkPort(String zkPort) {
-      lockConfig.setValue(ZK_PORT_PROP, zkPort);
+      lockConfig.setValue(ZK_PORT, zkPort);
       return this;
     }
 
     public HoodieLockConfig.Builder withZkLockKey(String zkLockKey) {
-      lockConfig.setValue(ZK_LOCK_KEY_PROP, zkLockKey);
+      lockConfig.setValue(ZK_LOCK_KEY, zkLockKey);
       return this;
     }
 
     public HoodieLockConfig.Builder withZkConnectionTimeoutInMs(Long connectionTimeoutInMs) {
-      lockConfig.setValue(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(connectionTimeoutInMs));
+      lockConfig.setValue(ZK_CONNECTION_TIMEOUT_MS, String.valueOf(connectionTimeoutInMs));
       return this;
     }
 
     public HoodieLockConfig.Builder withZkSessionTimeoutInMs(Long sessionTimeoutInMs) {
-      lockConfig.setValue(ZK_SESSION_TIMEOUT_MS_PROP, String.valueOf(sessionTimeoutInMs));
+      lockConfig.setValue(ZK_SESSION_TIMEOUT_MS, String.valueOf(sessionTimeoutInMs));
       return this;
     }
 
     public HoodieLockConfig.Builder withNumRetries(int numRetries) {
-      lockConfig.setValue(LOCK_ACQUIRE_NUM_RETRIES_PROP, String.valueOf(numRetries));
+      lockConfig.setValue(LOCK_ACQUIRE_NUM_RETRIES, String.valueOf(numRetries));
       return this;
     }
 
     public HoodieLockConfig.Builder withRetryWaitTimeInMillis(Long retryWaitTimeInMillis) {
-      lockConfig.setValue(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryWaitTimeInMillis));
+      lockConfig.setValue(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS, String.valueOf(retryWaitTimeInMillis));
       return this;
     }
 
     public HoodieLockConfig.Builder withRetryMaxWaitTimeInMillis(Long retryMaxWaitTimeInMillis) {
-      lockConfig.setValue(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryMaxWaitTimeInMillis));
+      lockConfig.setValue(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS, String.valueOf(retryMaxWaitTimeInMillis));
       return this;
     }
 
     public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
-      lockConfig.setValue(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
+      lockConfig.setValue(LOCK_ACQUIRE_CLIENT_NUM_RETRIES, String.valueOf(clientNumRetries));
       return this;
     }
 
     public HoodieLockConfig.Builder withClientRetryWaitTimeInMillis(Long clientRetryWaitTimeInMillis) {
-      lockConfig.setValue(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(clientRetryWaitTimeInMillis));
+      lockConfig.setValue(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS, String.valueOf(clientRetryWaitTimeInMillis));
       return this;
     }
 
     public HoodieLockConfig.Builder withLockWaitTimeInMillis(Long waitTimeInMillis) {
-      lockConfig.setValue(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(waitTimeInMillis));
+      lockConfig.setValue(LOCK_ACQUIRE_WAIT_TIMEOUT_MS, String.valueOf(waitTimeInMillis));
       return this;
     }
 
     public HoodieLockConfig.Builder withConflictResolutionStrategy(ConflictResolutionStrategy conflictResolutionStrategy) {
-      lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP, conflictResolutionStrategy.getClass().getName());
+      lockConfig.setValue(WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS, conflictResolutionStrategy.getClass().getName());
       return this;
     }
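
For multi-writer setups the renamed constants are still consumed through the same builder; lock acquisition behaviour does not change. A minimal sketch of a ZooKeeper-backed lock configuration using the builder methods shown above (the newBuilder()/build() entry points and the ZookeeperBasedLockProvider package are assumed unchanged; the quorum, port and paths are placeholder values):

    import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
    import org.apache.hudi.config.HoodieLockConfig;

    class LockConfigExample {
      static HoodieLockConfig zkLockConfig() {
        return HoodieLockConfig.newBuilder()
            .withLockProvider(ZookeeperBasedLockProvider.class)  // LOCK_PROVIDER_CLASS
            .withZkQuorum("zk1.example.com")                     // ZK_CONNECT_URL
            .withZkPort("2181")                                  // ZK_PORT
            .withZkBasePath("/hudi/locks")                       // ZK_BASE_PATH
            .withZkLockKey("my_table")                           // ZK_LOCK_KEY
            .withNumRetries(3)                                   // LOCK_ACQUIRE_NUM_RETRIES
            .build();
      }
    }
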
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
index 319b7f6..b8f3410 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieMemoryConfig.java
@@ -41,14 +41,16 @@ import java.util.Properties;
 public class HoodieMemoryConfig extends HoodieConfig {
 
   // Default max memory fraction during hash-merge, excess spills to disk
-  public static final ConfigProperty<String> MAX_MEMORY_FRACTION_FOR_MERGE_PROP = ConfigProperty
+  public static final ConfigProperty<String> MAX_MEMORY_FRACTION_FOR_MERGE = ConfigProperty
       .key("hoodie.memory.merge.fraction")
       .defaultValue(String.valueOf(0.6))
       .withDocumentation("This fraction is multiplied with the user memory fraction (1 - spark.memory.fraction) "
           + "to get a final fraction of heap space to use during merge");
+  @Deprecated
+  public static final String MAX_MEMORY_FRACTION_FOR_MERGE_PROP = MAX_MEMORY_FRACTION_FOR_MERGE.key();
 
   // Default max memory fraction during compaction, excess spills to disk
-  public static final ConfigProperty<String> MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = ConfigProperty
+  public static final ConfigProperty<String> MAX_MEMORY_FRACTION_FOR_COMPACTION = ConfigProperty
       .key("hoodie.memory.compaction.fraction")
       .defaultValue(String.valueOf(0.6))
       .withDocumentation("HoodieCompactedLogScanner reads logblocks, converts records to HoodieRecords and then "
@@ -56,38 +58,50 @@ public class HoodieMemoryConfig extends HoodieConfig {
           + "less than or equal to the number of entries in the corresponding parquet file. This can lead to "
           + "OOM in the Scanner. Hence, a spillable map helps alleviate the memory pressure. Use this config to "
           + "set the max allowable inMemory footprint of the spillable map");
+  @Deprecated
+  public static final String MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP = MAX_MEMORY_FRACTION_FOR_COMPACTION.key();
 
   // Default memory size (1GB) per compaction (used if SparkEnv is absent), excess spills to disk
   public static final long DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 1024 * 1024 * 1024L;
   // Minimum memory size (100MB) for the spillable map.
   public static final long DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES = 100 * 1024 * 1024L;
 
-  public static final ConfigProperty<Long> MAX_MEMORY_FOR_MERGE_PROP = ConfigProperty
+  public static final ConfigProperty<Long> MAX_MEMORY_FOR_MERGE = ConfigProperty
       .key("hoodie.memory.merge.max.size")
       .defaultValue(DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
       .withDocumentation("Maximum amount of memory used for merge operations, before spilling to local storage.");
+  @Deprecated
+  public static final String MAX_MEMORY_FOR_MERGE_PROP = MAX_MEMORY_FOR_MERGE.key();
 
-  public static final ConfigProperty<String> MAX_MEMORY_FOR_COMPACTION_PROP = ConfigProperty
+  public static final ConfigProperty<String> MAX_MEMORY_FOR_COMPACTION = ConfigProperty
       .key("hoodie.memory.compaction.max.size")
       .noDefaultValue()
       .withDocumentation("Maximum amount of memory used for compaction operations, before spilling to local storage.");
+  @Deprecated
+  public static final String MAX_MEMORY_FOR_COMPACTION_PROP = MAX_MEMORY_FOR_COMPACTION.key();
 
-  public static final ConfigProperty<Integer> MAX_DFS_STREAM_BUFFER_SIZE_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> MAX_DFS_STREAM_BUFFER_SIZE = ConfigProperty
       .key("hoodie.memory.dfs.buffer.max.size")
       .defaultValue(16 * 1024 * 1024)
       .withDocumentation("Property to control the max memory for dfs input stream buffer size");
+  @Deprecated
+  public static final String MAX_DFS_STREAM_BUFFER_SIZE_PROP = MAX_DFS_STREAM_BUFFER_SIZE.key();
 
-  public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH_PROP = ConfigProperty
+  public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
       .key("hoodie.memory.spillable.map.path")
       .defaultValue("/tmp/")
       .withDocumentation("Default file path prefix for spillable map");
+  @Deprecated
+  public static final String SPILLABLE_MAP_BASE_PATH_PROP = SPILLABLE_MAP_BASE_PATH.key();
 
-  public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION_PROP = ConfigProperty
+  public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
       .key("hoodie.memory.writestatus.failure.fraction")
       .defaultValue(0.1)
       .withDocumentation("Property to control how what fraction of the failed record, exceptions we report back to driver. "
           + "Default is 10%. If set to 100%, with lot of failures, this can cause memory pressure, cause OOMs and "
           + "mask actual data errors.");
+  @Deprecated
+  public static final String WRITESTATUS_FAILURE_FRACTION_PROP = WRITESTATUS_FAILURE_FRACTION.key();
 
   private HoodieMemoryConfig() {
     super();
@@ -114,28 +128,28 @@ public class HoodieMemoryConfig extends HoodieConfig {
     }
 
     public Builder withMaxMemoryFractionPerPartitionMerge(double maxMemoryFractionPerPartitionMerge) {
-      memoryConfig.setValue(MAX_MEMORY_FRACTION_FOR_MERGE_PROP, String.valueOf(maxMemoryFractionPerPartitionMerge));
+      memoryConfig.setValue(MAX_MEMORY_FRACTION_FOR_MERGE, String.valueOf(maxMemoryFractionPerPartitionMerge));
       return this;
     }
 
     public Builder withMaxMemoryMaxSize(long mergeMaxSize, long compactionMaxSize) {
-      memoryConfig.setValue(MAX_MEMORY_FOR_MERGE_PROP, String.valueOf(mergeMaxSize));
-      memoryConfig.setValue(MAX_MEMORY_FOR_COMPACTION_PROP, String.valueOf(compactionMaxSize));
+      memoryConfig.setValue(MAX_MEMORY_FOR_MERGE, String.valueOf(mergeMaxSize));
+      memoryConfig.setValue(MAX_MEMORY_FOR_COMPACTION, String.valueOf(compactionMaxSize));
       return this;
     }
 
     public Builder withMaxMemoryFractionPerCompaction(double maxMemoryFractionPerCompaction) {
-      memoryConfig.setValue(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP, String.valueOf(maxMemoryFractionPerCompaction));
+      memoryConfig.setValue(MAX_MEMORY_FRACTION_FOR_COMPACTION, String.valueOf(maxMemoryFractionPerCompaction));
       return this;
     }
 
     public Builder withMaxDFSStreamBufferSize(int maxStreamBufferSize) {
-      memoryConfig.setValue(MAX_DFS_STREAM_BUFFER_SIZE_PROP, String.valueOf(maxStreamBufferSize));
+      memoryConfig.setValue(MAX_DFS_STREAM_BUFFER_SIZE, String.valueOf(maxStreamBufferSize));
       return this;
     }
 
     public Builder withWriteStatusFailureFraction(double failureFraction) {
-      memoryConfig.setValue(WRITESTATUS_FAILURE_FRACTION_PROP, String.valueOf(failureFraction));
+      memoryConfig.setValue(WRITESTATUS_FAILURE_FRACTION, String.valueOf(failureFraction));
       return this;
     }
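
The merge/compaction memory knobs keep their keys and defaults; only the Java field names change. A small sketch of the builder calls shown above (newBuilder()/build() assumed), setting the merge memory fraction, the DFS stream buffer size, and the fraction of failed-record exceptions reported back to the driver:

    import org.apache.hudi.config.HoodieMemoryConfig;

    class MemoryConfigExample {
      static HoodieMemoryConfig memoryConfig() {
        return HoodieMemoryConfig.newBuilder()
            .withMaxMemoryFractionPerPartitionMerge(0.6)   // MAX_MEMORY_FRACTION_FOR_MERGE
            .withMaxDFSStreamBufferSize(16 * 1024 * 1024)  // MAX_DFS_STREAM_BUFFER_SIZE
            .withWriteStatusFailureFraction(0.1)           // WRITESTATUS_FAILURE_FRACTION
            .build();
      }
    }
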
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index b243fbf..95d4fe3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -40,17 +40,21 @@ import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_F
         + "control merges based on specific business fields in the data.")
 public class HoodiePayloadConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> PAYLOAD_ORDERING_FIELD_PROP = ConfigProperty
+  public static final ConfigProperty<String> PAYLOAD_ORDERING_FIELD = ConfigProperty
       .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
       .defaultValue("ts")
       .withDocumentation("Table column/field name to order records that have the same key, before "
           + "merging and writing to storage.");
+  @Deprecated
+  public static final String PAYLOAD_ORDERING_FIELD_PROP = PAYLOAD_ORDERING_FIELD.key();
 
-  public static final ConfigProperty<String> PAYLOAD_EVENT_TIME_FIELD_PROP = ConfigProperty
+  public static final ConfigProperty<String> PAYLOAD_EVENT_TIME_FIELD = ConfigProperty
       .key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
       .defaultValue("ts")
       .withDocumentation("Table column/field name to derive timestamp associated with the records. This can"
           + "be useful for e.g, determining the freshness of the table.");
+  @Deprecated
+  public static final String PAYLOAD_EVENT_TIME_FIELD_PROP = PAYLOAD_EVENT_TIME_FIELD.key();
 
   private HoodiePayloadConfig() {
     super();
@@ -77,12 +81,12 @@ public class HoodiePayloadConfig extends HoodieConfig {
     }
 
     public Builder withPayloadOrderingField(String payloadOrderingField) {
-      payloadConfig.setValue(PAYLOAD_ORDERING_FIELD_PROP, String.valueOf(payloadOrderingField));
+      payloadConfig.setValue(PAYLOAD_ORDERING_FIELD, String.valueOf(payloadOrderingField));
       return this;
     }
 
     public Builder withPayloadEventTimeField(String payloadEventTimeField) {
-      payloadConfig.setValue(PAYLOAD_EVENT_TIME_FIELD_PROP, String.valueOf(payloadEventTimeField));
+      payloadConfig.setValue(PAYLOAD_EVENT_TIME_FIELD, String.valueOf(payloadEventTimeField));
       return this;
     }
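
Both payload properties default to "ts", so they only need to be set when the ordering or event-time column differs. A minimal sketch (newBuilder()/build() assumed; "event_ts" is a placeholder column name):

    import org.apache.hudi.config.HoodiePayloadConfig;

    class PayloadConfigExample {
      static HoodiePayloadConfig payloadConfig() {
        return HoodiePayloadConfig.newBuilder()
            .withPayloadOrderingField("event_ts")   // PAYLOAD_ORDERING_FIELD
            .withPayloadEventTimeField("event_ts")  // PAYLOAD_EVENT_TIME_FIELD
            .build();
      }
    }
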
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
index df45359..d313ea3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -44,19 +44,23 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig {
       .sinceVersion("0.6.0")
       .withDocumentation("Turn commit callback on/off. off by default.");
 
-  public static final ConfigProperty<String> CALLBACK_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> CALLBACK_CLASS = ConfigProperty
       .key(CALLBACK_PREFIX + "class")
       .defaultValue("org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback")
       .sinceVersion("0.6.0")
       .withDocumentation("Full path of callback class and must be a subclass of HoodieWriteCommitCallback class, "
           + "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback by default");
+  @Deprecated
+  public static final String CALLBACK_CLASS_PROP = CALLBACK_CLASS.key();
 
   // ***** HTTP callback configs *****
-  public static final ConfigProperty<String> CALLBACK_HTTP_URL_PROP = ConfigProperty
+  public static final ConfigProperty<String> CALLBACK_HTTP_URL = ConfigProperty
       .key(CALLBACK_PREFIX + "http.url")
       .noDefaultValue()
       .sinceVersion("0.6.0")
       .withDocumentation("Callback host to be sent along with callback messages");
+  @Deprecated
+  public static final String CALLBACK_HTTP_URL_PROP = CALLBACK_HTTP_URL.key();
 
   public static final ConfigProperty<String> CALLBACK_HTTP_API_KEY = ConfigProperty
       .key(CALLBACK_PREFIX + "http.api.key")
@@ -100,12 +104,12 @@ public class HoodieWriteCommitCallbackConfig extends HoodieConfig {
     }
 
     public HoodieWriteCommitCallbackConfig.Builder withCallbackClass(String callbackClass) {
-      writeCommitCallbackConfig.setValue(CALLBACK_CLASS_PROP, callbackClass);
+      writeCommitCallbackConfig.setValue(CALLBACK_CLASS, callbackClass);
       return this;
     }
 
     public HoodieWriteCommitCallbackConfig.Builder withCallbackHttpUrl(String url) {
-      writeCommitCallbackConfig.setValue(CALLBACK_HTTP_URL_PROP, url);
+      writeCommitCallbackConfig.setValue(CALLBACK_HTTP_URL, url);
       return this;
     }
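
The HTTP callback wiring itself is unchanged: CALLBACK_CLASS still defaults to the HTTP implementation and CALLBACK_HTTP_URL has no default, so the URL must be supplied when callbacks are turned on. A minimal sketch (newBuilder()/build() assumed; the URL is a placeholder):

    import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;

    class CallbackConfigExample {
      static HoodieWriteCommitCallbackConfig callbackConfig() {
        return HoodieWriteCommitCallbackConfig.newBuilder()
            .withCallbackClass("org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback")  // CALLBACK_CLASS
            .withCallbackHttpUrl("https://callbacks.example.com/hudi")                         // CALLBACK_HTTP_URL
            .build();
      }
    }
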
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 0bc2b84..4cbbbbc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -83,11 +83,13 @@ public class HoodieWriteConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Table name that will be used for registering with metastores like HMS. Needs to be same across runs.");
 
-  public static final ConfigProperty<String> PRECOMBINE_FIELD_PROP = ConfigProperty
+  public static final ConfigProperty<String> PRECOMBINE_FIELD = ConfigProperty
       .key("hoodie.datasource.write.precombine.field")
       .defaultValue("ts")
       .withDocumentation("Field used in preCombining before actual write. When two records have the same key value, "
           + "we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)");
+  @Deprecated
+  public static final String PRECOMBINE_FIELD_PROP = PRECOMBINE_FIELD.key();
 
   public static final ConfigProperty<String> WRITE_PAYLOAD_CLASS = ConfigProperty
       .key("hoodie.datasource.write.payload.class")
@@ -95,13 +97,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting. "
           + "This will render any value set for PRECOMBINE_FIELD_OPT_VAL in-effective");
 
-  public static final ConfigProperty<String> KEYGENERATOR_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> KEYGENERATOR_CLASS = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.class")
       .noDefaultValue()
       .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator` "
           + "extract a key out of incoming records.");
+  @Deprecated
+  public static final String KEYGENERATOR_CLASS_PROP = KEYGENERATOR_CLASS.key();
 
-  public static final ConfigProperty<String> KEYGENERATOR_TYPE_PROP = ConfigProperty
+  public static final ConfigProperty<String> KEYGENERATOR_TYPE = ConfigProperty
       .key("hoodie.datasource.write.keygenerator.type")
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDocumentation("Easily configure one the built-in key generators, instead of specifying the key generator class."
@@ -126,13 +130,15 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withAlternatives("hoodie.table.ro.file.format")
       .withDocumentation("");
 
-  public static final ConfigProperty<String> BASE_PATH_PROP = ConfigProperty
+  public static final ConfigProperty<String> BASE_PATH = ConfigProperty
       .key("hoodie.base.path")
       .noDefaultValue()
       .withDocumentation("Base path on lake storage, under which all the table data is stored. "
           + "Always prefix it explicitly with the storage scheme (e.g hdfs://, s3:// etc). "
           + "Hudi stores all the main meta-data about commits, savepoints, cleaning audit logs "
           + "etc in .hoodie directory under this base path directory.");
+  @Deprecated
+  public static final String BASE_PATH_PROP = BASE_PATH.key();
 
   public static final ConfigProperty<String> AVRO_SCHEMA = ConfigProperty
       .key("hoodie.avro.schema")
@@ -184,24 +190,30 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");
 
-  public static final ConfigProperty<String> COMBINE_BEFORE_INSERT_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMBINE_BEFORE_INSERT = ConfigProperty
       .key("hoodie.combine.before.insert")
       .defaultValue("false")
       .withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
           + " writing to storage.");
+  @Deprecated
+  public static final String COMBINE_BEFORE_INSERT_PROP = COMBINE_BEFORE_INSERT.key();
 
-  public static final ConfigProperty<String> COMBINE_BEFORE_UPSERT_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMBINE_BEFORE_UPSERT = ConfigProperty
       .key("hoodie.combine.before.upsert")
       .defaultValue("true")
       .withDocumentation("When upserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"
           + " writing to storage. This should be turned off only if you are absolutely certain that there are no duplicates incoming, "
           + " otherwise it can lead to duplicate keys and violate the uniqueness guarantees.");
+  @Deprecated
+  public static final String COMBINE_BEFORE_UPSERT_PROP = COMBINE_BEFORE_UPSERT.key();
 
-  public static final ConfigProperty<String> COMBINE_BEFORE_DELETE_PROP = ConfigProperty
+  public static final ConfigProperty<String> COMBINE_BEFORE_DELETE = ConfigProperty
       .key("hoodie.combine.before.delete")
       .defaultValue("true")
       .withDocumentation("During delete operations, controls whether we should combine deletes (and potentially also upserts) before "
           + " writing to storage.");
+  @Deprecated
+  public static final String COMBINE_BEFORE_DELETE_PROP = COMBINE_BEFORE_DELETE.key();
 
   public static final ConfigProperty<String> WRITE_STATUS_STORAGE_LEVEL = ConfigProperty
       .key("hoodie.write.status.storage.level")
@@ -209,17 +221,21 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Write status objects hold metadata about a write (stats, errors), that is not yet committed to storage. "
           + "This controls the how that information is cached for inspection by clients. We rarely expect this to be changed.");
 
-  public static final ConfigProperty<String> HOODIE_AUTO_COMMIT_PROP = ConfigProperty
+  public static final ConfigProperty<String> HOODIE_AUTO_COMMIT = ConfigProperty
       .key("hoodie.auto.commit")
       .defaultValue("true")
       .withDocumentation("Controls whether a write operation should auto commit. This can be turned off to perform inspection"
           + " of the uncommitted write before deciding to commit.");
+  @Deprecated
+  public static final String HOODIE_AUTO_COMMIT_PROP = HOODIE_AUTO_COMMIT.key();
 
-  public static final ConfigProperty<String> HOODIE_WRITE_STATUS_CLASS_PROP = ConfigProperty
+  public static final ConfigProperty<String> HOODIE_WRITE_STATUS_CLASS = ConfigProperty
       .key("hoodie.writestatus.class")
       .defaultValue(WriteStatus.class.getName())
       .withDocumentation("Subclass of " + WriteStatus.class.getName() + " to be used to collect information about a write. Can be "
           + "overridden to collection additional metrics/statistics about the data if needed.");
+  @Deprecated
+  public static final String HOODIE_WRITE_STATUS_CLASS_PROP = HOODIE_WRITE_STATUS_CLASS.key();
 
   public static final ConfigProperty<String> FINALIZE_WRITE_PARALLELISM = ConfigProperty
       .key("hoodie.finalize.write.parallelism")
@@ -228,7 +244,7 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "files from lake storage, before committing the write. Reduce this value, if the high number of tasks incur delays for smaller tables "
           + "or low latency writes.");
 
-  public static final ConfigProperty<String> MARKERS_TYPE_PROP = ConfigProperty
+  public static final ConfigProperty<String> MARKERS_TYPE = ConfigProperty
       .key("hoodie.write.markers.type")
       .defaultValue(MarkerType.DIRECT.toString())
       .sinceVersion("0.9.0")
@@ -239,14 +255,14 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "which serves as a proxy.  New marker entries are batch processed and stored "
           + "in a limited number of underlying files for efficiency.");
 
-  public static final ConfigProperty<Integer> MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
       .key("hoodie.markers.timeline_server_based.batch.num_threads")
       .defaultValue(20)
       .sinceVersion("0.9.0")
       .withDocumentation("Number of threads to use for batch processing marker "
           + "creation requests at the timeline server");
 
-  public static final ConfigProperty<Long> MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Long> MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS = ConfigProperty
       .key("hoodie.markers.timeline_server_based.batch.interval_ms")
       .defaultValue(50L)
       .sinceVersion("0.9.0")
@@ -302,27 +318,35 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Controls whether or not, the requests to the timeline server are processed in asynchronous fashion, "
           + "potentially improving throughput.");
 
-  public static final ConfigProperty<String> FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = ConfigProperty
+  public static final ConfigProperty<String> FAIL_ON_TIMELINE_ARCHIVING_ENABLED = ConfigProperty
       .key("hoodie.fail.on.timeline.archiving")
       .defaultValue("true")
       .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
           + "Controls whether or not, the write should be failed as well, if such archiving fails.");
+  @Deprecated
+  public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = FAIL_ON_TIMELINE_ARCHIVING_ENABLED.key();
 
-  public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
       .key("hoodie.consistency.check.initial_interval_ms")
       .defaultValue(2000L)
       .withDocumentation("Initial time between successive attempts to ensure written data's metadata is consistent on storage. Grows with exponential"
           + " backoff after the initial value.");
+  @Deprecated
+  public static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP = INITIAL_CONSISTENCY_CHECK_INTERVAL_MS.key();
 
-  public static final ConfigProperty<Long> MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Long> MAX_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
       .key("hoodie.consistency.check.max_interval_ms")
       .defaultValue(300000L)
       .withDocumentation("Max time to wait between successive attempts at performing consistency checks");
+  @Deprecated
+  public static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = MAX_CONSISTENCY_CHECK_INTERVAL_MS.key();
 
-  public static final ConfigProperty<Integer> MAX_CONSISTENCY_CHECKS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> MAX_CONSISTENCY_CHECKS = ConfigProperty
       .key("hoodie.consistency.check.max_checks")
       .defaultValue(7)
       .withDocumentation("Maximum number of checks, for consistency of written data.");
+  @Deprecated
+  public static final String MAX_CONSISTENCY_CHECKS_PROP = MAX_CONSISTENCY_CHECKS.key();
 
   public static final ConfigProperty<String> MERGE_DATA_VALIDATION_CHECK_ENABLED = ConfigProperty
       .key("hoodie.merge.data.validation.enabled")
@@ -336,34 +360,42 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
           + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");
 
-  public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS = ConfigProperty
       .key("hoodie.client.heartbeat.interval_in_ms")
       .defaultValue(60 * 1000)
       .withDocumentation("Writers perform heartbeats to indicate liveness. Controls how often (in ms), such heartbeats are registered to lake storage.");
+  @Deprecated
+  public static final String CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = CLIENT_HEARTBEAT_INTERVAL_IN_MS.key();
 
-  public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP = ConfigProperty
+  public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES = ConfigProperty
       .key("hoodie.client.heartbeat.tolerable.misses")
       .defaultValue(2)
       .withDocumentation("Number of heartbeat misses, before a writer is deemed not alive and all pending writes are aborted.");
+  @Deprecated
+  public static final String CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP = CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.key();
 
-  public static final ConfigProperty<String> WRITE_CONCURRENCY_MODE_PROP = ConfigProperty
+  public static final ConfigProperty<String> WRITE_CONCURRENCY_MODE = ConfigProperty
       .key("hoodie.write.concurrency.mode")
       .defaultValue(WriteConcurrencyMode.SINGLE_WRITER.name())
       .withDocumentation("Enable different concurrency modes. Options are "
           + "SINGLE_WRITER: Only one active writer to the table. Maximizes throughput. "
           + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeeds "
           + "if a conflict (writes affect the same file group) is detected.");
+  @Deprecated
+  public static final String WRITE_CONCURRENCY_MODE_PROP = WRITE_CONCURRENCY_MODE.key();
 
-  public static final ConfigProperty<String> WRITE_META_KEY_PREFIXES_PROP = ConfigProperty
+  public static final ConfigProperty<String> WRITE_META_KEY_PREFIXES = ConfigProperty
       .key("hoodie.write.meta.key.prefixes")
       .defaultValue("")
       .withDocumentation("Comma separated metadata key prefixes to override from latest commit "
           + "during overlapping commits via multi writing");
+  @Deprecated
+  public static final String WRITE_META_KEY_PREFIXES_PROP = WRITE_META_KEY_PREFIXES.key();
 
   /**
    * Currently used to specify the write schema.
    */
-  public static final ConfigProperty<String> WRITE_SCHEMA_PROP = ConfigProperty
+  public static final ConfigProperty<String> WRITE_SCHEMA = ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
       .withDocumentation("The specified write schema. In most case, we do not need set this parameter,"
@@ -401,7 +433,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD = ConfigProperty
       .key("hoodie.allow.operation.metadata.field")
       .defaultValue(false)
-      .sinceVersion("0.9")
+      .sinceVersion("0.9.0")
       .withDocumentation("Whether to include '_hoodie_operation' in the metadata fields. "
           + "Once enabled, all the changes of a record are persisted to the delta log directly without merge");
 
@@ -446,7 +478,7 @@ public class HoodieWriteConfig extends HoodieConfig {
    * base properties.
    */
   public String getBasePath() {
-    return getString(BASE_PATH_PROP);
+    return getString(BASE_PATH);
   }
 
   public String getSchema() {
@@ -465,8 +497,8 @@ public class HoodieWriteConfig extends HoodieConfig {
    * @return
    */
   public String getWriteSchema() {
-    if (props.containsKey(WRITE_SCHEMA_PROP.key())) {
-      return getString(WRITE_SCHEMA_PROP);
+    if (props.containsKey(WRITE_SCHEMA.key())) {
+      return getString(WRITE_SCHEMA);
     }
     return getSchema();
   }
@@ -480,7 +512,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getPreCombineField() {
-    return getString(PRECOMBINE_FIELD_PROP);
+    return getString(PRECOMBINE_FIELD);
   }
 
   public String getWritePayloadClass() {
@@ -488,11 +520,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getKeyGeneratorClass() {
-    return getString(KEYGENERATOR_CLASS_PROP);
+    return getString(KEYGENERATOR_CLASS);
   }
 
   public Boolean shouldAutoCommit() {
-    return getBoolean(HOODIE_AUTO_COMMIT_PROP);
+    return getBoolean(HOODIE_AUTO_COMMIT);
   }
 
   public Boolean shouldAssumeDatePartitioning() {
@@ -544,15 +576,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean shouldCombineBeforeInsert() {
-    return getBoolean(COMBINE_BEFORE_INSERT_PROP);
+    return getBoolean(COMBINE_BEFORE_INSERT);
   }
 
   public boolean shouldCombineBeforeUpsert() {
-    return getBoolean(COMBINE_BEFORE_UPSERT_PROP);
+    return getBoolean(COMBINE_BEFORE_UPSERT);
   }
 
   public boolean shouldCombineBeforeDelete() {
-    return getBoolean(COMBINE_BEFORE_DELETE_PROP);
+    return getBoolean(COMBINE_BEFORE_DELETE);
   }
 
   public boolean shouldAllowMultiWriteOnSameInstant() {
@@ -560,7 +592,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getWriteStatusClassName() {
-    return getString(HOODIE_WRITE_STATUS_CLASS_PROP);
+    return getString(HOODIE_WRITE_STATUS_CLASS);
   }
 
   public int getFinalizeWriteParallelism() {
@@ -568,16 +600,16 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public MarkerType getMarkersType() {
-    String markerType = getString(MARKERS_TYPE_PROP);
+    String markerType = getString(MARKERS_TYPE);
     return MarkerType.valueOf(markerType.toUpperCase());
   }
 
   public int getMarkersTimelineServerBasedBatchNumThreads() {
-    return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS_PROP);
+    return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS);
   }
 
   public long getMarkersTimelineServerBasedBatchIntervalMs() {
-    return getLong(MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS_PROP);
+    return getLong(MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS);
   }
 
   public int getMarkersDeleteParallelism() {
@@ -609,19 +641,19 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean isFailOnTimelineArchivingEnabled() {
-    return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP);
+    return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLED);
   }
 
   public int getMaxConsistencyChecks() {
-    return getInt(MAX_CONSISTENCY_CHECKS_PROP);
+    return getInt(MAX_CONSISTENCY_CHECKS);
   }
 
   public int getInitialConsistencyCheckIntervalMs() {
-    return getInt(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP);
+    return getInt(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
   }
 
   public int getMaxConsistencyCheckIntervalMs() {
-    return getInt(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP);
+    return getInt(MAX_CONSISTENCY_CHECK_INTERVAL_MS);
   }
 
   public BulkInsertSortMode getBulkInsertSortMode() {
@@ -650,23 +682,23 @@ public class HoodieWriteConfig extends HoodieConfig {
    * compaction properties.
    */
   public HoodieCleaningPolicy getCleanerPolicy() {
-    return HoodieCleaningPolicy.valueOf(getString(HoodieCompactionConfig.CLEANER_POLICY_PROP));
+    return HoodieCleaningPolicy.valueOf(getString(HoodieCompactionConfig.CLEANER_POLICY));
   }
 
   public int getCleanerFileVersionsRetained() {
-    return getInt(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED_PROP);
+    return getInt(HoodieCompactionConfig.CLEANER_FILE_VERSIONS_RETAINED);
   }
 
   public int getCleanerCommitsRetained() {
-    return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP);
+    return getInt(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED);
   }
 
   public int getMaxCommitsToKeep() {
-    return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP);
+    return getInt(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP);
   }
 
   public int getMinCommitsToKeep() {
-    return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP);
+    return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP);
   }
 
   public int getParquetSmallFileLimit() {
@@ -674,7 +706,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public double getRecordSizeEstimationThreshold() {
-    return getDouble(HoodieCompactionConfig.RECORD_SIZE_ESTIMATION_THRESHOLD_PROP);
+    return getDouble(HoodieCompactionConfig.RECORD_SIZE_ESTIMATION_THRESHOLD);
   }
 
   public int getCopyOnWriteInsertSplitSize() {
@@ -694,11 +726,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean isAutoClean() {
-    return getBoolean(HoodieCompactionConfig.AUTO_CLEAN_PROP);
+    return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
   }
 
   public boolean isAsyncClean() {
-    return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN_PROP);
+    return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
   }
 
   public boolean incrementalCleanerModeEnabled() {
@@ -706,39 +738,39 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public boolean inlineCompactionEnabled() {
-    return getBoolean(HoodieCompactionConfig.INLINE_COMPACT_PROP);
+    return getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
   }
 
   public CompactionTriggerStrategy getInlineCompactTriggerStrategy() {
-    return CompactionTriggerStrategy.valueOf(getString(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY_PROP));
+    return CompactionTriggerStrategy.valueOf(getString(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY));
   }
 
   public int getInlineCompactDeltaCommitMax() {
-    return getInt(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP);
+    return getInt(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS);
   }
 
   public int getInlineCompactDeltaSecondsMax() {
-    return getInt(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS_PROP);
+    return getInt(HoodieCompactionConfig.INLINE_COMPACT_TIME_DELTA_SECONDS);
   }
 
   public CompactionStrategy getCompactionStrategy() {
-    return ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY_PROP));
+    return ReflectionUtils.loadClass(getString(HoodieCompactionConfig.COMPACTION_STRATEGY));
   }
 
   public Long getTargetIOPerCompactionInMB() {
-    return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB_PROP);
+    return getLong(HoodieCompactionConfig.TARGET_IO_PER_COMPACTION_IN_MB);
   }
 
   public Boolean getCompactionLazyBlockReadEnabled() {
-    return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP);
+    return getBoolean(HoodieCompactionConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED);
   }
 
   public Boolean getCompactionReverseLogReadEnabled() {
-    return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP);
+    return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED);
   }
 
   public boolean inlineClusteringEnabled() {
-    return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING_PROP);
+    return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
   }
 
   public boolean isAsyncClusteringEnabled() {
@@ -755,7 +787,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getInlineClusterMaxCommits() {
-    return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP);
+    return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT);
   }
 
   public int getAsyncClusterMaxCommits() {
@@ -763,15 +795,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getPayloadClass() {
-    return getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
+    return getString(HoodieCompactionConfig.PAYLOAD_CLASS);
   }
 
   public int getTargetPartitionsPerDayBasedCompaction() {
-    return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP);
+    return getInt(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
   }
 
   public int getCommitArchivalBatchSize() {
-    return getInt(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP);
+    return getInt(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE);
   }
 
   public Boolean shouldCleanBootstrapBaseFile() {
@@ -779,12 +811,12 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getClusteringUpdatesStrategyClass() {
-    return getString(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
+    return getString(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY);
   }
 
   public HoodieFailedWritesCleaningPolicy getFailedWritesCleanPolicy() {
     return HoodieFailedWritesCleaningPolicy
-        .valueOf(getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP));
+        .valueOf(getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY));
   }
 
   /**
@@ -830,11 +862,11 @@ public class HoodieWriteConfig extends HoodieConfig {
    * index properties.
    */
   public HoodieIndex.IndexType getIndexType() {
-    return HoodieIndex.IndexType.valueOf(getString(HoodieIndexConfig.INDEX_TYPE_PROP));
+    return HoodieIndex.IndexType.valueOf(getString(HoodieIndexConfig.INDEX_TYPE));
   }
 
   public String getIndexClass() {
-    return getString(HoodieIndexConfig.INDEX_CLASS_PROP);
+    return getString(HoodieIndexConfig.INDEX_CLASS);
   }
 
   public int getBloomFilterNumEntries() {
@@ -846,11 +878,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getHbaseZkQuorum() {
-    return getString(HoodieHBaseIndexConfig.HBASE_ZKQUORUM_PROP);
+    return getString(HoodieHBaseIndexConfig.HBASE_ZKQUORUM);
   }
 
   public int getHbaseZkPort() {
-    return getInt(HoodieHBaseIndexConfig.HBASE_ZKPORT_PROP);
+    return getInt(HoodieHBaseIndexConfig.HBASE_ZKPORT);
   }
 
   public String getHBaseZkZnodeParent() {
@@ -858,11 +890,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getHbaseTableName() {
-    return getString(HoodieHBaseIndexConfig.HBASE_TABLENAME_PROP);
+    return getString(HoodieHBaseIndexConfig.HBASE_TABLENAME);
   }
 
   public int getHbaseIndexGetBatchSize() {
-    return getInt(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE_PROP);
+    return getInt(HoodieHBaseIndexConfig.HBASE_GET_BATCH_SIZE);
   }
 
   public Boolean getHBaseIndexRollbackSync() {
@@ -870,11 +902,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getHbaseIndexPutBatchSize() {
-    return getInt(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_PROP);
+    return getInt(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE);
   }
 
   public Boolean getHbaseIndexPutBatchSizeAutoCompute() {
-    return getBoolean(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP);
+    return getBoolean(HoodieHBaseIndexConfig.HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE);
   }
 
   public String getHBaseQPSResourceAllocatorClass() {
@@ -915,15 +947,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   * the jobs would be 0.17 (1/6), 0.33 (2/6) and 0.5 (3/6) respectively.
    */
   public float getHbaseIndexQPSFraction() {
-    return getFloat(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION_PROP);
+    return getFloat(HoodieHBaseIndexConfig.HBASE_QPS_FRACTION);
   }
 
   public float getHBaseIndexMinQPSFraction() {
-    return getFloat(HoodieHBaseIndexConfig.HBASE_MIN_QPS_FRACTION_PROP);
+    return getFloat(HoodieHBaseIndexConfig.HBASE_MIN_QPS_FRACTION);
   }
 
   public float getHBaseIndexMaxQPSFraction() {
-    return getFloat(HoodieHBaseIndexConfig.HBASE_MAX_QPS_FRACTION_PROP);
+    return getFloat(HoodieHBaseIndexConfig.HBASE_MAX_QPS_FRACTION);
   }
 
   /**
@@ -931,7 +963,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   * Hoodie jobs to an HBase Region Server
    */
   public int getHbaseIndexMaxQPSPerRegionServer() {
-    return getInt(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP);
+    return getInt(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER);
   }
 
   public boolean getHbaseIndexUpdatePartitionPath() {
@@ -939,27 +971,27 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getBloomIndexParallelism() {
-    return getInt(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP);
+    return getInt(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM);
   }
 
   public boolean getBloomIndexPruneByRanges() {
-    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_PRUNE_BY_RANGES_PROP);
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_PRUNE_BY_RANGES);
   }
 
   public boolean getBloomIndexUseCaching() {
-    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING_PROP);
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING);
   }
 
   public boolean useBloomIndexTreebasedFilter() {
-    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER_PROP);
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_TREE_BASED_FILTER);
   }
 
   public boolean useBloomIndexBucketizedChecking() {
-    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING_PROP);
+    return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
   public int getBloomIndexKeysPerBucket() {
-    return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET_PROP);
+    return getInt(HoodieIndexConfig.BLOOM_INDEX_KEYS_PER_BUCKET);
   }
 
   public boolean getBloomIndexUpdatePartitionPath() {
@@ -967,15 +999,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public int getSimpleIndexParallelism() {
-    return getInt(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM_PROP);
+    return getInt(HoodieIndexConfig.SIMPLE_INDEX_PARALLELISM);
   }
 
   public boolean getSimpleIndexUseCaching() {
-    return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING_PROP);
+    return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_USE_CACHING);
   }
 
   public int getGlobalSimpleIndexParallelism() {
-    return getInt(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM_PROP);
+    return getInt(HoodieIndexConfig.GLOBAL_SIMPLE_INDEX_PARALLELISM);
   }
 
   public boolean getGlobalSimpleIndexUpdatePartitionPath() {
@@ -1156,15 +1188,15 @@ public class HoodieWriteConfig extends HoodieConfig {
    * memory configs.
    */
   public int getMaxDFSStreamBufferSize() {
-    return getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP);
+    return getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE);
   }
 
   public String getSpillableMapBasePath() {
-    return getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP);
+    return getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH);
   }
 
   public double getWriteStatusFailureFraction() {
-    return getDouble(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP);
+    return getDouble(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION);
   }
 
   public ConsistencyGuardConfig getConsistencyGuardConfig() {
@@ -1211,11 +1243,11 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getCallbackClass() {
-    return getString(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
+    return getString(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS);
   }
 
   public String getBootstrapSourceBasePath() {
-    return getString(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP);
+    return getString(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH);
   }
 
   public String getBootstrapModeSelectorClass() {
@@ -1251,15 +1283,15 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public Long getMaxMemoryPerPartitionMerge() {
-    return getLong(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP);
+    return getLong(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE);
   }
 
   public Long getHoodieClientHeartbeatIntervalInMs() {
-    return getLong(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP);
+    return getLong(CLIENT_HEARTBEAT_INTERVAL_IN_MS);
   }
 
   public Integer getHoodieClientHeartbeatTolerableMisses() {
-    return getInt(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP);
+    return getInt(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES);
   }
 
   /**
@@ -1303,27 +1335,27 @@ public class HoodieWriteConfig extends HoodieConfig {
    */
 
   public String getLockProviderClass() {
-    return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS_PROP);
+    return getString(HoodieLockConfig.LOCK_PROVIDER_CLASS);
   }
 
   public String getLockHiveDatabaseName() {
-    return getString(HoodieLockConfig.HIVE_DATABASE_NAME_PROP);
+    return getString(HoodieLockConfig.HIVE_DATABASE_NAME);
   }
 
   public String getLockHiveTableName() {
-    return getString(HoodieLockConfig.HIVE_TABLE_NAME_PROP);
+    return getString(HoodieLockConfig.HIVE_TABLE_NAME);
   }
 
   public ConflictResolutionStrategy getWriteConflictResolutionStrategy() {
-    return ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_PROP));
+    return ReflectionUtils.loadClass(getString(HoodieLockConfig.WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS));
   }
 
   public Long getLockAcquireWaitTimeoutInMs() {
-    return getLong(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP);
+    return getLong(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS);
   }
 
   public WriteConcurrencyMode getWriteConcurrencyMode() {
-    return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE_PROP));
+    return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
   }
 
   public Boolean inlineTableServices() {
@@ -1331,7 +1363,7 @@ public class HoodieWriteConfig extends HoodieConfig {
   }
 
   public String getWriteMetaKeyPrefixes() {
-    return getString(WRITE_META_KEY_PREFIXES_PROP);
+    return getString(WRITE_META_KEY_PREFIXES);
   }
   
   public String getPreCommitValidators() {
@@ -1404,7 +1436,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withPath(String basePath) {
-      writeConfig.setValue(BASE_PATH_PROP, basePath);
+      writeConfig.setValue(BASE_PATH, basePath);
       return this;
     }
 
@@ -1424,7 +1456,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withPreCombineField(String preCombineField) {
-      writeConfig.setValue(PRECOMBINE_FIELD_PROP, preCombineField);
+      writeConfig.setValue(PRECOMBINE_FIELD, preCombineField);
       return this;
     }
 
@@ -1434,7 +1466,7 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withKeyGenerator(String keyGeneratorClass) {
-      writeConfig.setValue(KEYGENERATOR_CLASS_PROP, keyGeneratorClass);
+      writeConfig.setValue(KEYGENERATOR_CLASS, keyGeneratorClass);
       return this;
     }
 
@@ -1480,13 +1512,13 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder combineInput(boolean onInsert, boolean onUpsert) {
-      writeConfig.setValue(COMBINE_BEFORE_INSERT_PROP, String.valueOf(onInsert));
-      writeConfig.setValue(COMBINE_BEFORE_UPSERT_PROP, String.valueOf(onUpsert));
+      writeConfig.setValue(COMBINE_BEFORE_INSERT, String.valueOf(onInsert));
+      writeConfig.setValue(COMBINE_BEFORE_UPSERT, String.valueOf(onUpsert));
       return this;
     }
 
     public Builder combineDeleteInput(boolean onDelete) {
-      writeConfig.setValue(COMBINE_BEFORE_DELETE_PROP, String.valueOf(onDelete));
+      writeConfig.setValue(COMBINE_BEFORE_DELETE, String.valueOf(onDelete));
       return this;
     }
 
@@ -1562,12 +1594,12 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withAutoCommit(boolean autoCommit) {
-      writeConfig.setValue(HOODIE_AUTO_COMMIT_PROP, String.valueOf(autoCommit));
+      writeConfig.setValue(HOODIE_AUTO_COMMIT, String.valueOf(autoCommit));
       return this;
     }
 
     public Builder withWriteStatusClass(Class<? extends WriteStatus> writeStatusClass) {
-      writeConfig.setValue(HOODIE_WRITE_STATUS_CLASS_PROP, writeStatusClass.getName());
+      writeConfig.setValue(HOODIE_WRITE_STATUS_CLASS, writeStatusClass.getName());
       return this;
     }
 
@@ -1595,17 +1627,17 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withMarkersType(String markerType) {
-      writeConfig.setValue(MARKERS_TYPE_PROP, markerType);
+      writeConfig.setValue(MARKERS_TYPE, markerType);
       return this;
     }
 
     public Builder withMarkersTimelineServerBasedBatchNumThreads(int numThreads) {
-      writeConfig.setValue(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS_PROP, String.valueOf(numThreads));
+      writeConfig.setValue(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS, String.valueOf(numThreads));
       return this;
     }
 
     public Builder withMarkersTimelineServerBasedBatchIntervalMs(long intervalMs) {
-      writeConfig.setValue(MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS_PROP, String.valueOf(intervalMs));
+      writeConfig.setValue(MARKERS_TIMELINE_SERVER_BASED_BATCH_INTERVAL_MS, String.valueOf(intervalMs));
       return this;
     }
 
@@ -1655,22 +1687,22 @@ public class HoodieWriteConfig extends HoodieConfig {
     }
 
     public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
-      writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs));
+      writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS, String.valueOf(heartbeatIntervalInMs));
       return this;
     }
 
     public Builder withHeartbeatTolerableMisses(Integer heartbeatTolerableMisses) {
-      writeConfig.setValue(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES_PROP, String.valueOf(heartbeatTolerableMisses));
+      writeConfig.setValue(CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES, String.valueOf(heartbeatTolerableMisses));
       return this;
     }
 
     public Builder withWriteConcurrencyMode(WriteConcurrencyMode concurrencyMode) {
-      writeConfig.setValue(WRITE_CONCURRENCY_MODE_PROP, concurrencyMode.value());
+      writeConfig.setValue(WRITE_CONCURRENCY_MODE, concurrencyMode.value());
       return this;
     }
 
     public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) {
-      writeConfig.setValue(WRITE_META_KEY_PREFIXES_PROP, writeMetaKeyPrefixes);
+      writeConfig.setValue(WRITE_META_KEY_PREFIXES, writeMetaKeyPrefixes);
       return this;
     }
 
@@ -1729,10 +1761,10 @@ public class HoodieWriteConfig extends HoodieConfig {
       String layoutVersion = writeConfig.getString(TIMELINE_LAYOUT_VERSION);
       // Ensure Layout Version is good
       new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
-      Objects.requireNonNull(writeConfig.getString(BASE_PATH_PROP));
-      if (writeConfig.getString(WRITE_CONCURRENCY_MODE_PROP)
+      Objects.requireNonNull(writeConfig.getString(BASE_PATH));
+      if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
           .equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
-        ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP)
+        ValidationUtils.checkArgument(writeConfig.getString(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY)
             != HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
       }
     }
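
    For reference, a minimal sketch (not part of this commit) of how the renamed
    concurrency constants fit together with the failed-writes cleaner check in
    validate() above. The base path and the import locations are assumed for
    illustration only.

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class ConcurrencyModeSketch {
  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    // validate() rejects OPTIMISTIC_CONCURRENCY_CONTROL unless the failed-writes
    // cleaning policy is LAZY, so set it explicitly via the renamed constant.
    params.put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
        HoodieFailedWritesCleaningPolicy.LAZY.name());

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")   // hypothetical base path
        .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
        .withProps(params)
        .build();

    System.out.println(config.getWriteConcurrencyMode());   // OPTIMISTIC_CONCURRENCY_CONTROL
  }
}
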
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
index 6215df5..60d8500 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java
@@ -25,10 +25,10 @@ import org.apache.hudi.common.util.Option;
 
 import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION_PROP;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE_PROP;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
 
 public class IOUtils {
   /**
@@ -54,18 +54,18 @@ public class IOUtils {
   }
 
   public static long getMaxMemoryPerPartitionMerge(TaskContextSupplier context, HoodieConfig hoodieConfig) {
-    if (hoodieConfig.contains(MAX_MEMORY_FOR_MERGE_PROP)) {
-      return hoodieConfig.getLong(MAX_MEMORY_FOR_MERGE_PROP);
+    if (hoodieConfig.contains(MAX_MEMORY_FOR_MERGE)) {
+      return hoodieConfig.getLong(MAX_MEMORY_FOR_MERGE);
     }
-    String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_MERGE_PROP);
+    String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_MERGE);
     return getMaxMemoryAllowedForMerge(context, fraction);
   }
 
   public static long getMaxMemoryPerCompaction(TaskContextSupplier context, HoodieConfig hoodieConfig) {
-    if (hoodieConfig.contains(MAX_MEMORY_FOR_COMPACTION_PROP)) {
-      return hoodieConfig.getLong(MAX_MEMORY_FOR_COMPACTION_PROP);
+    if (hoodieConfig.contains(MAX_MEMORY_FOR_COMPACTION)) {
+      return hoodieConfig.getLong(MAX_MEMORY_FOR_COMPACTION);
     }
-    String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP);
+    String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION);
     return getMaxMemoryAllowedForMerge(context, fraction);
   }
 }
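
    A small illustration (assumed, not from this patch) of the precedence encoded
    above: when the absolute merge-memory key is present, IOUtils returns it
    directly, otherwise the budget is derived from MAX_MEMORY_FRACTION_FOR_MERGE.
    The base path, the 1 GB figure and the import locations are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MergeMemorySketch {
  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    // Explicit 1 GB cap; with this key set, the fraction-based fallback is not used.
    params.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(1024L * 1024 * 1024));

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")   // hypothetical base path
        .withProps(params)
        .build();

    System.out.println(config.getMaxMemoryPerPartitionMerge());   // 1073741824
  }
}
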
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
index 05a6ada..0046c18 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java
@@ -186,7 +186,7 @@ public class KeyGenUtils {
    */
   public static KeyGenerator createKeyGeneratorByClassName(TypedProperties props) throws IOException {
     KeyGenerator keyGenerator = null;
-    String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), null);
+    String keyGeneratorClass = props.getString(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), null);
     if (!StringUtils.isNullOrEmpty(keyGeneratorClass)) {
       try {
         keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, props);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
index 393d26e..6010ab9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/factory/HoodieAvroKeyGeneratorFactory.java
@@ -41,7 +41,7 @@ import java.util.Objects;
 /**
  * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}.
  * <p>
- * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS_PROP} firstly, this ensures the class prop
+ * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS} first; this ensures the class prop
  * will not be overwritten by {@link KeyGeneratorType}
  */
 public class HoodieAvroKeyGeneratorFactory {
@@ -57,10 +57,10 @@ public class HoodieAvroKeyGeneratorFactory {
   private static KeyGenerator createAvroKeyGeneratorByType(TypedProperties props) throws IOException {
     // Use KeyGeneratorType.SIMPLE as default keyGeneratorType
     String keyGeneratorType =
-        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null);
+        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null);
 
     if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
-      LOG.info("The value of {} is empty, using SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
+      LOG.info("The value of {} is empty, using SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE.key());
       keyGeneratorType = KeyGeneratorType.SIMPLE.name();
     }
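
    To illustrate the lookup order described in the factory javadoc (class first,
    then type, defaulting to SIMPLE), a hedged sketch follows; it is not part of
    this commit, and the field names and import locations are assumed.

import java.io.IOException;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;

public class KeyGenFactorySketch {
  public static void main(String[] args) throws IOException {
    TypedProperties props = new TypedProperties();
    props.put(KeyGeneratorOptions.RECORDKEY_FIELD.key(), "_row_key");        // assumed field names
    props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD.key(), "partition");
    // Only the type is set here; since KEYGENERATOR_CLASS is absent, the factory
    // falls back to the type (and to SIMPLE when neither is present).
    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());

    KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
    System.out.println(keyGenerator.getClass().getName());   // SimpleAvroKeyGenerator
  }
}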
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 89f7a97..ed6b9e6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -44,9 +44,9 @@ public class TestHoodieWriteConfig {
   public void testPropertyLoading(boolean withAlternative) throws IOException {
     Builder builder = HoodieWriteConfig.newBuilder().withPath("/tmp");
     Map<String, String> params = new HashMap<>(3);
-    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP.key(), "1");
-    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP.key(), "5");
-    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP.key(), "2");
+    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "5");
+    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
     if (withAlternative) {
       params.put("hoodie.avro.schema.externalTransformation", "true");
     } else {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
index 52ab09d..7983c66 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestCreateAvroKeyGeneratorByTypeWithFactory.java
@@ -70,7 +70,7 @@ public class TestCreateAvroKeyGeneratorByTypeWithFactory {
   @ParameterizedTest
   @MethodSource("configParams")
   public void testKeyGeneratorTypes(String keyGenType) throws IOException {
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), keyGenType);
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
     KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
 
     KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java
index 2ae2e3b..11771c3 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/keygen/factory/TestHoodieAvroKeyGeneratorFactory.java
@@ -38,30 +38,30 @@ public class TestHoodieAvroKeyGeneratorFactory {
     TypedProperties props = getCommonProps();
 
     // set KeyGenerator type only
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.SIMPLE.name());
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());
     KeyGenerator keyGenerator = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
     Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator.getClass().getName());
 
     // set KeyGenerator class only
     props = getCommonProps();
-    props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), SimpleAvroKeyGenerator.class.getName());
+    props.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), SimpleAvroKeyGenerator.class.getName());
     KeyGenerator keyGenerator2 = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
     Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator2.getClass().getName());
 
     // set both class name and keyGenerator type
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.CUSTOM.name());
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
     KeyGenerator keyGenerator3 = HoodieAvroKeyGeneratorFactory.createKeyGenerator(props);
     // KEYGENERATOR_TYPE_PROP was overwritten by KEYGENERATOR_CLASS_PROP
     Assertions.assertEquals(SimpleAvroKeyGenerator.class.getName(), keyGenerator3.getClass().getName());
 
     // set wrong class name
     final TypedProperties props2 = getCommonProps();
-    props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), TestHoodieAvroKeyGeneratorFactory.class.getName());
+    props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), TestHoodieAvroKeyGeneratorFactory.class.getName());
     assertThrows(IOException.class, () -> HoodieAvroKeyGeneratorFactory.createKeyGenerator(props2));
 
     // set wrong keyGenerator type
     final TypedProperties props3 = getCommonProps();
-    props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), "wrong_type");
+    props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), "wrong_type");
     assertThrows(HoodieKeyGeneratorException.class, () -> HoodieAvroKeyGeneratorFactory.createKeyGenerator(props3));
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 1c1cf5e..4dd5cd0 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -65,7 +65,7 @@ public class SparkSortAndSizeExecutionStrategy<T extends HoodieRecordPayload<T>>
     Properties props = getWriteConfig().getProps();
     props.put(HoodieWriteConfig.BULKINSERT_PARALLELISM.key(), String.valueOf(numOutputGroups));
     // We are calling another action executor - disable auto commit. Strategy is only expected to write data in new files.
-    props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key(), Boolean.FALSE.toString());
+    props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT.key(), Boolean.FALSE.toString());
     props.put(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES.key(), String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
     HoodieWriteConfig newConfig =  HoodieWriteConfig.newBuilder().withProps(props).build();
     return (JavaRDD<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
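
    The same auto-commit toggle set via raw Properties above can also be set
    through the builder; a trivial sketch (not from this commit), with an assumed
    base path:

import org.apache.hudi.config.HoodieWriteConfig;

public class AutoCommitSketch {
  public static void main(String[] args) {
    // Equivalent to props.put(HoodieWriteConfig.HOODIE_AUTO_COMMIT.key(), "false"),
    // expressed through the builder API.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie_table")   // hypothetical base path
        .withAutoCommit(false)
        .build();

    System.out.println(config.shouldAutoCommit());   // false
  }
}
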
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 579e2b3..b3b9eb4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -105,7 +105,7 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
 
   /**
    * multiPutBatchSize will be computed and re-set in updateLocation if
-   * {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true.
+   * {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE} is set to true.
    */
   private Integer multiPutBatchSize;
   private Integer numRegionServersForTable;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
index a31ee9b..0fd6011 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/factory/HoodieSparkKeyGeneratorFactory.java
@@ -43,7 +43,7 @@ import java.util.Objects;
 /**
  * Factory help to create {@link org.apache.hudi.keygen.KeyGenerator}.
  * <p>
- * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS_PROP} firstly, this ensures the class prop
+ * This factory will try {@link HoodieWriteConfig#KEYGENERATOR_CLASS} first; this ensures the class prop
  * will not be overwritten by {@link KeyGeneratorType}
  */
 public class HoodieSparkKeyGeneratorFactory {
@@ -60,10 +60,10 @@ public class HoodieSparkKeyGeneratorFactory {
   private static BuiltinKeyGenerator createKeyGeneratorByType(TypedProperties props) throws IOException {
     // Use KeyGeneratorType.SIMPLE as default keyGeneratorType
     String keyGeneratorType =
-        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), null);
+        props.getString(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), null);
 
     if (StringUtils.isNullOrEmpty(keyGeneratorType)) {
-      LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key());
+      LOG.info("The value of {} is empty, use SIMPLE", HoodieWriteConfig.KEYGENERATOR_TYPE.key());
       keyGeneratorType = KeyGeneratorType.SIMPLE.name();
     }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 056f98c..a1f8180 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -833,7 +833,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
       .setTimelineLayoutVersion(VERSION_0)
       .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
     // Set rollback to LAZY so no inflights are deleted
-    hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP.key(),
+    hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY.key(),
         HoodieFailedWritesCleaningPolicy.LAZY.name());
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index d5d66e7..5d96e1b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -470,9 +470,9 @@ public class TestHBaseIndex extends FunctionalTestHarness {
   public void testHbaseTagLocationForArchivedCommits() throws Exception {
     // Load to memory
     Map<String, String> params = new HashMap<String, String>();
-    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP.key(), "1");
-    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP_PROP.key(), "3");
-    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP_PROP.key(), "2");
+    params.put(HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), "1");
+    params.put(HoodieCompactionConfig.MAX_COMMITS_TO_KEEP.key(), "3");
+    params.put(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), "2");
     HoodieWriteConfig config = getConfigBuilder(100, false, false).withProps(params).build();
 
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
index f6fb4e4..7490a4d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestSparkIOUtils.java
@@ -25,8 +25,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP;
-import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE_PROP;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_COMPACTION;
+import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FRACTION_FOR_MERGE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestSparkIOUtils {
@@ -55,10 +55,10 @@ public class TestSparkIOUtils {
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(path).build();
 
-    String compactionFraction = config.getProps().getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP.key(), MAX_MEMORY_FRACTION_FOR_COMPACTION_PROP.defaultValue());
+    String compactionFraction = config.getProps().getProperty(MAX_MEMORY_FRACTION_FOR_COMPACTION.key(), MAX_MEMORY_FRACTION_FOR_COMPACTION.defaultValue());
     long compactionMaxSize = IOUtils.getMaxMemoryAllowedForMerge(contextSupplier, compactionFraction);
 
-    String mergeFraction = config.getProps().getProperty(MAX_MEMORY_FRACTION_FOR_MERGE_PROP.key(), MAX_MEMORY_FRACTION_FOR_MERGE_PROP.defaultValue());
+    String mergeFraction = config.getProps().getProperty(MAX_MEMORY_FRACTION_FOR_MERGE.key(), MAX_MEMORY_FRACTION_FOR_MERGE.defaultValue());
     long mergeMaxSize = IOUtils.getMaxMemoryAllowedForMerge(contextSupplier, mergeFraction);
 
     assertEquals(mergeMaxSize, IOUtils.getMaxMemoryPerPartitionMerge(contextSupplier, config));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
index 1d23429..d479923 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java
@@ -53,9 +53,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
       properties.put(KeyGeneratorOptions.RECORDKEY_FIELD.key(), "_row_key");
     }
     if (useKeyGeneratorClassName) {
-      properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), CustomKeyGenerator.class.getName());
+      properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), CustomKeyGenerator.class.getName());
     } else {
-      properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.CUSTOM.name());
+      properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
     }
     properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING.key(), "true");
     return properties;
@@ -96,9 +96,9 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities {
     TypedProperties properties = new TypedProperties();
     properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD.key(), "timestamp:simple");
     if (useKeyGeneratorClassName) {
-      properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), CustomKeyGenerator.class.getName());
+      properties.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), CustomKeyGenerator.class.getName());
     } else {
-      properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.CUSTOM.name());
+      properties.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
     }
     return properties;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
index 0f54fd6..10bc506 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestCreateKeyGeneratorByTypeWithFactory.java
@@ -71,7 +71,7 @@ public class TestCreateKeyGeneratorByTypeWithFactory {
   @ParameterizedTest
   @MethodSource("configParams")
   public void testKeyGeneratorTypes(String keyGenType) throws IOException {
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), keyGenType);
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), keyGenType);
     KeyGeneratorType keyType = KeyGeneratorType.valueOf(keyGenType);
 
     KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
index 5b89c7e..8aaba76 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/factory/TestHoodieSparkKeyGeneratorFactory.java
@@ -45,30 +45,30 @@ public class TestHoodieSparkKeyGeneratorFactory {
     TypedProperties props = getCommonProps();
 
     // set KeyGenerator type only
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.SIMPLE.name());
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.SIMPLE.name());
     KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator.getClass().getName());
 
     // set KeyGenerator class only
     props = getCommonProps();
-    props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), SimpleKeyGenerator.class.getName());
+    props.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), SimpleKeyGenerator.class.getName());
     KeyGenerator keyGenerator2 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator2.getClass().getName());
 
     // set both class name and keyGenerator type
-    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), KeyGeneratorType.CUSTOM.name());
+    props.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), KeyGeneratorType.CUSTOM.name());
     KeyGenerator keyGenerator3 = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
     // KEYGENERATOR_TYPE_PROP was overwritten by KEYGENERATOR_CLASS_PROP
     Assertions.assertEquals(SimpleKeyGenerator.class.getName(), keyGenerator3.getClass().getName());
 
     // set wrong class name
     final TypedProperties props2 = getCommonProps();
-    props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key(), TestHoodieSparkKeyGeneratorFactory.class.getName());
+    props2.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key(), TestHoodieSparkKeyGeneratorFactory.class.getName());
     assertThrows(IOException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props2));
 
     // set wrong keyGenerator type
     final TypedProperties props3 = getCommonProps();
-    props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key(), "wrong_type");
+    props3.put(HoodieWriteConfig.KEYGENERATOR_TYPE.key(), "wrong_type");
     assertThrows(HoodieKeyGeneratorException.class, () -> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props3));
   }
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 21fdb84..5a2751a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -309,13 +309,13 @@ public class FlinkOptions extends HoodieConfig {
           + "By default false (the names of partition folders are only partition values)");
 
   public static final ConfigOption<String> KEYGEN_CLASS = ConfigOptions
-      .key(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key())
+      .key(HoodieWriteConfig.KEYGENERATOR_CLASS.key())
       .stringType()
       .defaultValue("")
       .withDescription("Key generator class, that implements will extract the key out of incoming record");
 
   public static final ConfigOption<String> KEYGEN_TYPE = ConfigOptions
-      .key(HoodieWriteConfig.KEYGENERATOR_TYPE_PROP.key())
+      .key(HoodieWriteConfig.KEYGENERATOR_TYPE.key())
       .stringType()
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDescription("Key generator type, that implements will extract the key out of incoming record");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index bcdc5aa..603fda4 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -281,8 +281,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
               HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
           .withReadBlocksLazily(true)
           .withReverseReader(false)
-          .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
-          .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
+          .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+          .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
           .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
           .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
           .build();
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 6e4b377..de7d375 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -52,6 +52,8 @@ object DataSourceReadOptions {
     .withDocumentation("Whether data needs to be read, in incremental mode (new data since an instantTime) " +
       "(or) Read Optimized mode (obtain latest view, based on base files) (or) Snapshot mode " +
       "(obtain latest view, by merging base and (if any) log files)")
+  @Deprecated
+  val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
 
   val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
   val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
@@ -61,13 +63,17 @@ object DataSourceReadOptions {
     .withDocumentation("For Snapshot query on merge on read table, control whether we invoke the record " +
       s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL}) or skip merging altogether" +
       s"${REALTIME_SKIP_MERGE_OPT_VAL}")
+  @Deprecated
+  val REALTIME_MERGE_OPT_KEY = REALTIME_MERGE.key()
 
   val READ_PATHS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.paths")
     .noDefaultValue()
     .withDocumentation("Comma separated list of file paths to read within a Hudi table.")
+  @Deprecated
+  val READ_PATHS_OPT_KEY = READ_PATHS.key()
 
-  val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
+  val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD
 
   val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
     .key("hoodie.file.index.enable")
@@ -92,17 +98,23 @@ object DataSourceReadOptions {
     .withDocumentation("Instant time to start incrementally pulling data from. The instanttime here need not necessarily " +
       "correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " +
       "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM.")
+  @Deprecated
+  val BEGIN_INSTANTTIME_OPT_KEY = BEGIN_INSTANTTIME.key()
 
   val END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.end.instanttime")
     .noDefaultValue()
     .withDocumentation("Instant time to limit incrementally fetched data to. " +
       "New data written with an instant_time <= END_INSTANTTIME are fetched out.")
+  @Deprecated
+  val END_INSTANTTIME_OPT_KEY = END_INSTANTTIME.key()
 
   val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.schema.use.end.instanttime")
     .defaultValue("false")
     .withDocumentation("Uses end instant schema when incrementally fetched data to. Default: users latest instant schema.")
+  @Deprecated
+  val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY = INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key()
 
   val PUSH_DOWN_INCR_FILTERS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.incr.filters")
@@ -110,12 +122,16 @@ object DataSourceReadOptions {
     .withDocumentation("For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies "
       + "opaque map functions, filters appearing late in the sequence of transformations cannot be automatically "
       + "pushed down. This option allows setting filters directly on Hoodie Source.")
+  @Deprecated
+  val PUSH_DOWN_INCR_FILTERS_OPT_KEY = PUSH_DOWN_INCR_FILTERS.key()
 
   val INCR_PATH_GLOB: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.incr.path.glob")
     .defaultValue("")
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
       + "instead of the full table. This option allows using glob pattern to directly filter on path.")
+  @Deprecated
+  val INCR_PATH_GLOB_OPT_KEY = INCR_PATH_GLOB.key()
 
   val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = ConfigProperty
     .key("as.of.instant")
@@ -144,6 +160,8 @@ object DataSourceWriteOptions {
     .withDocumentation("Whether to do upsert, insert or bulkinsert for the write operation. " +
       "Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
       "bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
+  @Deprecated
+  val OPERATION_OPT_KEY = OPERATION.key()
 
   val COW_TABLE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name
   val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name
@@ -152,6 +170,8 @@ object DataSourceWriteOptions {
     .defaultValue(COW_TABLE_TYPE_OPT_VAL)
     .withAlternatives("hoodie.datasource.write.storage.type")
     .withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
+  @Deprecated
+  val TABLE_TYPE_OPT_KEY = TABLE_TYPE.key()
 
   @Deprecated
   val STORAGE_TYPE_OPT = "hoodie.datasource.write.storage.type"
@@ -203,19 +223,25 @@ object DataSourceWriteOptions {
     .key("hoodie.datasource.write.table.name")
     .noDefaultValue()
     .withDocumentation("Table name for the datasource write. Also used to register the table into meta stores.")
+  @Deprecated
+  val TABLE_NAME_OPT_KEY = TABLE_NAME.key()
 
   /**
     * Field used in preCombining before actual write. When two records have the same
     * key value, we will pick the one with the largest value for the precombine field,
     * determined by Object.compareTo(..)
     */
-  val PRECOMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
+  val PRECOMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD
+  @Deprecated
+  val PRECOMBINE_FIELD_OPT_KEY = HoodieWriteConfig.PRECOMBINE_FIELD.key()
 
   /**
     * Payload class used. Override this, if you like to roll your own merge logic, when upserting/inserting.
    * This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` ineffective
     */
   val PAYLOAD_CLASS = HoodieWriteConfig.WRITE_PAYLOAD_CLASS
+  @Deprecated
+  val PAYLOAD_CLASS_OPT_KEY = HoodieWriteConfig.WRITE_PAYLOAD_CLASS.key()
 
   /**
     * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
@@ -224,12 +250,16 @@ object DataSourceWriteOptions {
     *
     */
   val RECORDKEY_FIELD = KeyGeneratorOptions.RECORDKEY_FIELD
+  @Deprecated
+  val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD.key()
 
   /**
     * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual
     * value obtained by invoking .toString()
     */
   val PARTITIONPATH_FIELD = KeyGeneratorOptions.PARTITIONPATH_FIELD
+  @Deprecated
+  val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD.key()
 
   /**
     * Flag to indicate whether to use Hive style partitioning.
@@ -238,13 +268,19 @@ object DataSourceWriteOptions {
     */
   val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING
   val URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING
+  @Deprecated
+  val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING.key()
+  @Deprecated
+  val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING.key()
 
   /**
     * Key generator class, that implements will extract the key out of incoming record
     *
     */
-  val KEYGENERATOR_CLASS = HoodieWriteConfig.KEYGENERATOR_CLASS_PROP
+  val KEYGENERATOR_CLASS = HoodieWriteConfig.KEYGENERATOR_CLASS
   val DEFAULT_KEYGENERATOR_CLASS_OPT_VAL = classOf[SimpleKeyGenerator].getName
+  @Deprecated
+  val KEYGENERATOR_CLASS_OPT_KEY = HoodieWriteConfig.KEYGENERATOR_CLASS.key()
 
   /**
    *
@@ -255,6 +291,8 @@ object DataSourceWriteOptions {
     .defaultValue("false")
     .withDocumentation("When set to true, will perform write operations directly using the spark native " +
       "`Row` representation, avoiding any additional conversion costs.")
+  @Deprecated
+  val ENABLE_ROW_WRITER_OPT_KEY = ENABLE_ROW_WRITER.key()
 
   /**
    * Enable the bulk insert for sql insert statement.
@@ -277,21 +315,29 @@ object DataSourceWriteOptions {
     .defaultValue("_")
     .withDocumentation("Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. " +
       "This is useful to store checkpointing information, in a consistent way with the hudi timeline")
+  @Deprecated
+  val COMMIT_METADATA_KEYPREFIX_OPT_KEY = COMMIT_METADATA_KEYPREFIX.key()
 
   val INSERT_DROP_DUPS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.insert.drop.duplicates")
     .defaultValue("false")
     .withDocumentation("If set to true, filters out all duplicate records from incoming dataframe, during insert operations.")
+  @Deprecated
+  val INSERT_DROP_DUPS_OPT_KEY = INSERT_DROP_DUPS.key()
 
   val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.count")
     .defaultValue("3")
     .withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.")
+  @Deprecated
+  val STREAMING_RETRY_CNT_OPT_KEY = STREAMING_RETRY_CNT.key()
 
   val STREAMING_RETRY_INTERVAL_MS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.streaming.retry.interval.ms")
     .defaultValue("2000")
     .withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")
+  @Deprecated
+  val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = STREAMING_RETRY_INTERVAL_MS.key()
 
   /**
    *
@@ -302,6 +348,8 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)"
       + " within a streaming microbatch")
+  @Deprecated
+  val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = STREAMING_IGNORE_FAILED_BATCH.key()
 
   val META_SYNC_CLIENT_TOOL_CLASS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.meta.sync.client.tool.class")
@@ -322,61 +370,85 @@ object DataSourceWriteOptions {
     .key("hoodie.datasource.hive_sync.enable")
     .defaultValue("false")
     .withDocumentation("When set to true, register/sync the table to Apache Hive metastore")
+  @Deprecated
+  val HIVE_SYNC_ENABLED_OPT_KEY = HIVE_SYNC_ENABLED.key()
 
   val META_SYNC_ENABLED: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.meta.sync.enable")
     .defaultValue("false")
     .withDocumentation("")
+  @Deprecated
+  val META_SYNC_ENABLED_OPT_KEY = META_SYNC_ENABLED.key()
 
   val HIVE_DATABASE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.database")
     .defaultValue("default")
     .withDocumentation("database to sync to")
+  @Deprecated
+  val HIVE_DATABASE_OPT_KEY = HIVE_DATABASE.key()
 
   val HIVE_TABLE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.table")
     .defaultValue("unknown")
     .withDocumentation("table to sync to")
+  @Deprecated
+  val HIVE_TABLE_OPT_KEY = HIVE_TABLE.key()
 
   val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.base_file_format")
     .defaultValue("PARQUET")
     .withDocumentation("Base file format for the sync.")
+  @Deprecated
+  val HIVE_BASE_FILE_FORMAT_OPT_KEY = HIVE_BASE_FILE_FORMAT.key()
 
   val HIVE_USER: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.username")
     .defaultValue("hive")
     .withDocumentation("hive user name to use")
+  @Deprecated
+  val HIVE_USER_OPT_KEY = HIVE_USER.key()
 
   val HIVE_PASS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.password")
     .defaultValue("hive")
     .withDocumentation("hive password to use")
+  @Deprecated
+  val HIVE_PASS_OPT_KEY = HIVE_PASS.key()
 
   val HIVE_URL: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.jdbcurl")
     .defaultValue("jdbc:hive2://localhost:10000")
     .withDocumentation("Hive metastore url")
+  @Deprecated
+  val HIVE_URL_OPT_KEY = HIVE_URL.key()
 
   val HIVE_PARTITION_FIELDS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.partition_fields")
     .defaultValue("")
     .withDocumentation("field in the table to use for determining hive partition columns.")
+  @Deprecated
+  val HIVE_PARTITION_FIELDS_OPT_KEY = HIVE_PARTITION_FIELDS.key()
 
   val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.partition_extractor_class")
     .defaultValue(classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName)
     .withDocumentation("")
+  @Deprecated
+  val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = HIVE_PARTITION_EXTRACTOR_CLASS.key()
 
   val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.assume_date_partitioning")
     .defaultValue("false")
     .withDocumentation("Assume partitioning is yyyy/mm/dd")
+  @Deprecated
+  val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = HIVE_ASSUME_DATE_PARTITION.key()
 
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.use_pre_apache_input_format")
     .defaultValue("false")
     .withDocumentation("")
+  @Deprecated
+  val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = HIVE_USE_PRE_APACHE_INPUT_FORMAT.key()
 
   // We should use HIVE_SYNC_MODE instead of this config from 0.9.0
   @Deprecated
@@ -385,16 +457,22 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .deprecatedAfter("0.9.0")
     .withDocumentation("Use JDBC when hive synchronization is enabled")
+  @Deprecated
+  val HIVE_USE_JDBC_OPT_KEY = HIVE_USE_JDBC.key()
 
   val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.auto_create_database")
     .defaultValue("true")
     .withDocumentation("Auto create hive database if does not exists")
+  @Deprecated
+  val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HIVE_AUTO_CREATE_DATABASE.key()
 
   val HIVE_IGNORE_EXCEPTIONS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.ignore_exceptions")
     .defaultValue("false")
     .withDocumentation("")
+  @Deprecated
+  val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = HIVE_IGNORE_EXCEPTIONS.key()
 
   val HIVE_SKIP_RO_SUFFIX: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.skip_ro_suffix")
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 9b60d9d..e0b89f1 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils}
-import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP}
+import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH, BOOTSTRAP_INDEX_CLASS}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
@@ -170,7 +170,7 @@ object HoodieSparkSqlWriter {
             // Create a HoodieWriteClient & issue the delete.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
             null, path.get, tblName,
-            mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
+            mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)))
             .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -196,7 +196,7 @@ object HoodieSparkSqlWriter {
             // Create a HoodieWriteClient & issue the delete.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               null, path.get, tblName,
-              mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)))
+              mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)))
               .asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             // Issue delete partitions
             client.startCommitWithTime(instantTime, commitActionType)
@@ -221,8 +221,8 @@ object HoodieSparkSqlWriter {
               org.apache.hudi.common.util.Option.of(schema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
-              parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),
-                HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.defaultValue()).toBoolean
+              parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
+                HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
             val hoodieAllIncomingRecords = genericRecords.map(gr => {
               val hoodieRecord = if (shouldCombine) {
                 val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false)
@@ -238,7 +238,7 @@ object HoodieSparkSqlWriter {
 
             // Create a HoodieWriteClient & issue the write.
             val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-              tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key)
+              tblName, mapAsJavaMap(parameters - HoodieWriteConfig.HOODIE_AUTO_COMMIT.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
             if (isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
@@ -312,10 +312,10 @@ object HoodieSparkSqlWriter {
     val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(parameters)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TABLE_NAME, s"'${HoodieWriteConfig.TABLE_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
-    val bootstrapBasePath = hoodieConfig.getStringOrThrow(BOOTSTRAP_BASE_PATH_PROP,
-      s"'${BOOTSTRAP_BASE_PATH_PROP.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
+    val bootstrapBasePath = hoodieConfig.getStringOrThrow(BOOTSTRAP_BASE_PATH,
+      s"'${BOOTSTRAP_BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" +
         " operation'")
-    val bootstrapIndexClass = hoodieConfig.getStringOrDefault(BOOTSTRAP_INDEX_CLASS_PROP)
+    val bootstrapIndexClass = hoodieConfig.getStringOrDefault(BOOTSTRAP_INDEX_CLASS)
 
     var schema: String = null
     if (df.schema.nonEmpty) {
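
    For reference, the renamed write-config properties are read the same way the old *_PROP
    fields were. A minimal sketch, assuming a `parameters: Map[String, String]` of writer
    options is in scope, mirroring the COMBINE_BEFORE_INSERT lookup in the hunk above:

        // Sketch only: resolve the renamed flag, falling back to its built-in default.
        val combineBeforeInsert: Boolean = parameters.getOrElse(
          HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
          HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
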
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 02e7427..ec3e411 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -270,7 +270,7 @@ object InsertIntoHoodieTableCommand extends Logging {
         PARTITIONPATH_FIELD.key -> partitionFields,
         PAYLOAD_CLASS.key -> payloadClassName,
         ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key -> isPrimaryKeyTable.toString,
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> isPrimaryKeyTable.toString,
         META_SYNC_ENABLED.key -> enableHive.toString,
         HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
         HIVE_USE_JDBC.key -> "false",
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 70b1a60..febd6f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -230,7 +230,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
     // insert actions.
     var writeParams = parameters +
       (OPERATION.key -> UPSERT_OPERATION_OPT_VAL) +
-      (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString) +
+      (HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString) +
       (DataSourceWriteOptions.TABLE_TYPE.key -> targetTableType)
 
     // Map of Condition -> Assignments
@@ -277,7 +277,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
 
     var writeParams = parameters +
       (OPERATION.key -> INSERT_OPERATION_OPT_VAL) +
-      (HoodieWriteConfig.WRITE_SCHEMA_PROP.key -> getTableSchema.toString)
+      (HoodieWriteConfig.WRITE_SCHEMA.key -> getTableSchema.toString)
 
     writeParams += (PAYLOAD_INSERT_CONDITION_AND_ASSIGNMENTS ->
       serializedInsertConditionAndExpressions(insertActions))
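
    The MERGE INTO path now carries the table schema under HoodieWriteConfig.WRITE_SCHEMA.
    A minimal sketch of the producer side, assuming an Avro `schema` object is in scope;
    ExpressionPayload.initWriteSchemaIfNeed (further below) reads the same key back out of
    the payload properties.

        import java.util.Properties
        import org.apache.hudi.config.HoodieWriteConfig

        // Sketch only: supply the write schema under the renamed key so the payload can parse it later.
        val props = new Properties()
        props.setProperty(HoodieWriteConfig.WRITE_SCHEMA.key, schema.toString)
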
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 2ae0821..bd2d69f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -48,7 +48,7 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
       val keyGenProps = new TypedProperties()
       keyGenProps.putAll(props)
       keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_PROP.key, beforeKeyGenClassName)
+      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key, beforeKeyGenClassName)
       Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
     } else {
       None
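
    SqlKeyGenerator now stores the delegate class under the renamed KEYGENERATOR_CLASS key.
    A stand-alone sketch of building a key generator the same way; SimpleKeyGenerator and the
    "uuid"/"partition" columns are assumptions for illustration, and the record-key and
    partition-path fields are set because that generator needs them.

        import org.apache.hudi.DataSourceWriteOptions
        import org.apache.hudi.common.config.TypedProperties
        import org.apache.hudi.config.HoodieWriteConfig
        import org.apache.hudi.keygen.{KeyGenUtils, SimpleKeyGenerator}

        // Sketch only: renamed key-generator class config plus the field configs it reads.
        val keyGenProps = new TypedProperties()
        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS.key, classOf[SimpleKeyGenerator].getName)
        keyGenProps.put(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
        keyGenProps.put(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partition")
        val keyGen = KeyGenUtils.createKeyGeneratorByClassName(keyGenProps)
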
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index caa3af4..ea55127 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -214,9 +214,9 @@ class ExpressionPayload(record: GenericRecord,
    */
   private def initWriteSchemaIfNeed(properties: Properties): Unit = {
     if (writeSchema == null) {
-      ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA_PROP.key),
-        s"Missing ${HoodieWriteConfig.WRITE_SCHEMA_PROP.key}")
-      writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA_PROP.key))
+      ValidationUtils.checkArgument(properties.containsKey(HoodieWriteConfig.WRITE_SCHEMA.key),
+        s"Missing ${HoodieWriteConfig.WRITE_SCHEMA.key}")
+      writeSchema = new Schema.Parser().parse(properties.getProperty(HoodieWriteConfig.WRITE_SCHEMA.key))
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 06e745c..c254e23 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -182,7 +182,7 @@ public class HoodieJavaApp {
         .option(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
             nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
-        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
@@ -210,7 +210,7 @@ public class HoodieJavaApp {
         .option(DataSourceWriteOptions.KEYGENERATOR_CLASS().key(),
             nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
-        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "false")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).mode(SaveMode.Append);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index dfaa887..ea71e55 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -360,7 +360,7 @@ public class HoodieJavaStreamingApp {
         .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
-        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key(), "1")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE().key(), "true")
         .option(DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE().key(), "true")
         .option(HoodieWriteConfig.TABLE_NAME.key(), tableName).option("checkpointLocation", checkpointLocation)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
index 84b9c98..e767f8a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/HoodieSparkSqlWriterSuite.scala
@@ -428,7 +428,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
             .save(srcPath.toAbsolutePath.toString)
 
           val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
-            HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key -> srcPath.toAbsolutePath.toString,
+            HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key -> srcPath.toAbsolutePath.toString,
             HoodieWriteConfig.TABLE_NAME.key -> hoodieFooTableName,
             DataSourceWriteOptions.TABLE_TYPE.key -> tableType,
             HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key -> "4",
@@ -652,7 +652,7 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
         df.write.mode(SaveMode.Overwrite).save(baseBootStrapPath)
         spark.emptyDataFrame.write.format("hudi")
           .options(options)
-          .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, baseBootStrapPath)
+          .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, baseBootStrapPath)
           .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[NonpartitionedKeyGenerator].getCanonicalName)
           .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
           .option(HoodieBootstrapConfig.BOOTSTRAP_PARALLELISM.key, "4")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 9c182b4..04c6a10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -510,7 +510,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
     inputDF1.write.format("org.apache.hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP.key, "true")
+      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT.key, "true")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 5da87b1..8c0a337 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -297,8 +297,8 @@ class TestDataSourceForBootstrap {
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
-      .option(HoodieCompactionConfig.INLINE_COMPACT_PROP.key, "true")
-      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP.key, "1")
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
+      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -429,7 +429,7 @@ class TestDataSourceForBootstrap {
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, srcPath)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, srcPath)
       .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[SimpleKeyGenerator].getName)
       .option(HoodieBootstrapConfig.BOOTSTRAP_MODE_SELECTOR.key, classOf[FullRecordBootstrapModeSelector].getName)
       .option(HoodieBootstrapConfig.FULL_BOOTSTRAP_INPUT_PROVIDER.key, classOf[SparkParquetBootstrapDataProvider].getName)
@@ -480,7 +480,7 @@ class TestDataSourceForBootstrap {
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, partitionColumns.getOrElse(""))
-      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP.key, srcPath)
+      .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH.key, srcPath)
       .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS.key, classOf[SimpleKeyGenerator].getName)
       .mode(SaveMode.Overwrite)
       .save(basePath)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 82f6cf9..b7daecd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -753,7 +753,7 @@ class TestMORDataSource extends HoodieClientTestBase {
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
       // Use InMemoryIndex to generate log only mor table.
-      .option(HoodieIndexConfig.INDEX_TYPE_PROP.key, IndexType.INMEMORY.toString)
+      .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
       .mode(SaveMode.Overwrite)
       .save(basePath)
     // There should no base file in the file list.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index a8bd9bc..9286ee0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -193,8 +193,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
 
   def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String, isAsyncCompaction: String,
                         clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
-    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key -> isInlineClustering,
-      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
+    commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
+      HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT.key -> clusteringNumCommit,
       DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
       DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
       HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP.key -> clusteringNumCommit,
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5272d20..f4597fd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -102,11 +102,11 @@ import scala.collection.JavaConversions;
 
 import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP;
 import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
-import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING_PROP;
-import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_PROP;
-import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP;
-import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT_PROP;
-import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP;
+import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
+import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
+import static org.apache.hudi.config.HoodieWriteConfig.HOODIE_AUTO_COMMIT;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -706,17 +706,17 @@ public class DeltaSync implements Serializable {
 
     // Validate what deltastreamer assumes of write-config to be really safe
     ValidationUtils.checkArgument(config.inlineCompactionEnabled() == cfg.isInlineCompactionEnabled(),
-        String.format("%s should be set to %s", INLINE_COMPACT_PROP.key(), cfg.isInlineCompactionEnabled()));
+        String.format("%s should be set to %s", INLINE_COMPACT.key(), cfg.isInlineCompactionEnabled()));
     ValidationUtils.checkArgument(config.inlineClusteringEnabled() == cfg.isInlineClusteringEnabled(),
-        String.format("%s should be set to %s", INLINE_CLUSTERING_PROP.key(), cfg.isInlineClusteringEnabled()));
+        String.format("%s should be set to %s", INLINE_CLUSTERING.key(), cfg.isInlineClusteringEnabled()));
     ValidationUtils.checkArgument(config.isAsyncClusteringEnabled() == cfg.isAsyncClusteringEnabled(),
         String.format("%s should be set to %s", ASYNC_CLUSTERING_ENABLE.key(), cfg.isAsyncClusteringEnabled()));
     ValidationUtils.checkArgument(!config.shouldAutoCommit(),
-        String.format("%s should be set to %s", HOODIE_AUTO_COMMIT_PROP.key(), autoCommit));
+        String.format("%s should be set to %s", HOODIE_AUTO_COMMIT.key(), autoCommit));
     ValidationUtils.checkArgument(config.shouldCombineBeforeInsert() == cfg.filterDupes,
-        String.format("%s should be set to %s", COMBINE_BEFORE_INSERT_PROP.key(), cfg.filterDupes));
+        String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes));
     ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
-        String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT_PROP.key(), combineBeforeUpsert));
+        String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
 
     return config;
   }
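
    DeltaSync's validation above references the renamed statics directly. A minimal sketch of
    the producer side, assuming a HoodieDeltaStreamer.Config instance `cfg` is in scope (as in
    the tests further down), which passes the renamed keys through cfg.configs exactly as the
    deprecated *_PROP ones were passed:

        import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, HoodieWriteConfig}

        // Sketch only: values shown are the ones DeltaSync's checks expect for a deltastreamer run.
        cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.INLINE_COMPACT.key()))
        cfg.configs.add(String.format("%s=false", HoodieClusteringConfig.INLINE_CLUSTERING.key()))
        cfg.configs.add(String.format("%s=false", HoodieWriteConfig.HOODIE_AUTO_COMMIT.key()))
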
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 58225b3..ec65d68 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -375,7 +375,7 @@ public class HoodieDeltaStreamer implements Serializable {
 
     public boolean isInlineClusteringEnabled() {
       return Boolean.parseBoolean(String.valueOf(UtilHelpers.getConfig(this.configs).getConfig()
-          .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), false)));
+          .getOrDefault(HoodieClusteringConfig.INLINE_CLUSTERING.key(), false)));
     }
 
     @Override
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 4ec1f99..7848679 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -711,7 +711,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     cfg.continuousMode = true;
     cfg.tableType = tableType.name();
     cfg.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
@@ -742,7 +742,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.tableType = tableType.name();
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
 
     // Prepare base dataset with some commits
@@ -769,7 +769,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
 
     // re-init ingestion job to start sync service
@@ -795,14 +795,14 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
         .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
 
     cfgIngestionJob = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
         Arrays.asList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_MULTI_WRITER, false);
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.tableType = tableType.name();
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     // re-init ingestion job
     HoodieDeltaStreamer ingestionJob3 = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
     // re-init backfill job
@@ -829,7 +829,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     cfgIngestionJob.continuousMode = true;
     cfgIngestionJob.tableType = tableType.name();
     cfgIngestionJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgIngestionJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ingestionJob = new HoodieDeltaStreamer(cfgIngestionJob, jsc);
 
     // Prepare base dataset with some commits
@@ -871,7 +871,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     // checkpoint will move from 00000 to 00001 for this backfill job
     cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
-    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc);
     backfillJob.sync();
 
@@ -977,13 +977,13 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
     if (!StringUtils.isNullOrEmpty(autoClean)) {
-      configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN_PROP.key(), autoClean));
+      configs.add(String.format("%s=%s", HoodieCompactionConfig.AUTO_CLEAN.key(), autoClean));
     }
     if (!StringUtils.isNullOrEmpty(inlineCluster)) {
-      configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_PROP.key(), inlineCluster));
+      configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), inlineCluster));
     }
     if (!StringUtils.isNullOrEmpty(inlineClusterMaxCommit)) {
-      configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP.key(), inlineClusterMaxCommit));
+      configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT.key(), inlineClusterMaxCommit));
     }
     if (!StringUtils.isNullOrEmpty(asyncCluster)) {
       configs.add(String.format("%s=%s", HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE.key(), asyncCluster));
@@ -1308,7 +1308,7 @@ public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
     cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = WriteOperationType.UPSERT;
-    cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP.key()));
+    cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
     HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
     ds2.sync();
     mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();