Posted to commits@hudi.apache.org by "lokeshj1703 (via GitHub)" <gi...@apache.org> on 2023/03/10 07:33:05 UTC

[GitHub] [hudi] lokeshj1703 opened a new pull request, #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

lokeshj1703 opened a new pull request, #8152:
URL: https://github.com/apache/hudi/pull/8152

   ### Change Logs
   
   The configs in the following classes are not implemented using HoodieConfig, which makes it impossible to surface them on the Configurations page. We need to refactor the code so that each config property is implemented using ConfigProperty in a corresponding new HoodieConfig class. Refer to HoodieArchivalConfig for an existing implementation of configs.
   
   - InitialCheckPointProvider
   - HoodieDeltaStreamer
   - HoodieMultiTableDeltaStreamer
   - FilebasedSchemaProvider
   - HiveSchemaProvider
   - JdbcbasedSchemaProvider
   - ProtoClassBasedSchemaProvider
   - SchemaPostProcessor
   - SchemaRegistryProvider
   - SparkAvroPostProcessor
   - DropColumnSchemaPostProcessor
   - BaseSchemaPostProcessorConfig
   - KafkaOffsetPostProcessor
   - SanitizationUtils
   
   The same applies to `hoodie.deltastreamer.multiwriter.source.checkpoint.id` in HoodieWriteConfig.
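As a rough illustration of the target pattern, here is a minimal, self-contained sketch. `MiniConfigProperty` and `ExampleStreamerConfig` are hypothetical stand-ins written for this example, not Hudi's actual `ConfigProperty`/`HoodieConfig` classes; they only mimic the fluent `key(...)`, `defaultValue(...)`/`noDefaultValue()`, `withDocumentation(...)` chain visible in the quoted diffs, so the snippet compiles without Hudi on the classpath.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical stand-in for a typed config property; NOT Hudi's ConfigProperty.
final class MiniConfigProperty<T> {
  private final String key;
  private final T defaultValue; // null means "no default"
  private final String doc;

  private MiniConfigProperty(String key, T defaultValue, String doc) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  // Entry point of the fluent chain: key(...).defaultValue(...).withDocumentation(...)
  static Builder key(String key) {
    return new Builder(key);
  }

  String getKey() {
    return key;
  }

  Optional<T> getDefaultValue() {
    return Optional.ofNullable(defaultValue);
  }

  String getDoc() {
    return doc;
  }

  static final class Builder {
    private final String key;

    Builder(String key) {
      this.key = key;
    }

    <V> Typed<V> defaultValue(V value) {
      return new Typed<>(key, value);
    }

    Typed<String> noDefaultValue() {
      return new Typed<>(key, null);
    }
  }

  static final class Typed<V> {
    private final String key;
    private final V defaultValue;

    Typed(String key, V defaultValue) {
      this.key = key;
      this.defaultValue = defaultValue;
    }

    MiniConfigProperty<V> withDocumentation(String doc) {
      return new MiniConfigProperty<>(key, defaultValue, doc);
    }
  }
}

// Hypothetical config class in the style of HoodieDeltaStreamerConfig.
final class ExampleStreamerConfig {
  // Prefix without a trailing dot; each key adds its own separator.
  static final String PREFIX = "hoodie.deltastreamer";

  static final MiniConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = MiniConfigProperty
      .key(PREFIX + ".source.sanitize.invalid.schema.field.names")
      .defaultValue(false)
      .withDocumentation("Sanitizes invalid schema field names in source data and schema.");

  static final MiniConfigProperty<String> KAFKA_TOPIC = MiniConfigProperty
      .key(PREFIX + ".source.kafka.topic")
      .noDefaultValue()
      .withDocumentation("Kafka topic to read from.");

  // Reads a boolean property, falling back to the declared default (or false) when unset.
  static boolean getBoolean(Properties props, MiniConfigProperty<Boolean> prop) {
    String raw = props.getProperty(prop.getKey());
    return raw != null ? Boolean.parseBoolean(raw) : prop.getDefaultValue().orElse(false);
  }
}
```

The point of the pattern is that key, default, and documentation live together in one declared constant, which is what makes it possible to enumerate configs for documentation.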
   
   ### Impact
   
   NA
   
   ### Risk level (write none, low, medium or high below)
   
   low
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1491532633

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682",
       "triggerID" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "257289e10914bf590c4e8ce9317fbf9773130f55",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760",
       "triggerID" : "257289e10914bf590c4e8ce9317fbf9773130f55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21b703c9c0b9d9a77ad9294c71d7d6473b047a5e",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761",
       "triggerID" : "21b703c9c0b9d9a77ad9294c71d7d6473b047a5e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "95a8f8750fb4a3ece5166c12796c4d565a6e1692",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15762",
       "triggerID" : "95a8f8750fb4a3ece5166c12796c4d565a6e1692",
       "triggerType" : "PUSH"
     }, {
       "hash" : "242e6f5531391c0f1de80c52c118cd841dca1c04",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15872",
       "triggerID" : "242e6f5531391c0f1de80c52c118cd841dca1c04",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1ca78802a885c6510565a89eb6435cc62de3957e",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1ca78802a885c6510565a89eb6435cc62de3957e",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 242e6f5531391c0f1de80c52c118cd841dca1c04 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15872) 
   * 1ca78802a885c6510565a89eb6435cc62de3957e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1465542770

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682",
       "triggerID" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1145712079


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")
+public class BaseSchemaPostProcessorConfig extends HoodieConfig {

Review Comment:
   Addressed.
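The `@ConfigClassProperty` annotation in this diff is what groups a config class under a name and group for documentation. Below is a hedged sketch of the general mechanism, assuming a documentation generator that reads such an annotation reflectively at runtime. `DocGroup` is a hypothetical annotation standing in for `ConfigClassProperty`, and `DocIndex` is illustrative, not Hudi's actual doc tooling.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for ConfigClassProperty; RUNTIME retention lets it be read reflectively.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DocGroup {
  String name();

  String groupName();

  String description();
}

@DocGroup(name = "Schema Post Processor Configs",
    groupName = "DeltaStreamer",
    description = "Base configs to describe a primitive type column.")
class ExamplePostProcessorConfig {
}

// Class without the annotation: a generator scanning for DocGroup would skip it.
class UnannotatedConfig {
}

final class DocIndex {
  // Returns "group > name: description" for annotated classes, or null otherwise.
  static String summarize(Class<?> clazz) {
    DocGroup group = clazz.getAnnotation(DocGroup.class);
    if (group == null) {
      return null;
    }
    return group.groupName() + " > " + group.name() + ": " + group.description();
  }
}
```

A class that is not annotated (like the pre-refactor `BaseSchemaPostProcessorConfig`) simply yields nothing here, which mirrors why such configs could not appear on the Configurations page.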



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java:
##########
@@ -442,17 +443,11 @@ public void sync() {
   }
 
   public static class Constants {

Review Comment:
   Addressed.
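The `Constants` in `HoodieMultiTableDeltaStreamer` give way to config properties such as `hoodie.deltastreamer.ingestion.tablesToBeIngested` and `hoodie.deltastreamer.ingestion.targetBasePath` (quoted later in this thread). A hedged sketch of how those values might be consumed: the `<database>.<table>` format comes from the property's documentation, while the derived path layout `<base-path-prefix>/<database>/<table>` is an assumption for illustration. `MultiTableIngestion` and `TableId` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class MultiTableIngestion {
  // Hypothetical holder for one entry of tablesToBeIngested.
  static final class TableId {
    final String database;
    final String table;

    TableId(String database, String table) {
      this.database = database;
      this.table = table;
    }
  }

  // Parses "db1.table1,db1.table2" into (database, table) pairs; rejects malformed entries.
  static List<TableId> parseTables(String tablesToBeIngested) {
    List<TableId> result = new ArrayList<>();
    for (String raw : tablesToBeIngested.split(",")) {
      String entry = raw.trim();
      if (entry.isEmpty()) {
        continue;
      }
      int dot = entry.indexOf('.');
      if (dot <= 0 || dot == entry.length() - 1) {
        throw new IllegalArgumentException("Expected <database>.<table>, got: " + entry);
      }
      result.add(new TableId(entry.substring(0, dot), entry.substring(dot + 1)));
    }
    return result;
  }

  // Assumed behavior: an explicit per-table targetBasePath overrides the prefix-derived path.
  static String targetBasePath(String basePathPrefix, TableId id, String overrideOrNull) {
    if (overrideOrNull != null && !overrideOrNull.isEmpty()) {
      return overrideOrNull;
    }
    return basePathPrefix + "/" + id.database + "/" + id.table;
  }
}
```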



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")

Review Comment:
   Addressed.





[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1155050423


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   What I mean is that we should follow the convention where the prefix does not end with a dot, to be consistent with other config classes like `HoodieMetadataConfig`.
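In concrete terms, the convention means a prefix constant carries no trailing dot and each key supplies its own separator when composed. A minimal sketch; the constant names and the `hoodie.deltastreamer.schemaprovider` value are hypothetical, chosen only for illustration, and `join` is an optional defensive helper rather than something the review asks for.

```java
final class PrefixConvention {
  // Prefixes carry no trailing dot (hypothetical values, for illustration).
  static final String SCHEMAPROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider";
  static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_PREFIX + ".proto";

  // Each key adds its own leading dot, so composition never yields ".." or a missing separator.
  static final String PROTO_CLASS_NAME = PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name";

  // Joins a prefix and suffix with exactly one dot, tolerating a stray dot on either side.
  static String join(String prefix, String suffix) {
    String p = prefix.endsWith(".") ? prefix.substring(0, prefix.length() - 1) : prefix;
    String s = suffix.startsWith(".") ? suffix.substring(1) : suffix;
    return p + "." + s;
  }
}
```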





[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1152533507


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   We should follow the second convention.





[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472709694

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682",
       "triggerID" : "a5158ffe8b28d05dd4b523ac7401ee7df215416f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "257289e10914bf590c4e8ce9317fbf9773130f55",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760",
       "triggerID" : "257289e10914bf590c4e8ce9317fbf9773130f55",
       "triggerType" : "PUSH"
     }, {
       "hash" : "21b703c9c0b9d9a77ad9294c71d7d6473b047a5e",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761",
       "triggerID" : "21b703c9c0b9d9a77ad9294c71d7d6473b047a5e",
       "triggerType" : "PUSH"
     }, {
       "hash" : "95a8f8750fb4a3ece5166c12796c4d565a6e1692",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "95a8f8750fb4a3ece5166c12796c4d565a6e1692",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 21b703c9c0b9d9a77ad9294c71d7d6473b047a5e Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761) 
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1138116099


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");

Review Comment:
   I have now added all the documentation, so a separate JIRA ticket is not needed.
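Since properties such as `CHECKPOINT_RESET_KEY` and `KAFKA_TOPIC_PROP` were first added with empty `withDocumentation("")` strings, a unit test could guard against that regressing. A hedged sketch of such a check: it reflects over public static fields of a config class and reports properties with blank documentation. `DocumentedProperty`, `SampleConfig`, and `DocCheck` are hypothetical; no accessor names of Hudi's real `ConfigProperty` are assumed.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a config property carrying documentation.
final class DocumentedProperty {
  final String key;
  final String doc;

  DocumentedProperty(String key, String doc) {
    this.key = key;
    this.doc = doc;
  }
}

// Hypothetical config class: one property documented, one left empty.
final class SampleConfig {
  public static final DocumentedProperty KAFKA_TOPIC =
      new DocumentedProperty("hoodie.deltastreamer.source.kafka.topic", "Kafka topic name.");
  public static final DocumentedProperty UNDOCUMENTED =
      new DocumentedProperty("hoodie.deltastreamer.example", "");
}

final class DocCheck {
  // Returns the keys of all public static DocumentedProperty fields whose documentation is blank.
  static List<String> undocumentedKeys(Class<?> configClass) {
    List<String> missing = new ArrayList<>();
    for (Field field : configClass.getFields()) {
      if (Modifier.isStatic(field.getModifiers()) && field.getType() == DocumentedProperty.class) {
        DocumentedProperty prop;
        try {
          prop = (DocumentedProperty) field.get(null);
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read " + field.getName(), e);
        }
        if (prop.doc == null || prop.doc.trim().isEmpty()) {
          missing.add(prop.key);
        }
      }
    }
    return missing;
  }
}
```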



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -110,7 +110,6 @@ public class HoodieDeltaStreamer implements Serializable {
   private static final String SENSITIVE_VALUES_MASKED = "SENSITIVE_INFO_MASKED";
 
   public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
-  public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

Review Comment:
   Note that this particular key is written to the commit metadata, so it should not be configured by the user.
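To make the distinction concrete, a hedged sketch of how a reset key stored in commit metadata can drive the resume decision, assuming logic along these lines: a user-supplied `--checkpoint` takes effect only if it differs from the reset key recorded by the last commit; otherwise ingestion continues from the stored checkpoint. The reset key string comes from the diff; the `deltastreamer.checkpoint.key` value, the map-based metadata, and `CheckpointResolver` are assumptions for illustration.

```java
import java.util.Map;
import java.util.Optional;

final class CheckpointResolver {
  // RESET_KEY matches the string in the diff; CHECKPOINT_KEY's value is an assumption.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

  /**
   * Decides where ingestion should resume.
   *
   * @param userCheckpoint value passed via --checkpoint, or null if none
   * @param lastCommitMetadata extra metadata of the last completed commit (empty if none)
   */
  static Optional<String> resumeFrom(String userCheckpoint, Map<String, String> lastCommitMetadata) {
    String appliedReset = lastCommitMetadata.get(CHECKPOINT_RESET_KEY);
    // A user checkpoint wins only if it has not already been applied by an earlier commit.
    if (userCheckpoint != null && !userCheckpoint.equals(appliedReset)) {
      return Optional.of(userCheckpoint);
    }
    // Otherwise continue from the checkpoint recorded by the previous commit, if any.
    String stored = lastCommitMetadata.get(CHECKPOINT_KEY);
    if (stored != null && !stored.isEmpty()) {
      return Optional.of(stored);
    }
    return Optional.empty();
  }
}
```

Because the reset key only records which override was already consumed, letting users set it directly would defeat that bookkeeping.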



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"

Review Comment:
   Fixed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {

Review Comment:
   Fixed.
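The `source.sanitize.invalid.schema.field.names` and `source.sanitize.invalid.char.mask` properties quoted in the diff above refer to Avro's naming rule: a name must start with `[A-Za-z_]` and continue with only `[A-Za-z0-9_]`. A hedged sketch of sanitizing a single field name with the default `__` mask; `FieldNameSanitizer` is hypothetical, is not Hudi's `SanitizationUtils`, and assumes the mask itself contains only valid characters.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FieldNameSanitizer {
  // Avro names: first char in [A-Za-z_], remaining chars in [A-Za-z0-9_].
  private static final Pattern INVALID_FIRST_CHAR = Pattern.compile("^[^A-Za-z_]");
  private static final Pattern INVALID_CHARS = Pattern.compile("[^A-Za-z0-9_]");

  // Replaces an invalid leading character, then every other invalid character, with the mask.
  static String sanitize(String name, String mask) {
    String replacement = Matcher.quoteReplacement(mask);
    String fixedFirst = INVALID_FIRST_CHAR.matcher(name).replaceFirst(replacement);
    return INVALID_CHARS.matcher(fixedFirst).replaceAll(replacement);
  }
}
```

For example, `sanitize("1st-name", "__")` yields `__st__name`: the leading digit and the hyphen are each replaced by the mask.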



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as the source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it is disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique ID to be used for the multi-writer DeltaStreamer scenario, i.e. when "
+          + "multiple DeltaStreamers are used to write to the same target table. If you are using "
+          + "a single DeltaStreamer for a table, you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma-separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides the path determined using the option `--base-path-prefix` for a table.");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the source you are reading from, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the target you are writing to, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Documentation for the new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty

Review Comment:
   Fixed. A new class has been added for `FilebasedSchemaProvider` configs.
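   For readers of the sanitization configs in the hunk above (`hoodie.deltastreamer.source.sanitize.invalid.schema.field.names` and `hoodie.deltastreamer.source.sanitize.invalid.char.mask`), here is a minimal, self-contained sketch of masking invalid characters under the Avro naming rules: a name must start with `[A-Za-z_]` and may then contain only `[A-Za-z0-9_]`. The class `AvroNameSanitizerSketch` is hypothetical and is not Hudi's `SanitizationUtils`; it only assumes the default mask `__`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch: masks characters that are not legal in an Avro name.
 * Not Hudi's SanitizationUtils; assumes the default mask "__".
 */
final class AvroNameSanitizerSketch {

  // Avro names must start with [A-Za-z_] ...
  private static final Pattern INVALID_FIRST_CHAR = Pattern.compile("^[^A-Za-z_]");
  // ... and may then contain only [A-Za-z0-9_].
  private static final Pattern INVALID_CHAR = Pattern.compile("[^A-Za-z0-9_]");

  static String sanitize(String fieldName, String mask) {
    // quoteReplacement keeps '$' or '\' in the mask from being read as group references.
    String replacement = Matcher.quoteReplacement(mask);
    // Mask an illegal leading character (for example a digit) first.
    String result = INVALID_FIRST_CHAR.matcher(fieldName).replaceFirst(replacement);
    // Then mask every remaining illegal character.
    return INVALID_CHAR.matcher(result).replaceAll(replacement);
  }

  public static void main(String[] args) {
    String mask = "__"; // default of hoodie.deltastreamer.source.sanitize.invalid.char.mask
    System.out.println(sanitize("user-id", mask));
    System.out.println(sanitize("1st.name", mask));
  }
}
```

   Under these assumptions, `user-id` becomes `user__id` and `1st.name` becomes `__st__name`; already valid names pass through unchanged.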



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as the source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it is disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique ID to be used for the multi-writer DeltaStreamer scenario, i.e. when "
+          + "multiple DeltaStreamers are used to write to the same target table. If you are using "
+          + "a single DeltaStreamer for a table, you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma-separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides the path determined using the option `--base-path-prefix` for a table.");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the source you are reading from, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the target you are writing to, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Documentation for the new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where source schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where source schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where target schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where target schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.connection.url")
+        .noDefaultValue()
+        .withDocumentation("The JDBC URL to connect to. The source-specific connection properties may be specified in the URL."
+            + " e.g., jdbc:postgresql://localhost/test?user=fred&password=secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DRIVER_TYPE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.driver.type")
+        .noDefaultValue()
+        .withDocumentation("The class name of the JDBC driver to use to connect to this URL. e.g. org.h2.Driver");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_USERNAME = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.username")
+        .noDefaultValue()
+        .withDocumentation("Username for the connection e.g. fred");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_PASSWORD = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.password")
+        .noDefaultValue()
+        .withDocumentation("Password for the connection e.g. secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DBTABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.dbtable")
+        .noDefaultValue()
+        .withDocumentation("The table with the schema to reference e.g. test_database.test1_table or test1_table");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_TIMEOUT = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.timeout")
+        .noDefaultValue()
+        .withDocumentation("The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. "
+            + "In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver "
+            + "checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_NULLABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.nullable")
+        .noDefaultValue()
+        .withDocumentation("If true, all the columns are nullable.");
+
+    private static final ConfigProperty<String> PROTO_SCHEMA_PROVIDER_PREFIX = ConfigProperty

Review Comment:
   Fixed.
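   The multi-table ingestion configs in this file (`hoodie.deltastreamer.ingestion.tablesToBeIngested` and `hoodie.deltastreamer.ingestion.targetBasePath`) can be illustrated with a small sketch: parse the comma-separated `<database>.<table>` list, then prefer an explicit per-table `targetBasePath` over a path derived from `--base-path-prefix`. The `<prefix>/<database>/<table>` layout used for the derived path is an assumption for illustration only; `HoodieMultiTableDeltaStreamer` defines the actual rule. All class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of multi-table ingestion config handling (not Hudi code). */
final class MultiTableIngestionSketch {

  /** One <database>.<table> entry from hoodie.deltastreamer.ingestion.tablesToBeIngested. */
  static final class TableId {
    final String database;
    final String table;

    TableId(String database, String table) {
      this.database = database;
      this.table = table;
    }
  }

  /** Parses e.g. "db1.table1,db1.table2"; blank entries are skipped. */
  static List<TableId> parseTables(String tablesToBeIngested) {
    List<TableId> tables = new ArrayList<>();
    for (String entry : tablesToBeIngested.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      int dot = trimmed.indexOf('.');
      if (dot <= 0 || dot == trimmed.length() - 1) {
        throw new IllegalArgumentException("Expected <database>.<table> but got: " + trimmed);
      }
      tables.add(new TableId(trimmed.substring(0, dot), trimmed.substring(dot + 1)));
    }
    return tables;
  }

  /**
   * An explicit per-table targetBasePath wins; otherwise derive a path from the
   * --base-path-prefix value. The <prefix>/<database>/<table> layout is an assumption.
   */
  static String resolveBasePath(String targetBasePath, String basePathPrefix, TableId id) {
    if (targetBasePath != null && !targetBasePath.isEmpty()) {
      return targetBasePath;
    }
    return basePathPrefix + "/" + id.database + "/" + id.table;
  }

  public static void main(String[] args) {
    for (TableId id : parseTables("db1.table1,db1.table2")) {
      System.out.println(resolveBasePath(null, "/data/hudi", id));
    }
  }
}
```

   Failing fast on an entry without a `.` keeps a malformed `tablesToBeIngested` value from silently producing an odd base path.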



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as the source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it is disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique ID to be used for the multi-writer DeltaStreamer scenario, i.e. when "
+          + "multiple DeltaStreamers are used to write to the same target table. If you are using "
+          + "a single DeltaStreamer for a table, you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma-separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides the path determined using the option `--base-path-prefix` for a table.");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the source you are reading from, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL used to fetch the schema of the target you are writing to, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Documentation for the new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty

Review Comment:
   Fixed.
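   The schema-provider keys in the hunk above are built by concatenating the `hoodie.deltastreamer.` and `schemaprovider.` prefixes. Each property either declares a default (e.g. `schema_post_processor.add.column.nullable` defaults to `true`) or declares none (`noDefaultValue()`). Below is a minimal, self-contained sketch of how such a property could be resolved against a `java.util.Properties` bag. `SimpleConfigProperty` and `SchemaProviderConfigSketch` are hypothetical stand-ins written for illustration. They are not Hudi's `ConfigProperty`/`HoodieConfig` API, whose lookup methods may behave differently.

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical sketch for illustration only: NOT Hudi's ConfigProperty/HoodieConfig API.
final class SchemaProviderConfigSketch {
  // Prefixes composed the same way as in the diff.
  static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
  static final String SCHEMAPROVIDER_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";

  static final SimpleConfigProperty<String> ADD_COLUMN_NAME = SimpleConfigProperty.noDefault(
      SCHEMAPROVIDER_PREFIX + "schema_post_processor.add.column.name", "New column's name");

  static final SimpleConfigProperty<Boolean> ADD_COLUMN_NULLABLE = SimpleConfigProperty.withDefault(
      SCHEMAPROVIDER_PREFIX + "schema_post_processor.add.column.nullable", true,
      "Whether the new column is nullable");

  // A user-supplied value wins; otherwise fall back to the declared default (which may be empty).
  static Optional<String> getString(Properties props, SimpleConfigProperty<String> prop) {
    String raw = props.getProperty(prop.key());
    return raw != null ? Optional.of(raw) : prop.defaultValue();
  }

  static Optional<Boolean> getBoolean(Properties props, SimpleConfigProperty<Boolean> prop) {
    String raw = props.getProperty(prop.key());
    return raw != null ? Optional.of(Boolean.parseBoolean(raw.trim())) : prop.defaultValue();
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ADD_COLUMN_NAME.key(), "ingest_ts");
    System.out.println(getString(props, ADD_COLUMN_NAME).orElse("<unset>"));  // ingest_ts
    System.out.println(getBoolean(props, ADD_COLUMN_NULLABLE).orElse(false)); // true (declared default)
  }
}

// Immutable key + optional default + documentation, mirroring the builder calls in the diff.
final class SimpleConfigProperty<T> {
  private final String key;
  private final Optional<T> defaultValue;
  private final String doc;

  private SimpleConfigProperty(String key, Optional<T> defaultValue, String doc) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static <T> SimpleConfigProperty<T> withDefault(String key, T defaultValue, String doc) {
    return new SimpleConfigProperty<>(key, Optional.of(defaultValue), doc);
  }

  static <T> SimpleConfigProperty<T> noDefault(String key, String doc) {
    return new SimpleConfigProperty<>(key, Optional.empty(), doc);
  }

  String key() { return key; }
  Optional<T> defaultValue() { return defaultValue; }
  String doc() { return doc; }
}
```

   Keeping the default inside the property definition means every caller falls back the same way. It also lets a documentation generator read the key, default, and description from one place.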



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -110,7 +110,6 @@ public class HoodieDeltaStreamer implements Serializable {
   private static final String SENSITIVE_VALUES_MASKED = "SENSITIVE_INFO_MASKED";
 
   public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
-  public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

Review Comment:
   We still keep this variable here.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty

Review Comment:
   Fixed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";

Review Comment:
   Fixed.
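   The documentation for `source.sanitize.invalid.schema.field.names` and `source.sanitize.invalid.char.mask` refers to Avro's naming rules. Under those rules, a name must start with `[A-Za-z_]` and may subsequently contain only `[A-Za-z0-9_]`. Below is a minimal sketch of one way to apply the mask under those rules. `AvroNameSanitizerSketch` is hypothetical. Hudi's `SanitizationUtils` may handle edge cases differently, for example a leading digit or non-ASCII characters.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch for illustration only: NOT Hudi's SanitizationUtils.
final class AvroNameSanitizerSketch {
  // Avro names: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
  private static final Pattern VALID_FIRST = Pattern.compile("[A-Za-z_]");
  private static final Pattern INVALID_REST = Pattern.compile("[^A-Za-z0-9_]");
  private static final Pattern VALID_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

  // Replaces every invalid UTF-16 char in the name with the mask.
  static String sanitize(String name, String mask) {
    // Require the mask to be a valid Avro name so it is safe in any position, including the first.
    if (!VALID_NAME.matcher(mask).matches()) {
      throw new IllegalArgumentException("Mask is not a valid Avro name: " + mask);
    }
    if (name.isEmpty()) {
      return name; // nothing to sanitize
    }
    String first = name.substring(0, 1);
    String rest = name.substring(1);
    String sanitizedFirst = VALID_FIRST.matcher(first).matches() ? first : mask;
    // quoteReplacement keeps '$' or '\' in a mask from being read as group references.
    String sanitizedRest = INVALID_REST.matcher(rest).replaceAll(Matcher.quoteReplacement(mask));
    return sanitizedFirst + sanitizedRest;
  }

  public static void main(String[] args) {
    System.out.println(sanitize("1st-name", "__"));   // __st__name
    System.out.println(sanitize("valid_name", "__")); // valid_name
    System.out.println(sanitize("a.b c", "_"));       // a_b_c
  }
}
```

   The first character is checked separately because a digit is allowed later in a name but not at the start.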



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("New column's nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where source schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where source schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where target schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where target schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty

Review Comment:
   Fixed.
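   `hoodie.deltastreamer.ingestion.tablesToBeIngested` is documented as a comma-separated list of `<database>.<table>` names, e.g. `db1.table1,db1.table2`. Below is a minimal sketch of parsing and validating that format. `TablesToBeIngestedSketch` and `TableId` are hypothetical names. `HoodieMultiTableDeltaStreamer`'s real parsing may differ, for example in whitespace handling or in whether an entry without a database is accepted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch for illustration only: NOT HoodieMultiTableDeltaStreamer's parser.
final class TablesToBeIngestedSketch {
  // One parsed <database>.<table> entry.
  static final class TableId {
    final String database;
    final String table;

    TableId(String database, String table) {
      this.database = database;
      this.table = table;
    }

    @Override
    public String toString() {
      return database + "." + table;
    }
  }

  // Splits on commas, trims whitespace, skips empty entries, and requires exactly one dot
  // with a non-empty database and table on either side.
  static List<TableId> parse(String value) {
    List<TableId> result = new ArrayList<>();
    for (String entry : value.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      int dot = trimmed.indexOf('.');
      boolean wellFormed = dot > 0 && dot < trimmed.length() - 1 && trimmed.indexOf('.', dot + 1) < 0;
      if (!wellFormed) {
        throw new IllegalArgumentException("Expected <database>.<table> but got: " + trimmed);
      }
      result.add(new TableId(trimmed.substring(0, dot), trimmed.substring(dot + 1)));
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(parse("db1.table1,db1.table2")); // [db1.table1, db1.table2]
  }
}
```

   Failing fast on a malformed entry surfaces a typo in the property file immediately. Otherwise, a table might be skipped silently during a multi-table run.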



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty

Review Comment:
   I kept `BaseSchemaPostProcessorConfig` and made it extend `HoodieConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "xushiyan (via GitHub)" <gi...@apache.org>.
xushiyan commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1142847225


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")
+public class BaseSchemaPostProcessorConfig extends HoodieConfig {

Review Comment:
   not sure why it's called `Base` in the first place. would it be better to remove the `Base` prefix to avoid implying subclassing?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")
+public class BaseSchemaPostProcessorConfig extends HoodieConfig {

Review Comment:
   Should this be under the SchemaProvider subgroup, since it's tightly coupled with SchemaProvider?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java:
##########
@@ -442,17 +443,11 @@ public void sync() {
   }
 
   public static class Constants {

Review Comment:
   Ideally this class should be private. Since tests need it, can we make it and all its members package-private instead?
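A minimal sketch of the package-private option suggested above, assuming the tests live in the same package as the streamer: leaving out the access modifier on the nested class and its members keeps them reachable from same-package tests while hiding them from other packages. `MultiTableStreamerSketch` and both constants are hypothetical placeholders, not the real members of `HoodieMultiTableDeltaStreamer.Constants`.

```java
import java.lang.reflect.Modifier;

// Hypothetical stand-in for HoodieMultiTableDeltaStreamer; only the visibility pattern matters here.
class MultiTableStreamerSketch {

  // No access modifier: package-private, so same-package tests can still reference it.
  static class Constants {
    static final String FILE_DELIMITER = "/";   // hypothetical member
    static final String COMMA_SEPARATOR = ",";  // hypothetical member

    private Constants() {
      // Holder for constants only; never instantiated.
    }
  }

  public static void main(String[] args) {
    int mods = Constants.class.getModifiers();
    // Neither public, protected, nor private means package-private.
    boolean packagePrivate = !Modifier.isPublic(mods)
        && !Modifier.isProtected(mods)
        && !Modifier.isPrivate(mods);
    System.out.println("Constants is package-private: " + packagePrivate);
  }
}
```

The private constructor only prevents instantiation of the holder class; whether the real class should also be `final` is a separate choice.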



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1465537271

   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1465698810

   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1480656013

   ## CI report:
   
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15762) 
   * 242e6f5531391c0f1de80c52c118cd841dca1c04 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15872) 
   


[GitHub] [hudi] lokeshj1703 commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1480657709

   > Can you think of how we can ensure we don't make any mistakes in the config keys (strings) while doing this refactoring? For example, is it possible to compare the config keys before this patch with the config keys after this patch and ensure they match one to one?
   
   I don't know of a good way to do this. Ideally, if we had a hudi-site.xml or something similar, we could compare the new configs against the old ones. Is there a good way of achieving this?
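One possible approach, sketched under stated assumptions: collect every key string declared before the refactor and every key declared after it, then diff the two sets so that any typo shows up as a key present on only one side. The nested `OldKeys` and `NewKeys` classes are hypothetical stand-ins holding plain strings; in the real code the "after" side would be `ConfigProperty` fields, whose key would have to be read through the property object (for example its `key()` accessor) rather than directly as a `String`.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

// Sketch: diff the config keys declared before and after a refactor.
class ConfigKeyDiff {

  // Hypothetical stand-in for the pre-refactor string constants.
  static final class OldKeys {
    static final String CHECKPOINT_PROVIDER_PATH_PROP = "hoodie.deltastreamer.checkpoint.provider.path";
    static final String KAFKA_TOPIC_PROP = "hoodie.deltastreamer.source.kafka.topic";
  }

  // Hypothetical stand-in for the post-refactor config class (keys kept as plain strings here).
  static final class NewKeys {
    static final String CHECKPOINT_PROVIDER_PATH = "hoodie.deltastreamer.checkpoint.provider.path";
    static final String KAFKA_TOPIC = "hoodie.deltastreamer.source.kafka.topic";
  }

  // Collects the values of all static final String fields declared on a class, sorted.
  static Set<String> declaredKeys(Class<?> clazz) {
    Set<String> keys = new TreeSet<>();
    for (Field field : clazz.getDeclaredFields()) {
      int mods = field.getModifiers();
      if (Modifier.isStatic(mods) && Modifier.isFinal(mods) && field.getType() == String.class) {
        try {
          field.setAccessible(true);
          keys.add((String) field.get(null));
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read " + field, e);
        }
      }
    }
    return keys;
  }

  // Returns the keys in `left` that are missing from `right`, sorted.
  static Set<String> missingFrom(Set<String> left, Set<String> right) {
    Set<String> result = new TreeSet<>(left);
    result.removeAll(right);
    return result;
  }

  public static void main(String[] args) {
    Set<String> before = declaredKeys(OldKeys.class);
    Set<String> after = declaredKeys(NewKeys.class);
    // Two empty results mean the key sets match one to one.
    System.out.println("Dropped or renamed: " + missingFrom(before, after));
    System.out.println("Newly introduced:   " + missingFrom(after, before));
  }
}
```

Prefix-only constants such as `DELTA_STREAMER_CONFIG_PREFIX` would also be picked up by the reflection step, so a real check would probably need to filter them out (for example, by skipping values that end with `.`).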


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472429567

   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682) 
   * 257289e10914bf590c4e8ce9317fbf9773130f55 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1480650777

   ## CI report:
   
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15762) 
   * 242e6f5531391c0f1de80c52c118cd841dca1c04 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1491870371

   ## CI report:
   
   * 1ca78802a885c6510565a89eb6435cc62de3957e Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16015) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472639192

   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682) 
   * 257289e10914bf590c4e8ce9317fbf9773130f55 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760) 
   * 21b703c9c0b9d9a77ad9294c71d7d6473b047a5e UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472697938

   ## CI report:
   
   * 257289e10914bf590c4e8ce9317fbf9773130f55 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760) 
   * 21b703c9c0b9d9a77ad9294c71d7d6473b047a5e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761) 
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 UNKNOWN
   


[GitHub] [hudi] nsivabalan commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1142860752


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")

Review Comment:
   same question as above
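For context on what `hoodie.deltastreamer.source.kafka.append.offsets` controls, here is a minimal sketch of the on/off semantics its documentation describes, using the three column names and the `"false"` default from the config definition in this PR. It is only an illustration: the real processing works on Avro records and schemas rather than a `Map`, and `KafkaOffsetAppendSketch` and `maybeAppendOffsets` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Sketch of the behaviour documented for hoodie.deltastreamer.source.kafka.append.offsets.
class KafkaOffsetAppendSketch {

  static final String APPEND_OFFSETS_KEY = "hoodie.deltastreamer.source.kafka.append.offsets";

  // Returns a copy of the record; the Kafka metadata columns are added only when the flag is true.
  static Map<String, Object> maybeAppendOffsets(Map<String, Object> record, Properties props,
                                                long offset, int partition, long timestamp) {
    // "false" mirrors the default value in the config definition.
    boolean enabled = Boolean.parseBoolean(props.getProperty(APPEND_OFFSETS_KEY, "false"));
    Map<String, Object> out = new LinkedHashMap<>(record);
    if (enabled) {
      out.put("_hoodie_kafka_source_offset", offset);
      out.put("_hoodie_kafka_source_partition", partition);
      out.put("_hoodie_kafka_source_timestamp", timestamp);
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", 42);
    Properties props = new Properties();
    props.setProperty(APPEND_OFFSETS_KEY, "true");
    System.out.println(maybeAppendOffsets(record, props, 1001L, 3, 1678433585000L));
  }
}
```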



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/BaseSchemaPostProcessorConfig.java:
##########
@@ -7,23 +7,33 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.utilities.schema.postprocessor.add;
+package org.apache.hudi.utilities.config;
 
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
 
 /**
  * Base configs to describe a primitive type column.
  */
-public class BaseSchemaPostProcessorConfig {
+@Immutable
+@ConfigClassProperty(name = "Schema Post Processor Config Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Base configs to describe a primitive type column.")

Review Comment:
   guess we need to fix L30 and this description. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty

Review Comment:
   Isn't this specific to the Kafka source? Why is it located in this file?
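One mechanical way to look at this question: group the keys defined in `HoodieDeltaStreamerConfig` (as listed in the diff in this thread) by the segment after `hoodie.deltastreamer.`, splitting `source.*` keys one level deeper. Keys that land under a specific source such as `source.kafka` are candidates for a source-specific config class, while `source.sanitize` keys apply to any source. The grouping rule and the class name are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups DeltaStreamer config keys by scope to spot source-specific ones.
class ConfigKeyScopeSketch {

  static final String PREFIX = "hoodie.deltastreamer.";

  static Map<String, List<String>> groupByScope(List<String> keys) {
    Map<String, List<String>> groups = new TreeMap<>();
    for (String key : keys) {
      if (!key.startsWith(PREFIX)) {
        continue; // not a DeltaStreamer key
      }
      String[] parts = key.substring(PREFIX.length()).split("\\.");
      // "source.<x>.<...>" keys are grouped one level deeper, e.g. source.kafka vs source.sanitize.
      String scope = parts[0].equals("source") && parts.length > 2
          ? parts[0] + "." + parts[1]
          : parts[0];
      groups.computeIfAbsent(scope, k -> new ArrayList<>()).add(key);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList(
        "hoodie.deltastreamer.checkpoint.provider.path",
        "hoodie.deltastreamer.source.kafka.topic",
        "hoodie.deltastreamer.source.kafka.append.offsets",
        "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names",
        "hoodie.deltastreamer.source.sanitize.invalid.char.mask",
        "hoodie.deltastreamer.multiwriter.source.checkpoint.id",
        "hoodie.deltastreamer.ingestion.tablesToBeIngested",
        "hoodie.deltastreamer.ingestion.targetBasePath");
    groupByScope(keys).forEach((scope, members) -> System.out.println(scope + " -> " + members));
  }
}
```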



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it's disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");

Review Comment:
   So for a single-table DeltaStreamer, this config value is ignored, right? Can we call that out?
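The documentation says `targetBasePath` overrides the path derived from `--base-path-prefix`. The minimal sketch below shows that override-versus-default behaviour. It assumes the default layout is `<prefix>/<database>/<table>`, which this thread does not confirm. `resolveTargetBasePath` is a hypothetical helper, not Hudi's API. Only the key string comes from the diff.

```java
import java.util.Properties;

public class TargetBasePathSketch {
  // Key taken from the diff: INGESTION_PREFIX + "targetBasePath".
  static final String TARGET_BASE_PATH_KEY = "hoodie.deltastreamer.ingestion.targetBasePath";

  // Hypothetical helper: returns the per-table override when it is set.
  // Otherwise it derives a path from the --base-path-prefix value.
  // The "<prefix>/<database>/<table>" layout is an assumption made for illustration.
  static String resolveTargetBasePath(Properties tableProps, String basePathPrefix,
                                      String database, String table) {
    String override = tableProps.getProperty(TARGET_BASE_PATH_KEY);
    if (override != null && !override.isEmpty()) {
      return override;
    }
    return basePathPrefix + "/" + database + "/" + table;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // No override: the path is derived from the prefix.
    System.out.println(resolveTargetBasePath(props, "s3://bucket/hudi", "db1", "table1"));
    // With an override: the prefix-derived path is ignored.
    props.setProperty(TARGET_BASE_PATH_KEY, "s3://other/table1");
    System.out.println(resolveTargetBasePath(props, "s3://bucket/hudi", "db1", "table1"));
  }
}
```

A single-table DeltaStreamer receives its base path directly, so it would never consult a per-table key like this one. That is the behaviour the reviewer asks to document.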



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   Minor:
   In the previous classes, prefix variables end with "." and the individual config suffixes do not start with ".". This class does the opposite. Can we standardize on one convention?
   
   For example,
   
   SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider."
   
   ends with a dot ("."),
   
   and each config is then named as
   SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor"
   
   with no leading dot (".").
   
   In this class, however, it is the other way around:
   PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";
   PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name"
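Both conventions resolve to the same final key. Standardizing therefore affects readability, not behaviour. The small check below uses the prefix values quoted in the comment. The `_A`/`_B` constant names are illustrative only.

```java
public class ConfigPrefixSketch {
  // "hoodie.deltastreamer." + "schemaprovider.", as quoted in the review comment.
  static final String SCHEMAPROVIDER_CONFIG_PREFIX = "hoodie.deltastreamer.schemaprovider.";

  // Style in this diff: prefix has no trailing dot, so the suffix carries a leading dot.
  static final String PROTO_PREFIX_NO_DOT = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";
  static final String CLASS_NAME_KEY_A = PROTO_PREFIX_NO_DOT + ".class.name";

  // Style used elsewhere: every prefix ends with "." and no suffix starts with one.
  static final String PROTO_PREFIX_WITH_DOT = SCHEMAPROVIDER_CONFIG_PREFIX + "proto.";
  static final String CLASS_NAME_KEY_B = PROTO_PREFIX_WITH_DOT + "class.name";

  public static void main(String[] args) {
    System.out.println(CLASS_NAME_KEY_A);                          // hoodie.deltastreamer.schemaprovider.proto.class.name
    System.out.println(CLASS_NAME_KEY_A.equals(CLASS_NAME_KEY_B)); // true
  }
}
```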
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1480902560

   ## CI report:
   
   * 242e6f5531391c0f1de80c52c118cd841dca1c04 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15872) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>



[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472726357

   ## CI report:
   
   * 21b703c9c0b9d9a77ad9294c71d7d6473b047a5e Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761) 
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15762) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>



[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472647404

   ## CI report:
   
   * 257289e10914bf590c4e8ce9317fbf9773130f55 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760) 
   * 21b703c9c0b9d9a77ad9294c71d7d6473b047a5e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15761) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>



[GitHub] [hudi] yihua commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1489388199

   > Can you think of how we can ensure we don't make any mistakes in the config keys (strings) while doing this refactoring? For example, is it possible to compare the config keys before this patch with the config keys after it and ensure they match one-to-one?
   
   During my review, I verified that there is a one-to-one mapping between the configs before and after the changes.
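The quoted question asks whether the before/after key comparison could be automated. Below is a minimal sketch of one approach, using reflection to collect every `public static String` constant from two classes and diff the two sets. `OldKeys` and `NewKeys` are hypothetical stand-ins. For the real config classes, you would read the `ConfigProperty` fields and call `.key()` on each one instead of reading `String` constants.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Set;
import java.util.TreeSet;

public class ConfigKeyDiffSketch {
  // Hypothetical stand-in for the key strings that existed before the refactor.
  public static class OldKeys {
    public static final String CHECKPOINT_PROVIDER_PATH = "hoodie.deltastreamer.checkpoint.provider.path";
    public static final String KAFKA_TOPIC = "hoodie.deltastreamer.source.kafka.topic";
  }

  // Hypothetical stand-in for the keys defined after the refactor.
  public static class NewKeys {
    public static final String CHECKPOINT_PROVIDER_PATH = "hoodie.deltastreamer.checkpoint.provider.path";
    public static final String KAFKA_TOPIC = "hoodie.deltastreamer.source.kafka.topic";
  }

  // Collects the values of all public static String fields declared on a class.
  static Set<String> collectKeys(Class<?> clazz) {
    Set<String> keys = new TreeSet<>();
    for (Field field : clazz.getDeclaredFields()) {
      int mods = field.getModifiers();
      if (Modifier.isPublic(mods) && Modifier.isStatic(mods) && field.getType() == String.class) {
        try {
          keys.add((String) field.get(null));
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read " + field.getName(), e);
        }
      }
    }
    return keys;
  }

  // Returns the keys present in `a` but missing from `b`.
  static Set<String> missing(Set<String> a, Set<String> b) {
    Set<String> diff = new TreeSet<>(a);
    diff.removeAll(b);
    return diff;
  }

  public static void main(String[] args) {
    Set<String> before = collectKeys(OldKeys.class);
    Set<String> after = collectKeys(NewKeys.class);
    System.out.println("Dropped keys: " + missing(before, after));
    System.out.println("Added keys:   " + missing(after, before));
  }
}
```

Running a check like this in a unit test would turn the manual one-to-one verification into a regression guard.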



[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472897711

   ## CI report:
   
   * 95a8f8750fb4a3ece5166c12796c4d565a6e1692 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15762) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>



[GitHub] [hudi] yihua merged pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua merged PR #8152:
URL: https://github.com/apache/hudi/pull/8152



[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1145712418


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");

Review Comment:
   Addressed



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("Kafka topic name.");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")

Review Comment:
   Will fix it as part of HUDI-5780



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  public static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC = ConfigProperty

Review Comment:
   Will fix it as part of HUDI-5780




[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1133445729


##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java:
##########
@@ -189,7 +190,7 @@ public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> genera
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());

Review Comment:
   Are these changes strictly required?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");

Review Comment:
   @lokeshj1703 could you file a JIRA ticket for the follow-up of adding the remaining documentation?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java:
##########
@@ -110,7 +110,6 @@ public class HoodieDeltaStreamer implements Serializable {
   private static final String SENSITIVE_VALUES_MASKED = "SENSITIVE_INFO_MASKED";
 
   public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
-  public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

Review Comment:
   For any public reference like this, instead of removing it, mark it as `@deprecated` and assign its value from `.key()`. This keeps backward compatibility in case it is used outside the Hudi repo.
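Below is a minimal sketch of the suggested pattern: the old public `String` constant stays in place, is marked deprecated, and takes its value from the new property's `.key()`. The nested `ConfigProperty` here is a simplified hypothetical stand-in. Hudi's real `ConfigProperty.key(...)` returns a builder rather than the property itself. `NewConfig` and `LegacyDeltaStreamer` are illustrative names for `HoodieDeltaStreamerConfig` and `HoodieDeltaStreamer`.

```java
public class DeprecatedAliasSketch {
  // Simplified hypothetical stand-in for Hudi's ConfigProperty: only the key is modelled.
  static final class ConfigProperty<T> {
    private final String key;

    private ConfigProperty(String key) {
      this.key = key;
    }

    static <T> ConfigProperty<T> key(String key) {
      return new ConfigProperty<>(key);
    }

    String key() {
      return key;
    }
  }

  // New home of the config (mirrors HoodieDeltaStreamerConfig.CHECKPOINT_RESET_KEY in the diff).
  static final class NewConfig {
    public static final ConfigProperty<String> CHECKPOINT_RESET_KEY =
        ConfigProperty.key("deltastreamer.checkpoint.reset_key");
  }

  // Old public constant kept for external callers, now derived from the single new definition.
  static final class LegacyDeltaStreamer {
    /** @deprecated Use {@code NewConfig.CHECKPOINT_RESET_KEY.key()} instead. */
    @Deprecated
    public static final String CHECKPOINT_RESET_KEY = NewConfig.CHECKPOINT_RESET_KEY.key();
  }

  public static void main(String[] args) {
    System.out.println(LegacyDeltaStreamer.CHECKPOINT_RESET_KEY);
  }
}
```

Deriving the old constant from `.key()` leaves one source of truth for the string, so the two definitions cannot drift apart.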



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -35,35 +35,6 @@
  * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
  */
 public class ProtoClassBasedSchemaProvider extends SchemaProvider {
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
-    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name")

Review Comment:
   Keep all public ones alike and check all such declarations.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty

Review Comment:
   Remove the `_PROP` suffix from this and the other `ConfigProperty` fields.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"

Review Comment:
   The docs do not seem to be right: they describe the archival table service, not the checkpoint provider path.
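For reference, here is a sketch of a corrected definition. The wording is an assumption based on the config's name and the `InitialCheckPointProvider` class listed in this PR, so it should be checked against how that class actually uses the path. `ConfigProperty` is a simplified, hypothetical stand-in, not Hudi's class. `CheckpointConfigSketch` is a hypothetical name.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Hudi's ConfigProperty (NOT the real class).
final class ConfigProperty<T> {
  private final String key;
  private final T defaultValue;
  private final String doc;

  private ConfigProperty(String key, T defaultValue, String doc) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static Builder key(String key) { return new Builder(key); }

  ConfigProperty<T> withDocumentation(String doc) { return new ConfigProperty<>(key, defaultValue, doc); }

  String key() { return key; }
  T defaultValue() { return defaultValue; }
  String doc() { return doc; }

  static final class Builder {
    private final String key;

    private Builder(String key) { this.key = key; }

    <V> ConfigProperty<V> defaultValue(V value) { return new ConfigProperty<>(key, value, ""); }

    ConfigProperty<String> noDefaultValue() { return new ConfigProperty<>(key, null, ""); }
  }
}

final class CheckpointConfigSketch {
  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";

  // Assumed wording: describes what the path is for instead of archival behavior.
  static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH = ConfigProperty
      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
      .noDefaultValue()
      .withDocumentation("The path read by the configured InitialCheckPointProvider "
          + "to derive the initial checkpoint for DeltaStreamer.");
}
```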



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";

Review Comment:
   rename to `SCHEMA_PROVIDER_CONFIG_PREFIX` for simplicity.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty

Review Comment:
   I see these are moved from `BaseSchemaPostProcessorConfig`.  Should schema post processor configs still be kept in a separate class, extending `HoodieConfig`?
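If the post-processor configs get their own class, it might look like the sketch below. `SchemaPostProcessorConfigSketch` is a hypothetical name, and the keys and defaults are copied from the diff. `ConfigProperty` is a simplified stand-in. In the real code the class would also carry `@ConfigClassProperty` and extend `HoodieConfig`; those are omitted here so the sketch compiles without Hudi.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Hudi's ConfigProperty (NOT the real class).
final class ConfigProperty<T> {
  private final String key;
  private final T defaultValue;
  private final String doc;

  private ConfigProperty(String key, T defaultValue, String doc) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static Builder key(String key) { return new Builder(key); }

  ConfigProperty<T> withDocumentation(String doc) { return new ConfigProperty<>(key, defaultValue, doc); }

  String key() { return key; }
  T defaultValue() { return defaultValue; }
  String doc() { return doc; }

  static final class Builder {
    private final String key;

    private Builder(String key) { this.key = key; }

    <V> ConfigProperty<V> defaultValue(V value) { return new ConfigProperty<>(key, value, ""); }

    ConfigProperty<String> noDefaultValue() { return new ConfigProperty<>(key, null, ""); }
  }
}

// Hypothetical class that groups only the schema post-processor configs.
final class SchemaPostProcessorConfigSketch {
  private static final String SCHEMA_POST_PROCESSOR_PREFIX =
      "hoodie.deltastreamer.schemaprovider.schema_post_processor";

  // Post-processor selection; documentation is still empty in the diff.
  static final ConfigProperty<String> SCHEMA_POST_PROCESSOR = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX)
      .noDefaultValue()
      .withDocumentation("");

  static final ConfigProperty<String> DELETE_COLUMNS = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".delete.columns")
      .noDefaultValue()
      .withDocumentation("");

  static final ConfigProperty<String> ADD_COLUMN_NAME = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".add.column.name")
      .noDefaultValue()
      .withDocumentation("New column's name");

  static final ConfigProperty<String> ADD_COLUMN_TYPE = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".add.column.type")
      .noDefaultValue()
      .withDocumentation("New column's type");

  // Same default as in the diff: new columns are nullable unless configured otherwise.
  static final ConfigProperty<Boolean> ADD_COLUMN_NULLABLE = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".add.column.nullable")
      .defaultValue(true)
      .withDocumentation("Whether the new column is nullable");

  static final ConfigProperty<String> ADD_COLUMN_DEFAULT = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".add.column.default")
      .noDefaultValue()
      .withDocumentation("New column's default value");

  static final ConfigProperty<String> ADD_COLUMN_DOC = ConfigProperty
      .key(SCHEMA_POST_PROCESSOR_PREFIX + ".add.column.doc")
      .noDefaultValue()
      .withDocumentation("Docs about the new column");
}
```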



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {

Review Comment:
   move this class to a separate file.  Rename it to `HoodieSchemaProviderConfig`.
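The sketch below shows the standalone class and also applies the earlier `SCHEMA_PROVIDER_CONFIG_PREFIX` rename. One thing to watch: `DELTA_STREAMER_CONFIG_PREFIX` is `private` in `HoodieDeltaStreamerConfig`. Once the class moves to its own file, the prefix must either be redeclared there, as below, or exposed from the outer class. The annotations and the `HoodieConfig` superclass are omitted, `ConfigProperty` is a simplified stand-in, and the doc wording for the two registry URLs is an assumption.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Hudi's ConfigProperty (NOT the real class).
final class ConfigProperty<T> {
  private final String key;
  private final T defaultValue;
  private final String doc;

  private ConfigProperty(String key, T defaultValue, String doc) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static Builder key(String key) { return new Builder(key); }

  ConfigProperty<T> withDocumentation(String doc) { return new ConfigProperty<>(key, defaultValue, doc); }

  String key() { return key; }
  T defaultValue() { return defaultValue; }
  String doc() { return doc; }

  static final class Builder {
    private final String key;

    private Builder(String key) { this.key = key; }

    <V> ConfigProperty<V> defaultValue(V value) { return new ConfigProperty<>(key, value, ""); }

    ConfigProperty<String> noDefaultValue() { return new ConfigProperty<>(key, null, ""); }
  }
}

// In the real refactor this would live in its own file, HoodieSchemaProviderConfig.java;
// @Immutable, @ConfigClassProperty and "extends HoodieConfig" are left out of this sketch.
final class HoodieSchemaProviderConfig {
  // Redeclared here because DELTA_STREAMER_CONFIG_PREFIX is private to HoodieDeltaStreamerConfig.
  private static final String SCHEMA_PROVIDER_CONFIG_PREFIX = "hoodie.deltastreamer.schemaprovider.";

  static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL = ConfigProperty
      .key(SCHEMA_PROVIDER_CONFIG_PREFIX + "registry.url")
      .noDefaultValue()
      .withDocumentation("Schema registry URL for the source schema, e.g. https://foo:bar@schemaregistry.org");

  static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL = ConfigProperty
      .key(SCHEMA_PROVIDER_CONFIG_PREFIX + "registry.targetUrl")
      .noDefaultValue()
      .withDocumentation("Schema registry URL for the target schema, e.g. https://foo:bar@schemaregistry.org");
}
```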



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -36,15 +36,8 @@
 public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
 
   public static class Config {
-    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty

Review Comment:
   Similarly here, annotate with `@deprecated` and set `KAFKA_APPEND_OFFSETS = HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS;`
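A minimal sketch of the suggested deprecation alias. It uses simplified stand-ins for `HoodieDeltaStreamerConfig`, `KafkaOffsetPostProcessor` and `ConfigProperty`; the real `KafkaOffsetPostProcessor` extends `SchemaPostProcessor`, which is omitted here. The Javadoc `@deprecated` tag documents the replacement, and the `@Deprecated` annotation makes the compiler warn existing callers. Both fields point at the same `ConfigProperty` instance, so old and new references resolve to the same key and default.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Hudi's ConfigProperty (NOT the real class).
final class ConfigProperty<T> {
  private final String key;
  private final T defaultValue;
  private final String doc;

  private ConfigProperty(String key, T defaultValue, String doc) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static Builder key(String key) { return new Builder(key); }

  ConfigProperty<T> withDocumentation(String doc) { return new ConfigProperty<>(key, defaultValue, doc); }

  String key() { return key; }
  T defaultValue() { return defaultValue; }
  String doc() { return doc; }

  static final class Builder {
    private final String key;

    private Builder(String key) { this.key = key; }

    <V> ConfigProperty<V> defaultValue(V value) { return new ConfigProperty<>(key, value, ""); }

    ConfigProperty<String> noDefaultValue() { return new ConfigProperty<>(key, null, ""); }
  }
}

// Simplified stand-in: the new home of the config.
final class HoodieDeltaStreamerConfig {
  static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
      .key("hoodie.deltastreamer.source.kafka.append.offsets")
      .defaultValue("false")
      .withDocumentation("When enabled, appends Kafka offset info to the records.");
}

// Simplified stand-in: the old location keeps a deprecated alias for existing callers.
final class KafkaOffsetPostProcessor {
  public static class Config {
    /**
     * @deprecated Use {@link HoodieDeltaStreamerConfig#KAFKA_APPEND_OFFSETS} instead.
     */
    @Deprecated
    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS =
        HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS;
  }
}
```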



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("When enabled, the archival table service is invoked immediately after each commit,"
+          + " to archive commits if we cross a maximum value of commits."
+          + " It's recommended to enable this, to ensure number of active commits is bounded.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends kafka offset info like source offset(_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default its disabled and no kafka offsets are added");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are by "
+          + "goes by avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("New column's nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty

Review Comment:
   Add a comment to indicate the section of configs for `FilebasedSchemaProvider`.  Mention in the docs that the configs apply only to `FilebasedSchemaProvider`.
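The sketch below shows one way to mark the `FilebasedSchemaProvider` section. It repeats the restriction in each doc string so that the restriction also shows up on the generated Configurations page. The keys `source.schema.file` and `target.schema.file` under the schema provider prefix are the ones `FilebasedSchemaProvider` reads. The class name and doc wording are hypothetical, and `ConfigProperty` is a simplified stand-in.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for Hudi's ConfigProperty (NOT the real class).
final class ConfigProperty<T> {
  private final String key;
  private final T defaultValue;
  private final String doc;

  private ConfigProperty(String key, T defaultValue, String doc) {
    this.key = Objects.requireNonNull(key);
    this.defaultValue = defaultValue;
    this.doc = doc;
  }

  static Builder key(String key) { return new Builder(key); }

  ConfigProperty<T> withDocumentation(String doc) { return new ConfigProperty<>(key, defaultValue, doc); }

  String key() { return key; }
  T defaultValue() { return defaultValue; }
  String doc() { return doc; }

  static final class Builder {
    private final String key;

    private Builder(String key) { this.key = key; }

    <V> ConfigProperty<V> defaultValue(V value) { return new ConfigProperty<>(key, value, ""); }

    ConfigProperty<String> noDefaultValue() { return new ConfigProperty<>(key, null, ""); }
  }
}

final class SchemaProviderConfigSketch {
  private static final String SCHEMA_PROVIDER_CONFIG_PREFIX = "hoodie.deltastreamer.schemaprovider.";

  // ---------------- Configs for FilebasedSchemaProvider ----------------
  // The configs in this section apply only when FilebasedSchemaProvider is used.

  static final ConfigProperty<String> SOURCE_SCHEMA_FILE = ConfigProperty
      .key(SCHEMA_PROVIDER_CONFIG_PREFIX + "source.schema.file")
      .noDefaultValue()
      .withDocumentation("Path to the Avro schema file of the source you are reading from. "
          + "Only applies to FilebasedSchemaProvider.");

  static final ConfigProperty<String> TARGET_SCHEMA_FILE = ConfigProperty
      .key(SCHEMA_PROVIDER_CONFIG_PREFIX + "target.schema.file")
      .noDefaultValue()
      .withDocumentation("Path to the Avro schema file of the target you are writing to. "
          + "Only applies to FilebasedSchemaProvider.");
}
```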



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -35,35 +35,6 @@
  * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
  */
 public class ProtoClassBasedSchemaProvider extends SchemaProvider {
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";

Review Comment:
   The private `PROTO_SCHEMA_PROVIDER_PREFIX` constant can be removed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it's disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the source schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the target schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty

Review Comment:
   Mark these as `HiveSchemaProvider` configs in a comment and in the docs.
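   A minimal standalone sketch of how the four `HiveSchemaProvider`-only keys pair up into `<database>.<table>` identifiers, again using plain `java.util.Properties` rather than Hudi's config classes. `HiveSchemaTables` is hypothetical. Two behaviors are assumptions to confirm against `HiveSchemaProvider`: an unset database falls back to Hive's `default` database, and the target table is optional.

```java
import java.util.Optional;
import java.util.Properties;

/** Hypothetical standalone sketch; not Hudi code. */
final class HiveSchemaTables {
  static final String PREFIX = "hoodie.deltastreamer.schemaprovider.";

  // --- Configs below apply only to HiveSchemaProvider ---
  static final String SOURCE_DB = PREFIX + "source.schema.hive.database";
  static final String SOURCE_TABLE = PREFIX + "source.schema.hive.table";
  static final String TARGET_DB = PREFIX + "target.schema.hive.database";
  static final String TARGET_TABLE = PREFIX + "target.schema.hive.table";

  // Assumption: an unset database means Hive's "default" database.
  static final String DEFAULT_DB = "default";

  private HiveSchemaTables() {
  }

  /** Returns "db.table" for the source schema; the source table is required. */
  static String sourceTable(Properties props) {
    String table = props.getProperty(SOURCE_TABLE);
    if (table == null || table.isEmpty()) {
      throw new IllegalArgumentException("Missing required property " + SOURCE_TABLE);
    }
    return props.getProperty(SOURCE_DB, DEFAULT_DB) + "." + table;
  }

  /** Returns "db.table" for the target schema, or empty if no target table is configured. */
  static Optional<String> targetTable(Properties props) {
    String table = props.getProperty(TARGET_TABLE);
    if (table == null || table.isEmpty()) {
      return Optional.empty();
    }
    return Optional.of(props.getProperty(TARGET_DB, DEFAULT_DB) + "." + table);
  }
}
```

   Keeping the database and table keys adjacent under one "HiveSchemaProvider only" comment makes the pairing obvious on the Configurations page.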



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it's disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the source schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the target schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where source schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where source schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where target schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where target schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.connection.url")
+        .noDefaultValue()
+        .withDocumentation("The JDBC URL to connect to. The source-specific connection properties may be specified in the URL."
+            + " e.g., jdbc:postgresql://localhost/test?user=fred&password=secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DRIVER_TYPE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.driver.type")
+        .noDefaultValue()
+        .withDocumentation("The class name of the JDBC driver to use to connect to this URL. e.g. org.h2.Driver");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_USERNAME = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.username")
+        .noDefaultValue()
+        .withDocumentation("Username for the connection e.g. fred");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_PASSWORD = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.password")
+        .noDefaultValue()
+        .withDocumentation("Password for the connection e.g. secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DBTABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.dbtable")
+        .noDefaultValue()
+        .withDocumentation("The table with the schema to reference e.g. test_database.test1_table or test1_table");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_TIMEOUT = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.timeout")
+        .noDefaultValue()
+        .withDocumentation("The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. "
+            + "In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver "
+            + "checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_NULLABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.nullable")
+        .noDefaultValue()
+        .withDocumentation("If true, all the columns are nullable.");
+
+    private static final ConfigProperty<String> PROTO_SCHEMA_PROVIDER_PREFIX = ConfigProperty

Review Comment:
   Do the same for the `ProtoClassBasedSchemaProvider` configs.
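   Related to the earlier note about the private prefix: a key prefix is only used to build key strings, so it does not need to be a `ConfigProperty`, and spelling each public key out in full removes the need for a private prefix constant. The standalone sketch below shows a `ProtoClassBasedSchemaProvider`-only key and a check that the named class is on the classpath. `ProtoSchemaClass` is hypothetical, and the key suffix `proto.class.name` is an assumption to verify against the provider. The sketch does not check that the class is a generated protobuf message, because that would require the protobuf dependency.

```java
import java.util.Properties;

/** Hypothetical standalone sketch; not Hudi code. */
final class ProtoSchemaClass {
  // --- Configs below apply only to ProtoClassBasedSchemaProvider ---
  // Full key spelled out, so no separate prefix constant is needed (suffix is an assumption).
  static final String CLASS_NAME = "hoodie.deltastreamer.schemaprovider.proto.class.name";

  private ProtoSchemaClass() {
  }

  /** Loads the configured class, failing fast if the key is unset or the class is not on the classpath. */
  static Class<?> resolve(Properties props) {
    String name = props.getProperty(CLASS_NAME);
    if (name == null || name.isEmpty()) {
      throw new IllegalArgumentException("Missing required property " + CLASS_NAME);
    }
    try {
      return Class.forName(name);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Class not found on classpath: " + name, e);
    }
  }
}
```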



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path for providing the checkpoints.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info such as source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp) to the records. "
+          + "By default it's disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the source schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL from which the target schema is fetched, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where source schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where source schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where target schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where target schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.connection.url")
+        .noDefaultValue()
+        .withDocumentation("The JDBC URL to connect to. The source-specific connection properties may be specified in the URL."
+            + " e.g., jdbc:postgresql://localhost/test?user=fred&password=secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DRIVER_TYPE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.driver.type")
+        .noDefaultValue()
+        .withDocumentation("The class name of the JDBC driver to use to connect to this URL. e.g. org.h2.Driver");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_USERNAME = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.username")
+        .noDefaultValue()
+        .withDocumentation("Username for the connection e.g. fred");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_PASSWORD = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.password")
+        .noDefaultValue()
+        .withDocumentation("Password for the connection e.g. secret");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_DBTABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.dbtable")
+        .noDefaultValue()
+        .withDocumentation("The table with the schema to reference e.g. test_database.test1_table or test1_table");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_TIMEOUT = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.timeout")
+        .noDefaultValue()
+        .withDocumentation("The number of seconds the driver will wait for a Statement object to execute. Zero means there is no limit. "
+            + "In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver "
+            + "checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_NULLABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.jdbc.nullable")
+        .noDefaultValue()
+        .withDocumentation("If true, all the columns are nullable.");
+
+    private static final ConfigProperty<String> PROTO_SCHEMA_PROVIDER_PREFIX = ConfigProperty

Review Comment:
   I would say that configs specific to a particular schema provider should be moved into a new class, e.g., `ProtoClassBasedSchemaProviderConfig`, so that each schema provider gets its own config subsection on the configuration page.
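Here is a rough sketch of how the suggested split might look for the JDBC keys in the hunk above. To compile on its own, it uses plain `String` constants in place of Hudi's `ConfigProperty`/`HoodieConfig`. The class name `JdbcSchemaProviderKeys` and the `allKeys()` helper are hypothetical. The key strings are copied from the diff.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical holder for the JDBC schema provider keys, split out of the shared
 * schema provider config as suggested in the review. Plain strings stand in for
 * Hudi's ConfigProperty so this sketch has no Hudi dependency.
 */
final class JdbcSchemaProviderKeys {

  // Shared prefix, ending with a dot (mirrors DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.").
  static final String SCHEMAPROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.";

  // Provider-specific prefix, also ending with a dot.
  static final String JDBC_PREFIX = SCHEMAPROVIDER_PREFIX + "source.schema.jdbc.";

  static final String CONNECTION_URL = JDBC_PREFIX + "connection.url";
  static final String DRIVER_TYPE = JDBC_PREFIX + "driver.type";
  static final String USERNAME = JDBC_PREFIX + "username";
  static final String PASSWORD = JDBC_PREFIX + "password";
  static final String DBTABLE = JDBC_PREFIX + "dbtable";
  static final String TIMEOUT = JDBC_PREFIX + "timeout";
  static final String NULLABLE = JDBC_PREFIX + "nullable";

  private JdbcSchemaProviderKeys() {
  }

  /** All keys of this subsection, e.g. for rendering one section of a configuration page. */
  static List<String> allKeys() {
    return Arrays.asList(CONNECTION_URL, DRIVER_TYPE, USERNAME, PASSWORD, DBTABLE, TIMEOUT, NULLABLE);
  }

  public static void main(String[] args) {
    for (String key : allKeys()) {
      System.out.println(key);
    }
  }
}
```

Because every key is built from the provider-specific prefix, a documentation generator could plausibly list one subsection per holder class.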



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/HoodieDeltaStreamerConfig.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * Delta Streamer related config.
+ */
+@Immutable
+@ConfigClassProperty(name = "DeltaStreamer Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    description = "Configurations that control DeltaStreamer.")
+public class HoodieDeltaStreamerConfig extends HoodieConfig {
+
+  private static final String DELTA_STREAMER_CONFIG_PREFIX = "hoodie.deltastreamer.";
+  public static final String INGESTION_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "ingestion.";
+
+  public static final ConfigProperty<String> CHECKPOINT_RESET_KEY = ConfigProperty
+      .key("deltastreamer.checkpoint.reset_key")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> CHECKPOINT_PROVIDER_PATH_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "checkpoint.provider.path")
+      .noDefaultValue()
+      .withDocumentation("The path used by InitialCheckPointProvider to determine the initial checkpoint.");
+
+  public static final ConfigProperty<String> KAFKA_TOPIC_PROP = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.topic")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.kafka.append.offsets")
+      .defaultValue("false")
+      .withDocumentation("When enabled, appends Kafka offset info, i.e. source offset (_hoodie_kafka_source_offset), "
+          + "partition (_hoodie_kafka_source_partition) and timestamp (_hoodie_kafka_source_timestamp), to the records. "
+          + "By default it is disabled and no Kafka offsets are added.");
+
+  public static final ConfigProperty<Boolean> SANITIZE_SCHEMA_FIELD_NAMES = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.schema.field.names")
+      .defaultValue(false)
+      .withDocumentation("Sanitizes names of invalid schema fields both in the data read from source and also in the schema. "
+          + "Replaces invalid characters with hoodie.deltastreamer.source.sanitize.invalid.char.mask. Invalid characters are "
+          + "determined by the Avro naming convention (https://avro.apache.org/docs/current/spec.html#names).");
+
+  public static final ConfigProperty<String> SCHEMA_FIELD_NAME_INVALID_CHAR_MASK = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "source.sanitize.invalid.char.mask")
+      .defaultValue("__")
+      .withDocumentation("Defines the character sequence that replaces invalid characters in schema field names if "
+          + "hoodie.deltastreamer.source.sanitize.invalid.schema.field.names is enabled.");
+
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key(DELTA_STREAMER_CONFIG_PREFIX + "multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Unique Id to be used for multiwriter deltastreamer scenario. This is the "
+          + "scenario when multiple deltastreamers are used to write to the same target table. If you are just using "
+          + "a single deltastreamer for a table then you do not need to set this config.");
+
+  public static final ConfigProperty<String> TABLES_TO_BE_INGESTED_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "tablesToBeIngested")
+      .noDefaultValue()
+      .withDocumentation("Comma separated names of tables to be ingested in the format <database>.<table>, for example db1.table1,db1.table2");
+
+  public static final ConfigProperty<String> TARGET_BASE_PATH_PROP = ConfigProperty
+      .key(INGESTION_PREFIX + "targetBasePath")
+      .noDefaultValue()
+      .withDocumentation("The path to which a particular table is ingested. The config is specific to HoodieMultiTableDeltaStreamer"
+          + " and overrides path determined using option `--base-path-prefix` for a table");
+
+  /**
+   * Delta Streamer Schema Provider related config.
+   */
+  @Immutable
+  @ConfigClassProperty(name = "DeltaStreamer Schema Provider Configs",
+      groupName = ConfigGroups.Names.DELTA_STREAMER,
+      description = "Configurations related to source and target schema for DeltaStreamer.")
+  public static class HoodieDeltaStreamerSchemaProviderConfig extends HoodieConfig {
+
+    private static final String DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX = DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider.";
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SRC_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.url")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL for the schema of the source you are reading from, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_REGISTRY_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrl")
+        .noDefaultValue()
+        .withDocumentation("Schema registry URL for the schema of the target you are writing to, e.g. https://foo:bar@schemaregistry.org");
+
+    public static final ConfigProperty<String> SCHEMA_CONVERTER_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.schemaconverter")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<Boolean> SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "spark_avro_post_processor.enable")
+        .defaultValue(false)
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.delete.columns")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.name")
+        .noDefaultValue()
+        .withDocumentation("New column's name");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.type")
+        .noDefaultValue()
+        .withDocumentation("New column's type");
+
+    public static final ConfigProperty<Boolean> SCHEMA_POST_PROCESSOR_ADD_COLUMN_NULLABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.nullable")
+        .defaultValue(true)
+        .withDocumentation("Whether the new column is nullable");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DEFAULT_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.default")
+        .noDefaultValue()
+        .withDocumentation("New column's default value");
+
+    public static final ConfigProperty<String> SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "schema_post_processor.add.column.doc")
+        .noDefaultValue()
+        .withDocumentation("Docs about new column");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_BASE_URL_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.baseUrl")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_URL_SUFFIX_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.urlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_SOURCE_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.sourceUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SCHEMA_REGISTRY_TARGET_URL_SUFFIX = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "registry.targetUrlSuffix")
+        .noDefaultValue()
+        .withDocumentation("");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the source you are reading from");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_FILE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.file")
+        .noDefaultValue()
+        .withDocumentation("The schema of the target you are writing to");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where source schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "source.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where source schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_DATABASE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.database")
+        .noDefaultValue()
+        .withDocumentation("Hive database from where target schema can be fetched");
+
+    public static final ConfigProperty<String> TARGET_SCHEMA_TABLE_PROP = ConfigProperty
+        .key(DELTA_STREAMER_SCHEMAPROVIDER_CONFIG_PREFIX + "target.schema.hive.table")
+        .noDefaultValue()
+        .withDocumentation("Hive table from where target schema can be fetched");
+
+    public static final ConfigProperty<String> SOURCE_SCHEMA_JDBC_CONNECTION_URL = ConfigProperty

Review Comment:
   Similar for `JdbcbasedSchemaProvider` configs.
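The hunk above documents `hoodie.deltastreamer.source.sanitize.invalid.schema.field.names` and `hoodie.deltastreamer.source.sanitize.invalid.char.mask` (default `__`). The following minimal sketch shows one way that behavior could work: it masks every character that is invalid under the Avro naming rules, where the first character must be in `[A-Za-z_]` and the rest in `[A-Za-z0-9_]`. This is not Hudi's `SanitizationUtils`. The class `FieldNameSanitizerSketch` is hypothetical, and the real implementation may handle edge cases such as a leading digit differently.

```java
/**
 * Illustrative sketch only (not Hudi's SanitizationUtils): replaces each character
 * that is invalid in an Avro name with a mask string.
 * Avro names must start with [A-Za-z_] and continue with [A-Za-z0-9_].
 */
final class FieldNameSanitizerSketch {

  private FieldNameSanitizerSketch() {
  }

  private static boolean isValidStart(char c) {
    return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_';
  }

  private static boolean isValidPart(char c) {
    return isValidStart(c) || (c >= '0' && c <= '9');
  }

  /** Returns the name with every invalid character replaced by the mask. */
  static String sanitize(String name, String mask) {
    StringBuilder out = new StringBuilder(name.length());
    for (int i = 0; i < name.length(); i++) {
      char c = name.charAt(i);
      boolean valid = (i == 0) ? isValidStart(c) : isValidPart(c);
      out.append(valid ? String.valueOf(c) : mask);
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // "__" is the documented default of hoodie.deltastreamer.source.sanitize.invalid.char.mask.
    System.out.println(sanitize("user-id", "__"));  // user__id
    System.out.println(sanitize("1st.col", "__"));  // __st__col
  }
}
```

Masking character by character keeps the result deterministic. The trade-off is that two distinct source names, such as `a-b` and `a.b`, can map to the same sanitized name.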



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1472439817

   ## CI report:
   
   * a5158ffe8b28d05dd4b523ac7401ee7df215416f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15682) 
   * 257289e10914bf590c4e8ce9317fbf9773130f55 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15760) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1139271824


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java:
##########
@@ -36,15 +36,8 @@
 public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
 
   public static class Config {
-    public static final ConfigProperty<String> KAFKA_APPEND_OFFSETS = ConfigProperty

Review Comment:
   Fixed.



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieInlineTestSuiteWriter.java:
##########
@@ -189,7 +190,7 @@ public void commit(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> genera
       Map<String, String> extraMetadata = new HashMap<>();
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
-      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      extraMetadata.put(HoodieDeltaStreamer.CHECKPOINT_KEY, lastCheckpoint.get());

Review Comment:
   Actually, it's good to have this, since it standardizes the reference.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -35,35 +35,6 @@
  * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
  */
 public class ProtoClassBasedSchemaProvider extends SchemaProvider {
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";
-    public static final ConfigProperty<String> PROTO_SCHEMA_CLASS_NAME = ConfigProperty.key(PROTO_SCHEMA_PROVIDER_PREFIX + ".class.name")

Review Comment:
   Fixed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java:
##########
@@ -35,35 +35,6 @@
  * A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
  */
 public class ProtoClassBasedSchemaProvider extends SchemaProvider {
-  /**
-   * Configs supported.
-   */
-  public static class Config {
-    private static final String PROTO_SCHEMA_PROVIDER_PREFIX = "hoodie.deltastreamer.schemaprovider.proto";

Review Comment:
   Fixed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1152777848


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   Addressed.





[GitHub] [hudi] hudi-bot commented on pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8152:
URL: https://github.com/apache/hudi/pull/8152#issuecomment-1491542693

   ## CI report:
   
   * 242e6f5531391c0f1de80c52c118cd841dca1c04 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15872) 
   * 1ca78802a885c6510565a89eb6435cc62de3957e Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=16015) 
   




[GitHub] [hudi] yihua commented on a diff in pull request #8152: [HUDI-5740] Refactor Deltastreamer and schema providers to use HoodieConfig/ConfigProperty

Posted by "yihua (via GitHub)" <gi...@apache.org>.
yihua commented on code in PR #8152:
URL: https://github.com/apache/hudi/pull/8152#discussion_r1155050423


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/ProtoClassBasedSchemaProviderConfig.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import static org.apache.hudi.utilities.config.HoodieSchemaProviderConfig.SCHEMAPROVIDER_CONFIG_PREFIX;
+
+/**
+ * JDBC-based Schema Provider Configs.
+ */
+@Immutable
+@ConfigClassProperty(name = "JDBC-based Schema Provider Configs",
+    groupName = ConfigGroups.Names.DELTA_STREAMER,
+    subGroupName = ConfigGroups.SubGroupNames.SCHEMA_PROVIDER,
+    description = "Configurations for Proto schema provider.")
+public class ProtoClassBasedSchemaProviderConfig extends HoodieConfig {
+  private static final String PROTO_SCHEMA_PROVIDER_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";

Review Comment:
   What I mean is that we should follow the convention where the prefix ends with a dot, to be consistent with other config classes like `HoodieClusteringConfig` and `AvroKafkaSource`.
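The convention can be sketched in plain Java. The value of `SCHEMAPROVIDER_CONFIG_PREFIX` is assumed to be `hoodie.deltastreamer.schemaprovider.`, matching `DELTA_STREAMER_CONFIG_PREFIX + "schemaprovider."` in the earlier hunk. The `.class.name` suffix comes from the removed `ProtoClassBasedSchemaProvider.Config`. The class `ProtoPrefixConventionSketch` is hypothetical.

```java
/**
 * Hypothetical illustration comparing the two ways of building the proto
 * schema provider key discussed in this review.
 */
final class ProtoPrefixConventionSketch {

  // Shared prefix, already ending with a dot (assumed value, see lead-in).
  static final String SCHEMAPROVIDER_CONFIG_PREFIX = "hoodie.deltastreamer.schemaprovider.";

  // As in the diff under review: no trailing dot, so each key suffix must start with one.
  static final String PROTO_PREFIX_NO_DOT = SCHEMAPROVIDER_CONFIG_PREFIX + "proto";
  static final String CLASS_NAME_KEY_OLD_STYLE = PROTO_PREFIX_NO_DOT + ".class.name";

  // Suggested convention: the prefix itself ends with a dot, like other config classes.
  static final String PROTO_PREFIX = SCHEMAPROVIDER_CONFIG_PREFIX + "proto.";
  static final String CLASS_NAME_KEY = PROTO_PREFIX + "class.name";

  public static void main(String[] args) {
    System.out.println(CLASS_NAME_KEY);
    System.out.println(CLASS_NAME_KEY.equals(CLASS_NAME_KEY_OLD_STYLE));
  }
}
```

Both styles resolve to `hoodie.deltastreamer.schemaprovider.proto.class.name`. Moving the dot into the prefix therefore leaves the user-facing key unchanged and only makes the prefix constants consistent across config classes.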


