You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/04/01 04:21:39 UTC

[hudi] branch master updated: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fd885fb3dc [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)
6fd885fb3dc is described below

commit 6fd885fb3dc5c66caf6a1775fa87b4f7212056d8
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Sat Apr 1 09:51:28 2023 +0530

    [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty (#8152)
    
    This commit moves the configs for Deltastreamer and schema providers to config classes that extend `HoodieConfig` and use `ConfigProperty`, so that these configs are surfaced when the configuration documentation for the Hudi website is generated. Configs in the following classes are in scope:
    ```
    InitialCheckPointProvider
    HoodieDeltaStreamer
    HoodieMultiTableDeltaStreamer
    FilebasedSchemaProvider
    HiveSchemaProvider
    JdbcbasedSchemaProvider
    ProtoClassBasedSchemaProvider
    SchemaPostProcessor
    SchemaRegistryProvider
    SparkAvroPostProcessor
    DropColumnSchemaPostProcessor
    BaseSchemaPostProcessorConfig
    KafkaOffsetPostProcessor
    SanitizationUtils
    'hoodie.deltastreamer.multiwriter.source.checkpoint.id' in HoodieWriteConfig
    ```
    
    Co-authored-by: Y Ethan Guo <et...@gmail.com>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  7 --
 .../apache/hudi/common/config/ConfigGroups.java    |  5 ++
 .../testsuite/HoodieInlineTestSuiteWriter.java     |  5 +-
 .../org/apache/hudi/utilities/UtilHelpers.java     |  7 +-
 .../checkpointing/InitialCheckPointProvider.java   | 18 ++---
 .../config/FilebasedSchemaProviderConfig.java      | 49 ++++++++++++
 .../utilities/config/HiveSchemaProviderConfig.java | 59 ++++++++++++++
 .../config/HoodieDeltaStreamerConfig.java          | 89 ++++++++++++++++++++++
 .../config/HoodieSchemaProviderConfig.java         | 82 ++++++++++++++++++++
 .../config/JdbcbasedSchemaProviderConfig.java      | 77 +++++++++++++++++++
 .../ProtoClassBasedSchemaProviderConfig.java       | 69 +++++++++++++++++
 .../config/SchemaProviderPostProcessorConfig.java  | 76 ++++++++++++++++++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  6 +-
 .../HoodieMultiTableDeltaStreamer.java             | 57 +++++++-------
 .../deltastreamer/SourceFormatAdapter.java         |  4 +-
 .../utilities/schema/FilebasedSchemaProvider.java  | 17 ++---
 .../hudi/utilities/schema/HiveSchemaProvider.java  | 23 ++----
 .../utilities/schema/JdbcbasedSchemaProvider.java  | 36 ++-------
 .../utilities/schema/KafkaOffsetPostProcessor.java | 12 ++-
 .../schema/ProtoClassBasedSchemaProvider.java      | 52 ++++++-------
 .../hudi/utilities/schema/SchemaPostProcessor.java |  9 ++-
 .../utilities/schema/SchemaRegistryProvider.java   | 25 +++---
 .../utilities/schema/SparkAvroPostProcessor.java   |  5 +-
 .../DropColumnSchemaPostProcessor.java             | 12 ++-
 .../add/AddPrimitiveColumnSchemaPostProcessor.java | 15 ++--
 .../add/BaseSchemaPostProcessorConfig.java         | 53 -------------
 .../hudi/utilities/sources/ProtoKafkaSource.java   |  6 +-
 .../utilities/sources/debezium/DebeziumSource.java |  3 +-
 .../utilities/sources/helpers/AvroConvertor.java   |  4 +-
 .../sources/helpers/SanitizationUtils.java         | 24 +++---
 .../hudi/utilities/TestSchemaPostProcessor.java    | 27 ++++---
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 14 ++--
 .../TestHoodieMultiTableDeltaStreamer.java         |  8 +-
 .../deltastreamer/TestSourceFormatAdapter.java     | 10 +--
 .../schema/TestFilebasedSchemaProvider.java        |  4 +-
 .../schema/TestProtoClassBasedSchemaProvider.java  | 17 +++--
 .../utilities/sources/TestAvroKafkaSource.java     |  6 +-
 .../utilities/sources/TestJsonKafkaSource.java     |  4 +-
 .../utilities/sources/TestProtoKafkaSource.java    |  5 +-
 .../utilities/testutils/SanitizationTestUtils.java |  2 +-
 40 files changed, 708 insertions(+), 295 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5887604d3ad..2ace2277dd1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -656,13 +656,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to enable commit conflict checking or not during early "
           + "conflict detection.");
 
-  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
-      .key("hoodie.deltastreamer.multiwriter.source.checkpoint.id")
-      .noDefaultValue()
-      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
-          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
-          + "a single deltastreamer for a table then you do not need to set this config.");
-
   public static final ConfigProperty<String> SENSITIVE_CONFIG_KEYS_FILTER = ConfigProperty
       .key("hoodie.sensitive.config.keys")
       .defaultValue("ssl,tls,sasl,auth,credentials")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
index bd4589b82b1..234a0a5a45a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java
@@ -61,6 +61,11 @@ public class ConfigGroups {
         "Commit Callback Configs",
         "Configurations controlling callback behavior into HTTP endpoints, to push "
             + "notifications on commits on hudi tables."),
+
+    SCHEMA_PROVIDER(
+        "DeltaStreamer Schema Provider Configs",
+        "Configurations that control the schema provider for DeltaStreamer."),
+
     NONE(
         "None",
         "No subgroup. This description should be hidden.");
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
index 46f793ef12b..1bfa6fb98fb 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java
@@ -35,6 +35,7 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.compact.CompactHelpers;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
 import org.apache.avro.Schema;
@@ -196,7 +197,7 @@ public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());
       if (generatedDataStats != null && generatedDataStats.count() > 1) {
         // Just stores the path where this batch of data is generated to
         extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
@@ -211,7 +212,7 @@ public class HoodieInlineTestSuiteWriter extends HoodieTestSuiteWriter {
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());
       if (generatedDataStats != null && generatedDataStats.count() > 1) {
         // Just stores the path where this batch of data is generated to
         extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 66c67d105f5..fef5c68109a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -45,6 +45,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.utilities.checkpointing.InitialCheckPointProvider;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -52,7 +54,6 @@ import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProviderWithPostProcessor;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
@@ -497,8 +498,8 @@ public class UtilHelpers {
       return provider;
     }
 
-    String schemaPostProcessorClass = cfg.getString(Config.SCHEMA_POST_PROCESSOR_PROP, null);
-    boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE, "true"));
+    String schemaPostProcessorClass = cfg.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), null);
+    boolean enableSparkAvroPostProcessor = Boolean.parseBoolean(cfg.getString(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key(), "true"));
 
     if (transformerClassNames != null && !transformerClassNames.isEmpty()
         && enableSparkAvroPostProcessor && StringUtils.isNullOrEmpty(schemaPostProcessorClass)) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
index 4b9039e4308..bfcd7d19486 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -18,16 +18,18 @@
 
 package org.apache.hudi.utilities.checkpointing;
 
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
 
 /**
  * Provide the initial checkpoint for delta streamer.
@@ -38,17 +40,13 @@ public abstract class InitialCheckPointProvider {
   protected transient FileSystem fs;
   protected transient TypedProperties props;
 
-  static class Config {
-    private static String CHECKPOINT_PROVIDER_PATH_PROP = "hoodie.deltastreamer.checkpoint.provider.path";
-  }
-
   /**
    * Construct InitialCheckPointProvider.
    * @param props All properties passed to Delta Streamer
    */
   public InitialCheckPointProvider(TypedProperties props) {
     this.props = props;
-    this.path = new Path(props.getString(Config.CHECKPOINT_PROVIDER_PATH_PROP));
+    this.path = new Path(props.getString(HoodieDeltaStreamerConfig.CHECKPOINT_PROVIDER_PATH.key()));
   }
 
   /**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..2c2379c40e1
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/FilebasedSchemaProviderConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * File-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "File-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for file-based schema provider.")
+public class FilebasedSchemaProviderConfig extends HoodieConfig {
+  public static final ConfigProperty<String> SOURCE_SCHEMA_FILE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+      .noDefaultValue()
+      .withDocumentation("The schema of the source you are reading from");
+
+  public static final ConfigProperty<String> TARGET_SCHEMA_FILE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+      .noDefaultValue()
+      .withDocumentation("The schema of the target you are writing to");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java
new file mode 100644
index 00000000000..5d33973b59b
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HiveSchemaProviderConfig.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * Hive Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "Hive Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Hive schema provider.")
+public class HiveSchemaProviderConfig extends HoodieConfig {
+  public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+      .noDefaultValue()
+      .withDocumentation("Hive database from where source schema can be fetched");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+      .noDefaultValue()
+      .withDocumentation("Hive table from where source schema can be fetched");
+
+  public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+      .noDefaultValue()
+      .withDocumentation("Hive database from where target schema can be fetched");
+
+  public static final ConfigProperty<String> TARGET_SCHEMA_TABLE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+      .noDefaultValue()
+      .withDocumentation("Hive table from where target schema can be fetched");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
new file mode 100644
index 00000000000..71cd77f9a98
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table. This config is ignored for a single"
+          + " table deltastreamer");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
new file mode 100644
index 00000000000..6a10905e17e
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieSchemaProviderConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.DELTA_STREAMER_CONFIG_PREFIX;
+
+/**
+ * Delta Streamer Schema Provider related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    areCommonConfigs = true,
+    description = "")
+public class HoodieSchemaProviderConfig extends HoodieConfig {
+  public static final String SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+  public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+      .noDefaultValue()
+      .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+  public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+      .noDefaultValue()
+      .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+  public static final ConfigProperty<String> SCHEMA_CONVERTER = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+      .noDefaultValue()
+      .withDocumentation("The class name of the custom schema converter to use.");
+
+  public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_ENABLE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+      .defaultValue(false)
+      .withDocumentation("Whether to enable Spark Avro post processor.");
+
+  public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+      .noDefaultValue()
+      .withDocumentation("The base URL of the schema registry.");
+
+  public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+      .noDefaultValue()
+      .withDocumentation("The suffix of the URL for the schema registry.");
+
+  public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+      .noDefaultValue()
+      .withDocumentation("The source URL suffix.");
+
+  public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+      .noDefaultValue()
+      .withDocumentation("The target URL suffix.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..dadf6757bd9
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/JdbcbasedSchemaProviderConfig.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for JDBC-based schema provider.")
+public class JdbcbasedSchemaProviderConfig extends HoodieConfig {
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.connection.url")
+      .noDefaultValue()
+      .withDocumentation("The JDBC URL to connect to. The source-specific connection properties may be specified in the URL."
+          + " e.g., jdbc:postgresql://localhost/test?user=fred&password=secret");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DRIVER_TYPE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.driver.type")
+      .noDefaultValue()
+      .withDocumentation("The class name of the JDBC driver to use to connect to this URL. e.g. org.h2.Driver");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_USERNAME = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.username")
+      .noDefaultValue()
+      .withDocumentation("Username for the connection e.g. fred");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_PASSWORD = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.password")
+      .noDefaultValue()
+      .withDocumentation("Password for the connection e.g. secret");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DBTABLE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.dbtable")
+      .noDefaultValue()
+      .withDocumentation("The table with the schema to reference e.g. test_database.test1_table or test1_table");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_TIMEOUT = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.timeout")
+      .noDefaultValue()
+      .withDocumentation("The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. "
+          + "In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver "
+          + "checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.");
+
+  public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_NULLABLE = ConfigProperty
+      .key(SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.nullable")
+      .noDefaultValue()
+      .withDocumentation("If true, all the columns are nullable.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java
new file mode 100644
index 00000000000..8e62de8f1c0
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * Configs for the Proto-class-based schema provider, which derives the schema from a Protobuf Message class.
+ */
+@Immutable
+@ConfigClassProperty(name = "Proto Class Based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto.";
+
+  public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty
+      .key(PROTO_SCHEMA_PROVIDER_PREFIX + "class.name")
+      .noDefaultValue()
+      .sinceVersion("0.13.0")
+      .withDocumentation("The Protobuf Message class used as the source for the schema.");
+
+  public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty
+      .key(PROTO_SCHEMA_PROVIDER_PREFIX + "flatten.wrappers")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a record with a single 'value' field. By default, the value is false and the wrapped primitives are "
+          + "treated as a nullable value");
+
+  public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = ConfigProperty
+      .key(PROTO_SCHEMA_PROVIDER_PREFIX + "timestamps.as.records")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("When set to true Timestamp fields are translated to a record with a seconds and nanos field. By default, the value is false and the timestamp is converted to a long with "
+          + "the timestamp-micros logical type");
+
+  public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty
+      .key(PROTO_SCHEMA_PROVIDER_PREFIX + "max.recursion.depth")
+      .defaultValue(5)
+      .sinceVersion("0.13.0")
+      .withDocumentation("The max depth to unravel the Proto schema when translating into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into "
+          + "something that can be represented in their lake format like Parquet. After a given class has been seen N times within a single branch, the schema provider will create a record with a "
+          + "byte array to hold the remaining proto data and a string to hold the message descriptor's name for context.");
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java
new file mode 100644
index 00000000000..c2ce82c1735
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/SchemaProviderPostProcessorConfig.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Configurations for Schema Post Processor.
+ */
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Schema Post Processor")
+public class SchemaProviderPostProcessorConfig extends HoodieConfig {
+
+  private static final String PREFIX = HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.";
+
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR = ConfigProperty
+      .key(HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+      .noDefaultValue()
+      .withDocumentation("The class name of the schema post processor.");
+
+  public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN = ConfigProperty
+      .key(PREFIX + "delete.columns")
+      .noDefaultValue()
+      .withDocumentation("Columns to delete in the schema post processor.");
+
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+      .key(PREFIX + "add.column.name")
+      .noDefaultValue()
+      .withDocumentation("New column's name");
+
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+      .key(PREFIX + "add.column.type")
+      .noDefaultValue()
+      .withDocumentation("New column's type");
+
+  public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+      .key(PREFIX + "add.column.nullable")
+      .defaultValue(true)
+      .withDocumentation("Whether the new column is nullable");
+
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+      .key(PREFIX + "add.column.default")
+      .noDefaultValue()
+      .withDocumentation("New column's default value");
+
+  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+      .key(PREFIX + "add.column.doc")
+      .noDefaultValue()
+      .withDocumentation("Docs about new column");
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 1f79564a7f4..610c215d227 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -144,14 +144,14 @@ import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_ENABLED;
 import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
 import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
-import static org.apache.hudi.config.HoodieWriteConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_BUCKET_SYNC_SPEC;
 import static org.apache.hudi.utilities.UtilHelpers.createRecordMerger;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.DEFAULT_CHECKPOINT_FORCE_SKIP_PROP;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_FORCE_SKIP_PROP;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index 3532c347050..71018feb18f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -18,27 +18,29 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import com.beust.jcommander.Parameter;
 import org.apache.hudi.client.utils.OperationConverter;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.utilities.IdentitySplitter;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
+import org.apache.hudi.utilities.sources.JsonDFSSource;
 
 import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hudi.utilities.sources.JsonDFSSource;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -52,9 +54,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
-import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP;
-import static org.apache.hudi.utilities.schema.SchemaRegistryProvider.Config.TARGET_SCHEMA_REGISTRY_URL_PROP;
-
 /**
  * Wrapper over HoodieDeltaStreamer.java class.
  * Helps with ingesting incremental data into hoodie datasets for multiple tables.
@@ -117,7 +116,7 @@ public class HoodieMultiTableDeltaStreamer {
       String[] tableWithDatabase = table.split("\\.");
       String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : "default";
       String currentTable = tableWithDatabase.length > 1 ? tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+      String configProp = HoodieDeltaStreamerConfig.INGESTION_PREFIX + database + Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
       String configFilePath = properties.getString(configProp, Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
       checkIfTableConfigFileExists(configFolder, fs, configFilePath);
       TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new Path(configFilePath), new ArrayList<String>()).getProps();
@@ -130,7 +129,7 @@ public class HoodieMultiTableDeltaStreamer {
       //copy all the values from config to cfg
       String targetBasePath = resetTarget(config, database, currentTable);
       Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+      String overriddenTargetBasePath = tableProperties.getString(HoodieDeltaStreamerConfig.TARGET_BASE_PATH.key(), "");
       cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) ? targetBasePath : overriddenTargetBasePath;
       if (cfg.enableMetaSync && StringUtils.isNullOrEmpty(tableProperties.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), ""))) {
         throw new HoodieException("Meta sync table field not provided!");
@@ -146,7 +145,7 @@ public class HoodieMultiTableDeltaStreamer {
   }
 
   private List<String> getTablesToBeIngested(TypedProperties properties) {
-    String combinedTablesString = properties.getString(Constants.TABLES_TO_BE_INGESTED_PROP);
+    String combinedTablesString = properties.getString(HoodieDeltaStreamerConfig.TABLES_TO_BE_INGESTED.key());
     if (combinedTablesString == null) {
       return new ArrayList<>();
     }
@@ -162,32 +161,34 @@ public class HoodieMultiTableDeltaStreamer {
   }
 
   private void populateTargetRegistryProp(TypedProperties typedProperties) {
-    String schemaRegistryTargetUrl = typedProperties.getString(TARGET_SCHEMA_REGISTRY_URL_PROP, null);
+    String schemaRegistryTargetUrl = typedProperties.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), null);
     if (StringUtils.isNullOrEmpty(schemaRegistryTargetUrl)) {
-      String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
-      String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+      String schemaRegistryBaseUrl = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_BASE_URL.key());
+      String schemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_URL_SUFFIX.key(), null);
       String targetSchemaRegistrySuffix;
       if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
-        targetSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_TARGET_URL_SUFFIX);
+        targetSchemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_TARGET_URL_SUFFIX.key());
       } else {
         targetSchemaRegistrySuffix = schemaRegistrySuffix;
       }
-      typedProperties.setProperty(TARGET_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + targetSchemaRegistrySuffix);
+      typedProperties.setProperty(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(),
+          schemaRegistryBaseUrl + typedProperties.getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()) + targetSchemaRegistrySuffix);
     }
   }
 
   private void populateSourceRegistryProp(TypedProperties typedProperties) {
-    String schemaRegistrySourceUrl = typedProperties.getString(SRC_SCHEMA_REGISTRY_URL_PROP, null);
+    String schemaRegistrySourceUrl = typedProperties.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key(), null);
     if (StringUtils.isNullOrEmpty(schemaRegistrySourceUrl)) {
-      String schemaRegistryBaseUrl = typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
-      String schemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP, null);
+      String schemaRegistryBaseUrl = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_BASE_URL.key());
+      String schemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_URL_SUFFIX.key(), null);
       String sourceSchemaRegistrySuffix;
       if (StringUtils.isNullOrEmpty(schemaRegistrySuffix)) {
-        sourceSchemaRegistrySuffix = typedProperties.getString(Constants.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX);
+        sourceSchemaRegistrySuffix = typedProperties.getString(HoodieSchemaProviderConfig.SCHEMA_REGISTRY_SOURCE_URL_SUFFIX.key());
       } else {
         sourceSchemaRegistrySuffix = schemaRegistrySuffix;
       }
-      typedProperties.setProperty(SRC_SCHEMA_REGISTRY_URL_PROP, schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + sourceSchemaRegistrySuffix);
+      typedProperties.setProperty(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key(),
+          schemaRegistryBaseUrl + typedProperties.getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()) + sourceSchemaRegistrySuffix);
     }
   }
 
@@ -247,7 +248,7 @@ public class HoodieMultiTableDeltaStreamer {
 
     if (config.targetTableName != null) {
       logger.warn(String.format("--target-table is deprecated and will be removed in a future release due to it's useless;"
-              + " please use %s to configure multiple target tables", Constants.TABLES_TO_BE_INGESTED_PROP));
+          + " please use %s to configure multiple target tables", HoodieDeltaStreamerConfig.TABLES_TO_BE_INGESTED.key()));
     }
 
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-table-delta-streamer", Constants.LOCAL_SPARK_MASTER);
@@ -441,18 +442,12 @@ public class HoodieMultiTableDeltaStreamer {
     }
   }
 
-  public static class Constants {
-    public static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
-    public static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
-    private static final String SCHEMA_REGISTRY_BASE_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.baseUrl";
-    private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = "hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
-    private static final String SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.sourceUrlSuffix";
-    private static final String SCHEMA_REGISTRY_TARGET_URL_SUFFIX = "hoodie.deltastreamer.schemaprovider.registry.targetUrlSuffix";
-    private static final String TABLES_TO_BE_INGESTED_PROP = "hoodie.deltastreamer.ingestion.tablesToBeIngested";
-    private static final String INGESTION_PREFIX = "hoodie.deltastreamer.ingestion.";
+  static class Constants {
+    @Deprecated
+    private static final String KAFKA_TOPIC_PROP = HoodieDeltaStreamerConfig.KAFKA_TOPIC.key();
+    static final String HIVE_SYNC_TABLE_PROP = "hoodie.datasource.hive_sync.table";
     private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
     private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = "_config.properties";
-    private static final String TARGET_BASE_PATH_PROP = "hoodie.deltastreamer.ingestion.targetBasePath";
     private static final String LOCAL_SPARK_MASTER = "local[2]";
     private static final String FILE_DELIMITER = "/";
     private static final String DELIMITER = ".";
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index afc9c3416fc..26512a405a5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -57,8 +57,8 @@ import scala.util.Either;
 import static org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter.ERROR_TABLE_CURRUPT_RECORD_COL_NAME;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 
 /**
  * Adapts data-format provided by the source to the data-format required by the client (DeltaStreamer).
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
index fe8ed3432d7..9956e1528a4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.config.FilebasedSchemaProviderConfig;
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import org.apache.avro.Schema;
@@ -39,14 +40,6 @@ import java.util.Collections;
  */
 public class FilebasedSchemaProvider extends SchemaProvider {
 
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String SOURCE_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.file";
-    private static final String TARGET_SCHEMA_FILE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.file";
-  }
-
   private final FileSystem fs;
 
   protected Schema sourceSchema;
@@ -55,14 +48,14 @@ public class FilebasedSchemaProvider extends SchemaProvider {
 
   public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
-    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_FILE_PROP));
-    String sourceFile = props.getString(Config.SOURCE_SCHEMA_FILE_PROP);
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE.key()));
+    String sourceFile = props.getString(FilebasedSchemaProviderConfig.SOURCE_SCHEMA_FILE.key());
     boolean shouldSanitize = SanitizationUtils.getShouldSanitize(props);
     String invalidCharMask = SanitizationUtils.getInvalidCharMask(props);
     this.fs = FSUtils.getFs(sourceFile, jssc.hadoopConfiguration(), true);
     this.sourceSchema = readAvroSchemaFromFile(sourceFile, this.fs, shouldSanitize, invalidCharMask);
-    if (props.containsKey(Config.TARGET_SCHEMA_FILE_PROP)) {
-      this.targetSchema = readAvroSchemaFromFile(props.getString(Config.TARGET_SCHEMA_FILE_PROP), this.fs, shouldSanitize, invalidCharMask);
+    if (props.containsKey(FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE.key())) {
+      this.targetSchema = readAvroSchemaFromFile(props.getString(FilebasedSchemaProviderConfig.TARGET_SCHEMA_FILE.key()), this.fs, shouldSanitize, invalidCharMask);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 9fca2a241a6..e6b113b2782 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaProviderException;
 
 import org.apache.avro.Schema;
@@ -39,24 +40,14 @@ import java.util.Collections;
  */
 public class HiveSchemaProvider extends SchemaProvider {
 
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String SOURCE_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.database";
-    private static final String SOURCE_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.source.schema.hive.table";
-    private static final String TARGET_SCHEMA_DATABASE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.database";
-    private static final String TARGET_SCHEMA_TABLE_PROP = "hoodie.deltastreamer.schemaprovider.target.schema.hive.table";
-  }
-
   private final Schema sourceSchema;
   private Schema targetSchema;
 
   public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
-    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SOURCE_SCHEMA_TABLE_PROP));
-    String sourceSchemaDatabaseName = props.getString(Config.SOURCE_SCHEMA_DATABASE_PROP, "default");
-    String sourceSchemaTableName = props.getString(Config.SOURCE_SCHEMA_TABLE_PROP);
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HiveSchemaProviderConfig.SOURCE_SCHEMA_TABLE.key()));
+    String sourceSchemaDatabaseName = props.getString(HiveSchemaProviderConfig.SOURCE_SCHEMA_DATABASE.key(), "default");
+    String sourceSchemaTableName = props.getString(HiveSchemaProviderConfig.SOURCE_SCHEMA_TABLE.key());
     SparkSession spark = SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate();
 
     // source schema
@@ -72,9 +63,9 @@ public class HiveSchemaProvider extends SchemaProvider {
     }
 
     // target schema
-    if (props.containsKey(Config.TARGET_SCHEMA_TABLE_PROP)) {
-      String targetSchemaDatabaseName = props.getString(Config.TARGET_SCHEMA_DATABASE_PROP, "default");
-      String targetSchemaTableName = props.getString(Config.TARGET_SCHEMA_TABLE_PROP);
+    if (props.containsKey(HiveSchemaProviderConfig.TARGET_SCHEMA_TABLE.key())) {
+      String targetSchemaDatabaseName = props.getString(HiveSchemaProviderConfig.TARGET_SCHEMA_DATABASE.key(), "default");
+      String targetSchemaTableName = props.getString(HiveSchemaProviderConfig.TARGET_SCHEMA_TABLE.key());
       try {
         TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDatabaseName));
         StructType targetSchema = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
index 2a3c2a8b487..c548db3650f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/JdbcbasedSchemaProvider.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.schema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.JdbcbasedSchemaProviderConfig;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -35,38 +36,17 @@ public class JdbcbasedSchemaProvider extends SchemaProvider {
   private Schema sourceSchema;
   private Map<String, String> options = new HashMap<>();
 
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    // The JDBC URL to connect to. The source-specific connection properties may be specified in the URL.
-    // e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
-    private static final String SOURCE_SCHEMA_JDBC_CONNECTION_URL = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url";
-    // The class name of the JDBC driver to use to connect to this URL. such as org.h2.Driver
-    private static final String SOURCE_SCHEMA_JDBC_DRIVER_TYPE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type";
-    private static final String SOURCE_SCHEMA_JDBC_USERNAME = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username";
-    private static final String SOURCE_SCHEMA_JDBC_PASSWORD = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password";
-    // example : test_database.test1_table or test1_table
-    private static final String SOURCE_SCHEMA_JDBC_DBTABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable";
-    // The number of seconds the driver will wait for a Statement object to execute to the given number of seconds.
-    // Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout,
-    // e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.
-    private static final String SOURCE_SCHEMA_JDBC_TIMEOUT = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout";
-    // If true, all the columns are nullable.
-    private static final String SOURCE_SCHEMA_JDBC_NULLABLE = "hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable";
-  }
-
   public JdbcbasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
-    options.put("url", props.getString(Config.SOURCE_SCHEMA_JDBC_CONNECTION_URL));
-    options.put("driver", props.getString(Config.SOURCE_SCHEMA_JDBC_DRIVER_TYPE));
-    options.put("user", props.getString(Config.SOURCE_SCHEMA_JDBC_USERNAME));
-    options.put("password", props.getString(Config.SOURCE_SCHEMA_JDBC_PASSWORD));
-    options.put("dbtable", props.getString(Config.SOURCE_SCHEMA_JDBC_DBTABLE));
+    options.put("url", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_CONNECTION_URL.key()));
+    options.put("driver", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_DRIVER_TYPE.key()));
+    options.put("user", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_USERNAME.key()));
+    options.put("password", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_PASSWORD.key()));
+    options.put("dbtable", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_DBTABLE.key()));
     // the number of seconds the driver will wait for a Statement object to execute to the given
     // number of seconds. Zero means there is no limit.
-    options.put("queryTimeout", props.getString(Config.SOURCE_SCHEMA_JDBC_TIMEOUT, "0"));
-    options.put("nullable", props.getString(Config.SOURCE_SCHEMA_JDBC_NULLABLE, "true"));
+    options.put("queryTimeout", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_TIMEOUT.key(), "0"));
+    options.put("nullable", props.getString(JdbcbasedSchemaProviderConfig.SOURCE_SCHEMA_JDBC_NULLABLE.key(), "true"));
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index cc0ce2cf35a..575926db0bc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 
 import org.apache.avro.Schema;
 import org.apache.log4j.LogManager;
@@ -36,15 +37,12 @@ import java.util.stream.Collectors;
 public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
 
   public static class Config {
-    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
-        .key("hoodie.deltastreamer.source.kafka.append.offsets")
-        .defaultValue("false")
-        .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
-            + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
-            + "By default its disabled and no kafka offsets are added");
+    @Deprecated
+    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS =
+        HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS;
 
     public static boolean shouldAddOffsets(TypedProperties props) {
-      return  props.getBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.defaultValue()));
+      return props.getBoolean(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.parseBoolean(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.defaultValue()));
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index f156c15929e..92eca1f6a89 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
 
 import org.apache.avro.Schema;
@@ -38,31 +39,21 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
   /**
    * Configs supported.
    */
+  @Deprecated
   public static class Config {
-    private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
-    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name")
-        .noDefaultValue()
-        .sinceVersion("0.13.0")
-        .withDocumentation("The Protobuf Message class used as the source for the schema.");
-
-    public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".flatten.wrappers")
-        .defaultValue(false)
-        .sinceVersion("0.13.0")
-        .withDocumentation("When set to true wrapped primitives like Int64Value are translated to a record with a single 'value' field. By default, the value is false and the wrapped primitives are "
-            + "treated as a nullable value");
-
-    public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".timestamps.as.records")
-        .defaultValue(false)
-        .sinceVersion("0.13.0")
-        .withDocumentation("When set to true Timestamp fields are translated to a record with a seconds and nanos field. By default, the value is false and the timestamp is converted to a long with "
-            + "the timestamp-micros logical type");
-
-    public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".max.recursion.depth")
-        .defaultValue(5)
-        .sinceVersion("0.13.0")
-        .withDocumentation("The max depth to unravel the Proto schema when translating into an Avro schema. Setting this depth allows the user to convert a schema that is recursive in proto into "
-            + "something that can be represented in their lake format like Parquet. After a given class has been seen N times within a single branch, the schema provider will create a record with a "
-            + "byte array to hold the remaining proto data and a string to hold the message descriptor's name for context.");
+    // Use {@link ProtoClassBasedSchemaProviderConfig} instead
+    @Deprecated
+    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME =
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME;
+    @Deprecated
+    public static final ConfigProperty<Boolean> PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS =
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
+    @Deprecated
+    public static final ConfigProperty<Boolean> PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS =
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
+    @Deprecated
+    public static final ConfigProperty<Integer> PROTO_SCHEMA_MAX_RECURSION_DEPTH =
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
   }
 
   private final String schemaString;
@@ -75,12 +66,13 @@ public class ProtoClassBasedSchemaProvider extends SchemaProvider {
   public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
-        Config.PROTO_SCHEMA_CLASS_NAME.key()));
-    String className = config.getString(Config.PROTO_SCHEMA_CLASS_NAME.key());
-    boolean wrappedPrimitivesAsRecords = props.getBoolean(Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(),
-        Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.defaultValue());
-    int maxRecursionDepth = props.getInteger(Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.defaultValue());
-    boolean timestampsAsRecords = props.getBoolean(Config.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), false);
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key()));
+    String className = config.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
+    boolean wrappedPrimitivesAsRecords = props.getBoolean(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(),
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.defaultValue());
+    int maxRecursionDepth = props.getInteger(
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.defaultValue());
+    boolean timestampsAsRecords = props.getBoolean(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), false);
     ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
     try {
       schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
index f0879e055c5..addabe18cbf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaPostProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -31,10 +32,14 @@ import java.io.Serializable;
  */
 public abstract class SchemaPostProcessor implements Serializable {
 
-  /** Configs supported. */
+  /**
+   * Configs supported.
+   */
+  @Deprecated
   public static class Config {
+    @Deprecated
     public static final String SCHEMA_POST_PROCESSOR_PROP =
-        "hoodie.deltastreamer.schemaprovider.schema_post_processor";
+        SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key();
   }
 
   private static final long serialVersionUID = 1L;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
index 9ade80ecce4..b26be1b421b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -61,11 +62,15 @@ public class SchemaRegistryProvider extends SchemaProvider {
    * Configs supported.
    */
   public static class Config {
-
-    public static final String SRC_SCHEMA_REGISTRY_URL_PROP = "hoodie.deltastreamer.schemaprovider.registry.url";
+    @Deprecated
+    public static final String SRC_SCHEMA_REGISTRY_URL_PROP =
+        HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key();
+    @Deprecated
     public static final String TARGET_SCHEMA_REGISTRY_URL_PROP =
-        "hoodie.deltastreamer.schemaprovider.registry.targetUrl";
-    public static final String SCHEMA_CONVERTER_PROP = "hoodie.deltastreamer.schemaprovider.registry.schemaconverter";
+        HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key();
+    @Deprecated
+    public static final String SCHEMA_CONVERTER_PROP =
+        HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key();
     public static final String SSL_KEYSTORE_LOCATION_PROP = "schema.registry.ssl.keystore.location";
     public static final String SSL_TRUSTSTORE_LOCATION_PROP = "schema.registry.ssl.truststore.location";
     public static final String SSL_KEYSTORE_PASSWORD_PROP = "schema.registry.ssl.keystore.password";
@@ -86,8 +91,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
 
   public Schema parseSchemaFromRegistry(String registryUrl) throws IOException {
     String schema = fetchSchemaFromRegistry(registryUrl);
-    SchemaConverter converter = config.containsKey(Config.SCHEMA_CONVERTER_PROP)
-        ? ReflectionUtils.loadClass(config.getString(Config.SCHEMA_CONVERTER_PROP))
+    SchemaConverter converter = config.containsKey(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key())
+        ? ReflectionUtils.loadClass(config.getString(HoodieSchemaProviderConfig.SCHEMA_CONVERTER.key()))
         : s -> s;
     return new Schema.Parser().parse(converter.convert(schema));
   }
@@ -142,7 +147,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
 
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
-    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
     if (config.containsKey(Config.SSL_KEYSTORE_LOCATION_PROP)
         || config.containsKey(Config.SSL_TRUSTSTORE_LOCATION_PROP)) {
       setUpSSLStores();
@@ -174,7 +179,7 @@ public class SchemaRegistryProvider extends SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    String registryUrl = config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
     try {
       return parseSchemaFromRegistry(registryUrl);
     } catch (IOException ioe) {
@@ -184,8 +189,8 @@ public class SchemaRegistryProvider extends SchemaProvider {
 
   @Override
   public Schema getTargetSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
-    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    String registryUrl = config.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key());
+    String targetRegistryUrl = config.getString(HoodieSchemaProviderConfig.TARGET_SCHEMA_REGISTRY_URL.key(), registryUrl);
     try {
       return parseSchemaFromRegistry(targetRegistryUrl);
     } catch (IOException ioe) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
index ddaf94c93a0..406f71872d5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -30,9 +31,11 @@ import org.apache.spark.sql.types.StructType;
  */
 public class SparkAvroPostProcessor extends SchemaPostProcessor {
 
+  @Deprecated
   public static class Config {
+    @Deprecated
     public static final String SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE =
-            "hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable";
+        HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key();
   }
 
   public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 05b446f8451..dc601367fb7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -20,10 +20,11 @@ package org.apache.hudi.utilities.schema.postprocessor;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
+import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 
 import org.apache.avro.Schema;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -51,18 +52,21 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
     super(props, jssc);
   }
 
+  @Deprecated
   public static class Config {
+    @Deprecated
     public static final String DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP =
-        "hoodie.deltastreamer.schemaprovider.schema_post_processor.delete.columns";
+        SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key();
   }
 
   @Override
   public Schema processSchema(Schema schema) {
 
-    String columnToDeleteStr = this.config.getString(Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP);
+    String columnToDeleteStr = this.config.getString(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key());
 
     if (StringUtils.isNullOrEmpty(columnToDeleteStr)) {
-      LOG.warn(String.format("Param %s is null or empty, return original schema", Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP));
+      LOG.warn(String.format("Param %s is null or empty, return original schema",
+          SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key()));
     }
 
     // convert field to lowerCase for compare purpose
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
index f7d31b5cf47..465d227dedb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.schema.postprocessor.add;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 
@@ -47,7 +48,7 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
 
   @Override
   public Schema processSchema(Schema schema) {
-    String newColumnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    String newColumnName = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
 
     if (schema.getField(newColumnName) != null) {
       throw new HoodieSchemaPostProcessException(String.format("Column %s already exist!", newColumnName));
@@ -69,13 +70,13 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
 
   private Schema.Field buildNewColumn() {
 
-    String columnName = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
-    String type = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
-    String doc = this.config.getString(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
-    Object defaultValue = this.config.getOrDefault(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
+    String columnName = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key());
+    String type = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key()).toUpperCase(Locale.ROOT);
+    String doc = this.config.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), null);
+    Object defaultValue = this.config.getOrDefault(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP.key(),
         null);
-    boolean nullable = this.config.getBoolean(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
-        BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());
+    boolean nullable = this.config.getBoolean(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(),
+        SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.defaultValue());
 
     ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(columnName));
     ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
deleted file mode 100644
index d1528c362c8..00000000000
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/BaseSchemaPostProcessorConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utilities.schema.postprocessor.add;
-
-import org.apache.hudi.common.config.ConfigProperty;
-
-/**
- * Base configs to describe a primitive type column.
- */
-public class BaseSchemaPostProcessorConfig {
-
-  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.name")
-      .noDefaultValue()
-      .withDocumentation("New column's name");
-
-  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.type")
-      .noDefaultValue()
-      .withDocumentation("New column's type");
-
-  public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.nullable")
-      .defaultValue(true)
-      .withDocumentation("New column's nullable");
-
-  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.default")
-      .noDefaultValue()
-      .withDocumentation("New column's default value");
-
-  public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.schemaprovider.schema_post_processor.add.column.doc")
-      .noDefaultValue()
-      .withDocumentation("Docs about new column");
-
-}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 9241582b720..1f6653e9655 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -22,8 +22,8 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
-import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 
@@ -53,10 +53,10 @@ public class ProtoKafkaSource extends KafkaSource<Message> {
                           SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
     super(props, sparkContext, sparkSession, schemaProvider, SourceType.PROTO, metrics);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(
-        ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key()));
+        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key()));
     props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
     props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
-    className = props.getString(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key());
+    className = props.getString(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
     this.offsetGen = new KafkaOffsetGen(props);
     if (this.shouldAddOffsets) {
       throw new HoodieException("Appending kafka offsets to ProtoKafkaSource is not supported");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
index 7dbb26890e7..5aec478875c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/DebeziumSource.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
@@ -118,7 +119,7 @@ public abstract class DebeziumSource extends RowSource {
       return Pair.of(Option.of(sparkSession.emptyDataFrame()), overrideCheckpointStr.isEmpty() ? CheckpointUtils.offsetsToStr(offsetRanges) : overrideCheckpointStr);
     } else {
       try {
-        String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(props.getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+        String schemaStr = schemaRegistryProvider.fetchSchemaFromRegistry(props.getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
         Dataset<Row> dataset = toDataset(offsetRanges, offsetGen, schemaStr);
         LOG.info(String.format("Spark schema of Kafka Payload for topic %s:\n%s", offsetGen.getTopicName(), dataset.schema().treeString()));
         LOG.info(String.format("New checkpoint string: %s", CheckpointUtils.offsetsToStr(offsetRanges)));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
index eeb08a7fd75..a965dbb0ebb 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroConvertor.java
@@ -38,8 +38,8 @@ import scala.util.Either;
 import scala.util.Left;
 import scala.util.Right;
 
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 
 /**
  * Convert a variety of datum into Avro GenericRecords. Has a bunch of lazy fields to circumvent issues around
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
index 9eeee8f5c92..ffcc3949ade 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/SanitizationUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -48,30 +49,25 @@ public class SanitizationUtils {
 
   private static final ObjectMapper OM = new ObjectMapper();
 
+  @Deprecated
   public static class Config {
+    @Deprecated
+    public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES =
+        HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
 
-    public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
-        .key("hoodie.deltastreamer.source.sanitize.invalid.schema.field.names")
-        .defaultValue(false)
-        .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
-            + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
-            + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
-
-    public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
-        .key("hoodie.deltastreamer.source.sanitize.invalid.char.mask")
-        .defaultValue("__")
-        .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
-          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+    @Deprecated
+    public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK =
+        HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
   }
 
   private static final String AVRO_FIELD_NAME_KEY = "name";
 
   public static boolean getShouldSanitize(TypedProperties props) {
-    return props.getBoolean(Config.SANITIZE_SCHEMA_FIELD_NAMES.key(),Config.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue());
+    return props.getBoolean(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.defaultValue());
   }
 
   public static String getInvalidCharMask(TypedProperties props) {
-    return props.getString(Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
+    return props.getString(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue());
   }
 
   private static DataType sanitizeDataTypeForAvro(DataType dataType, String invalidCharMask) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index a0dc49aa670..0818c43cc02 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -19,15 +19,14 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
-import org.apache.hudi.utilities.schema.SchemaPostProcessor.Config;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.DropColumnSchemaPostProcessor;
 import org.apache.hudi.utilities.schema.postprocessor.add.AddPrimitiveColumnSchemaPostProcessor;
-import org.apache.hudi.utilities.schema.postprocessor.add.BaseSchemaPostProcessorConfig;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hudi.utilities.transform.FlatteningTransformer;
 
@@ -75,7 +74,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
 
   @Test
   public void testPostProcessor() throws IOException {
-    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, DummySchemaPostProcessor.class.getName());
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), DummySchemaPostProcessor.class.getName());
     SchemaProvider provider =
         UtilHelpers.wrapSchemaProviderWithPostProcessor(
             UtilHelpers.createSchemaProvider(DummySchemaProvider.class.getName(), properties, jsc),
@@ -89,7 +88,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
 
   @Test
   public void testSparkAvro() throws IOException {
-    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP, SparkAvroPostProcessor.class.getName());
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(), SparkAvroPostProcessor.class.getName());
     List<String> transformerClassNames = new ArrayList<>();
     transformerClassNames.add(FlatteningTransformer.class.getName());
 
@@ -115,10 +114,10 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
   @Test
   public void testChainedSchemaPostProcessor() {
     // DeleteSupportSchemaPostProcessor first, DummySchemaPostProcessor second
-    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(),
         "org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor,org.apache.hudi.utilities.DummySchemaPostProcessor");
 
-    SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
+    SchemaPostProcessor processor = UtilHelpers.createSchemaPostProcessor(properties.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key()), properties, jsc);
     Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
     Schema targetSchema = processor.processSchema(schema);
 
@@ -127,10 +126,10 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
     assertNotNull(targetSchema.getField("testString"));
 
     // DummySchemaPostProcessor first, DeleteSupportSchemaPostProcessor second
-    properties.put(Config.SCHEMA_POST_PROCESSOR_PROP,
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key(),
         "org.apache.hudi.utilities.DummySchemaPostProcessor,org.apache.hudi.utilities.schema.postprocessor.DeleteSupportSchemaPostProcessor");
 
-    processor = UtilHelpers.createSchemaPostProcessor(properties.getString(Config.SCHEMA_POST_PROCESSOR_PROP), properties, jsc);
+    processor = UtilHelpers.createSchemaPostProcessor(properties.getString(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR.key()), properties, jsc);
     schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
     targetSchema = processor.processSchema(schema);
 
@@ -142,7 +141,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
   @Test
   public void testDeleteColumn() {
     // remove column ums_id_ from source schema
-    properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "rider");
+    properties.put(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key(), "rider");
     DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
     Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
     Schema targetSchema = processor.processSchema(schema);
@@ -154,7 +153,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
   @Test
   public void testDeleteColumnThrows() {
     // remove all columns from source schema
-    properties.put(DropColumnSchemaPostProcessor.Config.DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP, "timestamp,_row_key,rider,driver,fare");
+    properties.put(SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key(), "timestamp,_row_key,rider,driver,fare");
     DropColumnSchemaPostProcessor processor = new DropColumnSchemaPostProcessor(properties, null);
     Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
 
@@ -164,9 +163,9 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
   @ParameterizedTest
   @MethodSource("configParams")
   public void testAddPrimitiveTypeColumn(String type) {
-    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
-    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
-    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP.key(), "primitive_column");
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP.key(), type);
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP.key(), "primitive column test");
 
     AddPrimitiveColumnSchemaPostProcessor processor = new AddPrimitiveColumnSchemaPostProcessor(properties, null);
     Schema schema = new Schema.Parser().parse(ORIGINAL_SCHEMA);
@@ -180,7 +179,7 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
     assertNotEquals(type, newColumn.schema().getType().getName());
 
     // test not nullable
-    properties.put(BaseSchemaPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), false);
+    properties.put(SchemaProviderPostProcessorConfig.SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP.key(), false);
     targetSchema = processor.processSchema(schema);
     newColumn = targetSchema.getField("primitive_column");
     assertEquals(type, newColumn.schema().getType().getName());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 1a3e79e150a..dbc87cbbda2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -68,10 +68,10 @@ import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -143,7 +143,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.config.HoodieWriteConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
 import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
@@ -151,6 +150,7 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE;
 import static org.apache.hudi.utilities.UtilHelpers.SCHEDULE_AND_EXECUTE;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.MUTLI_WRITER_SOURCE_CHECKPOINT_ID;
 import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_OFFSET_COLUMN;
 import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_PARTITION_COLUMN;
@@ -766,7 +766,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
-      cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
@@ -780,7 +780,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
-      cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes.
@@ -806,7 +806,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     }
     if (!useSchemaPostProcessor) {
-      cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE + "=false");
+      cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
     }
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     new HoodieDeltaStreamer(cfg, jsc).sync();
@@ -1865,7 +1865,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     if (extraProps != null && !extraProps.isEmpty()) {
       extraProps.forEach(props::setProperty);
     }
-    props.setProperty(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
+    props.setProperty(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), Boolean.toString(shouldAddOffsets));
     UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + propsFileName);
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index a2b480ac1ba..0e561eee5fb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -23,6 +23,8 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
+import org.apache.hudi.utilities.config.HoodieSchemaProviderConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
@@ -120,13 +122,13 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
     assertEquals(2, streamer.getTableExecutionContexts().size());
     assertEquals(basePath + "/multi_table_dataset/uber_db/dummy_table_uber", executionContext.getConfig().targetBasePath);
     assertEquals("uber_db.dummy_table_uber", executionContext.getConfig().targetTableName);
-    assertEquals("topic1", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
+    assertEquals("topic1", executionContext.getProperties().getString(HoodieDeltaStreamerConfig.KAFKA_TOPIC.key()));
     assertEquals("_row_key", executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
     assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(), executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key()));
     assertEquals("uber_hive_dummy_table", executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP));
-    assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    assertEquals("http://localhost:8081/subjects/random-value/versions/latest", executionContext.getProperties().getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
     assertEquals("http://localhost:8081/subjects/topic2-value/versions/latest",
-            streamer.getTableExecutionContexts().get(0).getProperties().getString(SchemaRegistryProvider.Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+        streamer.getTableExecutionContexts().get(0).getProperties().getString(HoodieSchemaProviderConfig.SRC_SCHEMA_REGISTRY_URL.key()));
   }
 
   @Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index caa54c01ebf..24b4839735c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -21,10 +21,10 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.Source;
-import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
 
 import org.apache.avro.Schema;
@@ -93,8 +93,8 @@ public class TestSourceFormatAdapter {
 
   private InputBatch<Dataset<Row>> fetchRowData(JavaRDD<String> rdd, StructType unsanitizedSchema) {
     TypedProperties typedProperties = new TypedProperties();
-    typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
-    typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+    typedProperties.put(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+    typedProperties.put(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
     setupRowSource(spark.read().schema(unsanitizedSchema).json(rdd));
     SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testRowDataSource, Option.empty(), Option.of(typedProperties));
     return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
@@ -102,8 +102,8 @@ public class TestSourceFormatAdapter {
 
   private InputBatch<Dataset<Row>> fetchJsonData(JavaRDD<String> rdd, StructType sanitizedSchema) {
     TypedProperties typedProperties = new TypedProperties();
-    typedProperties.put(SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
-    typedProperties.put(SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+    typedProperties.put(HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+    typedProperties.put(HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
     setupJsonSource(rdd, SchemaConverters.toAvroType(sanitizedSchema, false, "record", ""));
     SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(testJsonDataSource, Option.empty(), Option.of(typedProperties));
     return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(DUMMY_CHECKPOINT), 10L);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
index fe6f0ff9f2f..59e82c27edf 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestFilebasedSchemaProvider.java
@@ -30,8 +30,8 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SANITIZE_SCHEMA_FIELD_NAMES;
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateProperFormattedSchema;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithConfiguredReplacement;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithDefaultReplacement;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
index 5a9efbba915..2bb40807028 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestProtoClassBasedSchemaProvider.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.utilities.schema;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.test.proto.Parent;
 import org.apache.hudi.utilities.test.proto.Sample;
 import org.apache.hudi.utilities.test.proto.WithOneOf;
@@ -35,7 +36,7 @@ public class TestProtoClassBasedSchemaProvider {
   @Test
   public void validateDefaultSchemaGeneration() throws IOException {
     TypedProperties properties = new TypedProperties();
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
     ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
     Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
     Schema.Parser parser = new Schema.Parser();
@@ -46,9 +47,9 @@ public class TestProtoClassBasedSchemaProvider {
   @Test
   public void validateWrappedPrimitiveAndTimestampsAsRecordSchemaGeneration() throws IOException {
     TypedProperties properties = new TypedProperties();
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), "true");
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS.key(), "true");
     ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
     Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
     Schema.Parser parser = new Schema.Parser();
@@ -59,8 +60,8 @@ public class TestProtoClassBasedSchemaProvider {
   @Test
   public void validateRecursiveSchemaGeneration_depth2() throws IOException {
     TypedProperties properties = new TypedProperties();
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), String.valueOf(2));
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH.key(), String.valueOf(2));
     ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
     Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
     Schema.Parser parser = new Schema.Parser();
@@ -71,7 +72,7 @@ public class TestProtoClassBasedSchemaProvider {
   @Test
   public void validateRecursiveSchemaGeneration_defaultDepth() throws IOException {
     TypedProperties properties = new TypedProperties();
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
     ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
     Schema convertedSchema = protoToAvroSchemaProvider.getSourceSchema();
     Schema.Parser parser = new Schema.Parser();
@@ -82,7 +83,7 @@ public class TestProtoClassBasedSchemaProvider {
   @Test
   public void validateOneOfSchemaGeneration() throws IOException {
     TypedProperties properties = new TypedProperties();
-    properties.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), WithOneOf.class.getName());
+    properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), WithOneOf.class.getName());
     ProtoClassBasedSchemaProvider protoToAvroSchemaProvider = new ProtoClassBasedSchemaProvider(properties, null);
     Schema protoSchema = protoToAvroSchemaProvider.getSourceSchema();
     Schema.Parser parser = new Schema.Parser();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 020526b159d..3448f11eec5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -24,10 +24,10 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -140,7 +140,7 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     AvroKafkaSource avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
     GenericRecord withoutKafkaOffsets = avroKafkaSource.maybeAppendKafkaOffsets(rdd).collect().get(0);
 
-    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
     schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
         UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
     avroKafkaSource = new AvroKafkaSource(props, jsc(), spark(), schemaProvider, null);
@@ -168,7 +168,7 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(),Long.MAX_VALUE)
         .getBatch().get();
     List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
-    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
 
     schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
         UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index f05107f2c25..3609cec419d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -25,12 +25,12 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.deltastreamer.BaseErrorTableWriter;
 import org.apache.hudi.utilities.deltastreamer.ErrorEvent;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor;
 import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -320,7 +320,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
     assertEquals(numMessages, c.count());
     List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
 
-    props.put(KafkaOffsetPostProcessor.Config.KAFKA_APPEND_OFFSETS.key(), "true");
+    props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
     jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     kafkaSource = new SourceFormatAdapter(jsonSource);
     Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index bf761509abf..ddf1e1b784d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -80,7 +81,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
+    props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Sample.class.getName());
     return props;
   }
 
@@ -98,7 +99,7 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
     final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceFlatten";
     testUtils.createTopic(topic, 2);
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-    props.setProperty(ProtoClassBasedSchemaProvider.Config.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+    props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
     SchemaProvider schemaProvider = new ProtoClassBasedSchemaProvider(props, jsc());
     Source protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(protoKafkaSource);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
index d54b0b54e67..2349fad1d76 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
@@ -33,7 +33,7 @@ import org.junit.jupiter.params.provider.Arguments;
 
 import java.util.stream.Stream;
 
-import static org.apache.hudi.utilities.sources.helpers.SanitizationUtils.Config.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
+import static org.apache.hudi.utilities.config.HoodieDeltaStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK;
 
 public class SanitizationTestUtils {
   public static String invalidCharMask = SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue();