You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/06/13 23:45:47 UTC

[hudi] branch release-feature-rfc55 updated (d65555b106 -> c88813d8a2)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a change to branch release-feature-rfc55
in repository https://gitbox.apache.org/repos/asf/hudi.git


 discard d65555b106 [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
     add 5aaac21d1d [HUDI-4224] Fix CI issues (#5842)
     add c82e3462e3 [MINOR]  fix AvroSchemaConverter duplicate branch in 'switch' (#5813)
     add 14d8735a1c Strip extra spaces when creating new configuration (#5849)
     add e89f5627e4 [HUDI-3682] testReaderFilterRowKeys fails in TestHoodieOrcReaderWriter (#5790)
     add 0d859fe58b [HUDI-3863] Add UT for drop partition column in deltastreamer testsuite (#5727)
     add 4774c4248f [HUDI-4006] failOnDataLoss on delta-streamer kafka sources (#5718)
     add 264b15df87 [HUDI-4207] HoodieFlinkWriteClient.getOrCreateWriteHandle throws an e… (#5788)
     new c88813d8a2 [HUDI-3730] Improve meta sync class design and hierarchies (#5754)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (d65555b106)
            \
             N -- N -- N   refs/heads/release-feature-rfc55 (c88813d8a2)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 azure-pipelines.yml                                | 116 +++++++++++++++------
 hudi-aws/pom.xml                                   |  30 ++++++
 hudi-cli/pom.xml                                   |  66 +++++++++++-
 .../org/apache/hudi/cli/commands/SparkMain.java    |   2 +-
 .../SparkUtilTest.java => TestSparkUtil.java}      |   4 +-
 .../hudi/cli/integ/ITTestBootstrapCommand.java     |   4 +-
 .../hudi/cli/integ/ITTestClusteringCommand.java    |   9 +-
 .../hudi/cli/integ/ITTestCommitsCommand.java       |  32 ++++--
 .../hudi/cli/integ/ITTestCompactionCommand.java    |   7 +-
 .../cli/integ/ITTestHDFSParquetImportCommand.java  |   4 +-
 .../hudi/cli/integ/ITTestMarkersCommand.java       |   4 +-
 .../hudi/cli/integ/ITTestRepairsCommand.java       |   4 +-
 .../hudi/cli/integ/ITTestSavepointsCommand.java    |  10 +-
 ...Test.java => HoodieCLIIntegrationTestBase.java} |   2 +-
 ...t.java => HoodieCLIIntegrationTestHarness.java} |   2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  26 +++--
 .../hudi/io/storage/TestHoodieOrcReaderWriter.java |  10 +-
 .../io/storage/TestHoodieReaderWriterBase.java     |  18 +++-
 hudi-examples/hudi-examples-common/pom.xml         |  28 +++++
 hudi-examples/hudi-examples-java/pom.xml           |  28 +++++
 hudi-flink-datasource/hudi-flink/pom.xml           |  20 ++++
 .../apache/hudi/configuration/FlinkOptions.java    |   4 +-
 .../org/apache/hudi/util/AvroSchemaConverter.java  |   5 +-
 hudi-flink-datasource/hudi-flink1.13.x/pom.xml     |  46 +++++++-
 hudi-flink-datasource/hudi-flink1.14.x/pom.xml     |  46 +++++++-
 hudi-gcp/pom.xml                                   |  36 +++++++
 hudi-integ-test/pom.xml                            |  47 ++++++++-
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |   2 +-
 .../integ/testsuite/TestFileDeltaInputWriter.java  |   2 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |   2 +-
 .../reader/TestDFSAvroDeltaInputReader.java        |   2 +-
 .../reader/TestDFSHoodieDatasetInputReader.java    |   2 +-
 hudi-spark-datasource/hudi-spark-common/pom.xml    |  37 +++++++
 .../HoodieBulkInsertInternalWriterTestBase.java    |  11 +-
 .../test/resources/log4j-surefire-quiet.properties |   0
 .../src/test/resources/log4j-surefire.properties   |   0
 hudi-spark-datasource/hudi-spark2-common/pom.xml   |  43 ++++++++
 hudi-spark-datasource/hudi-spark2/pom.xml          |  60 +++++++++++
 .../TestHoodieBulkInsertDataInternalWriter.java    |  13 +--
 .../TestHoodieDataSourceInternalWriter.java        |   1 -
 .../TestHoodieBulkInsertDataInternalWriter.java    |   2 +-
 hudi-sync/hudi-adb-sync/pom.xml                    |  36 +++++++
 hudi-sync/hudi-datahub-sync/pom.xml                |  18 ++++
 .../utilities/sources/helpers/KafkaOffsetGen.java  |  19 +++-
 .../functional/HoodieDeltaStreamerTestBase.java    |   2 +-
 .../functional/TestHoodieDeltaStreamer.java        |  22 ++++
 .../utilities/sources/TestHoodieIncrSource.java    |  38 +++----
 .../utilities/sources/TestJsonKafkaSource.java     |  39 +++++++
 .../hudi/utilities/sources/TestSqlSource.java      |   2 +-
 .../debezium/TestAbstractDebeziumSource.java       |   2 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  20 ++--
 .../AbstractCloudObjectsSourceTestBase.java        |   2 +-
 .../sources/AbstractDFSSourceTestBase.java         |   2 +-
 .../transform/TestSqlFileBasedTransformer.java     |   2 +-
 packaging/hudi-aws-bundle/pom.xml                  |   1 +
 packaging/hudi-datahub-sync-bundle/pom.xml         |   1 +
 packaging/hudi-flink-bundle/pom.xml                |   1 +
 packaging/hudi-gcp-bundle/pom.xml                  |   1 +
 packaging/hudi-hadoop-mr-bundle/pom.xml            |   1 +
 packaging/hudi-hive-sync-bundle/pom.xml            |   1 +
 packaging/hudi-integ-test-bundle/pom.xml           |   1 +
 packaging/hudi-kafka-connect-bundle/pom.xml        |   1 +
 packaging/hudi-presto-bundle/pom.xml               |   1 +
 packaging/hudi-spark-bundle/pom.xml                |   1 +
 packaging/hudi-timeline-server-bundle/pom.xml      |   1 +
 packaging/hudi-trino-bundle/pom.xml                |   1 +
 packaging/hudi-utilities-bundle/pom.xml            |   1 +
 packaging/hudi-utilities-slim-bundle/pom.xml       |   1 +
 pom.xml                                            |  45 ++++----
 69 files changed, 875 insertions(+), 175 deletions(-)
 rename hudi-cli/src/test/java/org/apache/hudi/cli/{testutils/SparkUtilTest.java => TestSparkUtil.java} (95%)
 rename hudi-cli/src/test/java/org/apache/hudi/cli/testutils/{AbstractShellIntegrationTest.java => HoodieCLIIntegrationTestBase.java} (93%)
 rename hudi-cli/src/test/java/org/apache/hudi/cli/testutils/{AbstractShellBaseIntegrationTest.java => HoodieCLIIntegrationTestHarness.java} (96%)
 copy {hudi-common => hudi-spark-datasource/hudi-spark-common}/src/test/resources/log4j-surefire-quiet.properties (100%)
 copy {hudi-client/hudi-client-common => hudi-spark-datasource/hudi-spark-common}/src/test/resources/log4j-surefire.properties (100%)


[hudi] 01/01: [HUDI-3730] Improve meta sync class design and hierarchies (#5754)

Posted by xu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-feature-rfc55
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c88813d8a2b6f04d087fd575ebff35b8dd0da748
Author: 冯健 <fe...@gmail.com>
AuthorDate: Mon Jun 13 01:55:09 2022 +0800

    [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
    
    
    
    Co-authored-by: jian.feng <fe...@gmial.com>
    Co-authored-by: jian.feng <ji...@shopee.com>
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |  20 +-
 .../hudi/aws/sync/AwsGlueCatalogSyncTool.java      |   4 +-
 .../apache/hudi/sink/utils/HiveSyncContext.java    |  46 ++---
 .../hudi/sink/utils/TestHiveSyncContext.java       |   4 +-
 .../apache/hudi/gcp/bigquery/BigQuerySyncTool.java |   4 +-
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     |  41 +---
 .../integ/testsuite/dag/nodes/HiveQueryNode.java   |   4 +-
 .../writers/KafkaConnectTransactionServices.java   |   3 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  67 +++----
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  78 +++----
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   6 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  12 +-
 .../hudi/sync/adb/AbstractAdbSyncHoodieClient.java |  27 +--
 .../org/apache/hudi/sync/adb/AdbSyncConfig.java    | 215 ++++++++++----------
 .../java/org/apache/hudi/sync/adb/AdbSyncTool.java |  63 +++---
 .../apache/hudi/sync/adb/HoodieAdbJdbcClient.java  |  65 +++---
 .../apache/hudi/sync/adb/TestAdbSyncConfig.java    |  54 ++---
 .../hudi/sync/datahub/DataHubSyncClient.java       |  59 +-----
 .../apache/hudi/sync/datahub/DataHubSyncTool.java  |  10 +-
 .../config/HoodieDataHubDatasetIdentifier.java     |   2 +-
 .../hudi/hive/AbstractHiveSyncHoodieClient.java    |  16 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  | 223 ++++++++++-----------
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  74 +++----
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  40 ++--
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   |  54 ++---
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java |  10 +-
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     |  16 +-
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java       |  34 ++--
 .../hive/replication/GlobalHiveSyncConfig.java     |  28 +--
 .../hudi/hive/replication/GlobalHiveSyncTool.java  |   2 +-
 .../replication/HiveSyncGlobalCommitConfig.java    |  12 +-
 .../hive/replication/HiveSyncGlobalCommitTool.java |   2 +-
 .../apache/hudi/hive/util/HivePartitionUtil.java   |  12 +-
 .../org/apache/hudi/hive/util/HiveSchemaUtil.java  |  22 +-
 .../hudi/hive/TestHiveSyncGlobalCommitTool.java    |  24 +--
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |   4 +-
 .../testutils/HiveSyncFunctionalTestHarness.java   |  28 +--
 ...SyncHoodieClient.java => HoodieSyncClient.java} | 109 +---------
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |  88 ++++----
 .../apache/hudi/sync/common/HoodieSyncTool.java    |  49 +++++
 .../apache/hudi/sync/common/SupportMetaSync.java   |   7 +
 .../hudi/sync/common/operation/CatalogSync.java    |  32 +++
 .../hudi/sync/common/operation/PartitionsSync.java |  11 +
 .../sync/common/operation/ReplicatedTimeSync.java  |  11 +
 .../sync/common/operation/TblPropertiesSync.java   |  13 ++
 .../SparkDataSourceTableUtils.java}                |  55 +----
 .../hudi/sync/common/util/SyncUtilHelpers.java     |  20 +-
 .../hudi/sync/common/util/TestSyncUtilHelpers.java |  12 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   7 +-
 .../functional/TestHoodieDeltaStreamer.java        |   8 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  26 +--
 51 files changed, 855 insertions(+), 978 deletions(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index e15a698f27..e269908a4f 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -90,7 +90,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
   public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
     super(syncConfig, hadoopConf, fs);
     this.awsGlue = AWSGlueClientBuilder.standard().build();
-    this.databaseName = syncConfig.databaseName;
+    this.databaseName = syncConfig.hoodieSyncConfigParams.databaseName;
   }
 
   @Override
@@ -126,7 +126,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       StorageDescriptor sd = table.getStorageDescriptor();
       List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
         return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       StorageDescriptor sd = table.getStorageDescriptor();
       List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
         PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -206,10 +206,10 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
-    boolean cascade = syncConfig.partitionFields.size() > 0;
+    boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
-      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
       List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
         String keyType = getPartitionKeyType(newSchemaMap, key);
         return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -265,26 +265,26 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
     }
     CreateTableRequest request = new CreateTableRequest();
     Map<String, String> params = new HashMap<>();
-    if (!syncConfig.createManagedTable) {
+    if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
       params.put("EXTERNAL", "TRUE");
     }
     params.putAll(tableProperties);
 
     try {
-      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
 
       List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
       for (String key : mapSchema.keySet()) {
         String keyType = getPartitionKeyType(mapSchema, key);
         Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
         // In Glue, the full schema should exclude the partition keys
-        if (!syncConfig.partitionFields.contains(key)) {
+        if (!syncConfig.hoodieSyncConfigParams.partitionFields.contains(key)) {
           schemaWithoutPartitionKeys.add(column);
         }
       }
 
       // now create the schema partition
-      List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<Column> schemaPartitionKeys = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
         String keyType = getPartitionKeyType(mapSchema, partitionKey);
         return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
       }).collect(Collectors.toList());
@@ -293,7 +293,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       serdeProperties.put("serialization.format", "1");
       storageDescriptor
           .withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
-          .withLocation(s3aToS3(syncConfig.basePath))
+          .withLocation(s3aToS3(syncConfig.hoodieSyncConfigParams.basePath))
           .withInputFormat(inputFormatClass)
           .withOutputFormat(outputFormatClass)
           .withColumns(schemaWithoutPartitionKeys);
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index bb1be377c9..ed5a49dbdb 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -58,11 +58,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
     // parse the params
     final HiveSyncConfig cfg = new HiveSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
     new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 9fc5323d46..dee87e9966 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -52,7 +52,7 @@ public class HiveSyncContext {
   }
 
   public HiveSyncTool hiveSyncTool() {
-    HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.syncMode);
+    HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.hiveSyncConfigParams.syncMode);
     if (syncMode == HiveSyncMode.GLUE) {
       return new AwsGlueCatalogSyncTool(this.syncConfig, this.hiveConf, this.fs);
     }
@@ -76,28 +76,28 @@ public class HiveSyncContext {
   @VisibleForTesting
   public static HiveSyncConfig buildSyncConfig(Configuration conf) {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH);
-    hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
-    hiveSyncConfig.usePreApacheInputFormat = false;
-    hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
-    hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
-    hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
-    hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
-    hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
-    hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
-    hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
-    hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
-    hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
-    hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
-    hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
-    hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
-    hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
-    hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
-    hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
-    hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
-    hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
-    hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
-    hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = conf.getString(FlinkOptions.PATH);
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
+    hiveSyncConfig.hiveSyncConfigParams.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
+    hiveSyncConfig.hiveSyncConfigParams.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
+    hiveSyncConfig.hiveSyncConfigParams.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
+    hiveSyncConfig.hiveSyncConfigParams.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
+    hiveSyncConfig.hoodieSyncConfigParams.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
+    hiveSyncConfig.hoodieSyncConfigParams.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
+    hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
+    hiveSyncConfig.hiveSyncConfigParams.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
     return hiveSyncConfig;
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
index 7bfaade59e..2e8cc00bb1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
@@ -55,8 +55,8 @@ public class TestHiveSyncContext {
     HiveSyncConfig hiveSyncConfig1 = HiveSyncContext.buildSyncConfig(configuration1);
     HiveSyncConfig hiveSyncConfig2 = HiveSyncContext.buildSyncConfig(configuration2);
 
-    assertTrue(hiveSyncConfig1.partitionFields.get(0).equals(hiveSyncPartitionField));
-    assertTrue(hiveSyncConfig2.partitionFields.get(0).equals(partitionPathField));
+    assertTrue(hiveSyncConfig1.hoodieSyncConfigParams.partitionFields.get(0).equals(hiveSyncPartitionField));
+    assertTrue(hiveSyncConfig2.hoodieSyncConfigParams.partitionFields.get(0).equals(partitionPathField));
 
   }
 }
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 0cb75eea89..b6904a6887 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.util.ManifestFileWriter;
 
 import com.beust.jcommander.JCommander;
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
  *
  * @Experimental
  */
-public class BigQuerySyncTool extends AbstractSyncTool {
+public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
 
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index cb41ca2272..c5222aca91 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -20,7 +20,7 @@
 package org.apache.hudi.gcp.bigquery;
 
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
@@ -39,6 +39,8 @@ import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.ViewDefinition;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -47,7 +49,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
+public class HoodieBigQuerySyncClient extends HoodieSyncClient implements CatalogSync, TblPropertiesSync {
   private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class);
 
   private final BigQuerySyncConfig syncConfig;
@@ -171,12 +173,6 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
     return Collections.emptyMap();
   }
 
-  @Override
-  public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
-  }
-
   public boolean datasetExists() {
     Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
     return dataset != null;
@@ -207,33 +203,8 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
   }
 
   @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void updatePartitionsToTable(final String tableName, final List<String> changedPartitions) {
-    // bigQuery updates the partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for dropPartitions yet.");
+  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+    throw new UnsupportedOperationException("No support for updateTableProperties yet.");
   }
 
   @Override
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 4736133f2c..8aef7bdf81 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -57,8 +57,8 @@ public class HiveQueryNode extends DagNode<Boolean> {
         .getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
     this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
-    Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
-        hiveSyncConfig.hivePass);
+    Connection con = DriverManager.getConnection(hiveSyncConfig.hiveSyncConfigParams.jdbcUrl, hiveSyncConfig.hiveSyncConfigParams.hiveUser,
+        hiveSyncConfig.hiveSyncConfigParams.hivePass);
     Statement stmt = con.createStatement();
     stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
     for (String hiveProperty : this.config.getHiveProperties()) {
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 934dbadf1c..e351fe14c3 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.sync.common.SupportMetaSync;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 
 import org.apache.hadoop.conf.Configuration;
@@ -56,7 +57,7 @@ import java.util.Set;
  * {@link TransactionCoordinator}
  * using {@link HoodieJavaWriteClient}.
  */
-public class KafkaConnectTransactionServices implements ConnectTransactionServices {
+public class KafkaConnectTransactionServices implements ConnectTransactionServices, SupportMetaSync {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7..64b8079a64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -279,50 +279,43 @@ public class DataSourceUtils {
   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
     checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE().key()));
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.basePath = basePath;
-    hiveSyncConfig.usePreApacheInputFormat =
-        props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
             Boolean.parseBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().defaultValue()));
-    hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
-        DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
-    hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
-    hiveSyncConfig.baseFileFormat = baseFileFormat;
-    hiveSyncConfig.hiveUser =
-        props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
-    hiveSyncConfig.hivePass =
-        props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
-    hiveSyncConfig.jdbcUrl =
-        props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
-    hiveSyncConfig.metastoreUris =
-            props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
-    hiveSyncConfig.partitionFields =
-        props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
-    hiveSyncConfig.partitionValueExtractorClass =
-        props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
+            DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = baseFileFormat;
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
             SlashEncodedDayPartitionValueExtractor.class.getName());
-    hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
-        DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
+            DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
     if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) {
-      hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
+      hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
     }
-    hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
-        DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
-    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
-        DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
-    hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
-        DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
-    hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
-        DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
-    hiveSyncConfig.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
-        DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
-    hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
-        DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
-        ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
+            DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
+            DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
+            DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
+            DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
+    hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
+            DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
+            DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
+            ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
             props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
     if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
-      hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
+      hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
     }
-    hiveSyncConfig.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
+    hiveSyncConfig.hiveSyncConfigParams.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
             DataSourceWriteOptions.HIVE_SYNC_COMMENT().defaultValue()));
     return hiveSyncConfig;
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 3f67d5017f..cb2057a995 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -77,12 +77,12 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
       )
@@ -195,11 +195,11 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
@@ -232,12 +232,12 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> partitionFields,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass
       )
         .filter { case (_, v) => v != null }
     }
@@ -274,8 +274,8 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
       )
@@ -290,32 +290,32 @@ trait ProvidesHoodieConfig extends Logging {
 
   def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
-    hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation
-    hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat
-    hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
-    hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = hoodieCatalogTable.tableLocation
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = hoodieCatalogTable.baseFileFormat
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
     if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) {
-      hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
     } else {
-      hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = hoodieCatalogTable.table.identifier.table
     }
-    hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
-    hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
-    hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
-    hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
-    hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
-    hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
-    hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
-    if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
-    hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
-    hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
-    hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
-    hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
-    hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
-    hiveSyncConfig.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
+    hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
+    if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
+    hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
     else null
-    if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
-    hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
+    if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
+    hiveSyncConfig.hiveSyncConfigParams.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
     hiveSyncConfig
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 636599ce0c..296b359e90 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -467,13 +467,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index af5bbe7717..0a1f08d048 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -261,14 +261,14 @@ public class TestDataSourceUtils {
     HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name());
 
     if (useSyncMode) {
-      assertFalse(hiveSyncConfig.useJdbc);
-      assertEquals(HMS.name(), hiveSyncConfig.syncMode);
+      assertFalse(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+      assertEquals(HMS.name(), hiveSyncConfig.hiveSyncConfigParams.syncMode);
     } else {
-      assertTrue(hiveSyncConfig.useJdbc);
-      assertNull(hiveSyncConfig.syncMode);
+      assertTrue(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+      assertNull(hiveSyncConfig.hiveSyncConfigParams.syncMode);
     }
-    assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName);
-    assertEquals(HIVE_TABLE, hiveSyncConfig.tableName);
+    assertEquals(HIVE_DATABASE, hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+    assertEquals(HIVE_TABLE, hiveSyncConfig.hoodieSyncConfigParams.tableName);
   }
 
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
index 84316ddb11..b8dec82b7b 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
@@ -24,26 +24,29 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.PartitionValueExtractor;
 import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractAdbSyncHoodieClient extends HoodieSyncClient implements PartitionsSync, CatalogSync, TblPropertiesSync {
   protected AdbSyncConfig adbSyncConfig;
   protected PartitionValueExtractor partitionValueExtractor;
   protected HoodieTimeline activeTimeline;
 
   public AbstractAdbSyncHoodieClient(AdbSyncConfig syncConfig, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning,
-        syncConfig.useFileListingFromMetadata, false, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+            syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
     this.adbSyncConfig = syncConfig;
-    final String clazz = adbSyncConfig.partitionValueExtractorClass;
+    final String clazz = adbSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass;
     try {
       this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(clazz).newInstance();
     } catch (Exception e) {
@@ -64,13 +67,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
     }
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : partitionStoragePartitions) {
-      Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, storagePartition);
+      Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, storagePartition);
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      if (adbSyncConfig.useHiveStylePartitioning) {
+      if (adbSyncConfig.adbSyncConfigParams.useHiveStylePartitioning) {
         String partition = String.join("/", storagePartitionValues);
-        storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+        storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
         fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       }
       if (!storagePartitionValues.isEmpty()) {
@@ -100,13 +103,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
   public abstract void dropTable(String tableName);
 
   protected String getDatabasePath() {
-    String dbLocation = adbSyncConfig.dbLocation;
+    String dbLocation = adbSyncConfig.adbSyncConfigParams.dbLocation;
     Path dbLocationPath;
     if (StringUtils.isNullOrEmpty(dbLocation)) {
-      if (new Path(adbSyncConfig.basePath).isRoot()) {
-        dbLocationPath = new Path(adbSyncConfig.basePath);
+      if (new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).isRoot()) {
+        dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
       } else {
-        dbLocationPath = new Path(adbSyncConfig.basePath).getParent();
+        dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).getParent();
       }
     } else {
       dbLocationPath = new Path(dbLocation);
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
index ae2e7024e5..a9a1c847d5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
@@ -18,64 +18,22 @@
 
 package org.apache.hudi.sync.adb;
 
+import com.beust.jcommander.ParametersDelegate;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
+
 /**
  * Configs needed to sync data into Alibaba Cloud AnalyticDB(ADB).
  */
 public class AdbSyncConfig extends HoodieSyncConfig {
 
-  @Parameter(names = {"--user"}, description = "Adb username", required = true)
-  public String adbUser;
-
-  @Parameter(names = {"--pass"}, description = "Adb password", required = true)
-  public String adbPass;
-
-  @Parameter(names = {"--jdbc-url"}, description = "Adb jdbc connect url", required = true)
-  public String jdbcUrl;
-
-  @Parameter(names = {"--skip-ro-suffix"}, description = "Whether skip the `_ro` suffix for read optimized table when syncing")
-  public Boolean skipROSuffix;
-
-  @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
-  public Boolean skipRTSync;
-
-  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
-  public Boolean useHiveStylePartitioning;
-
-  @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
-  public Boolean supportTimestamp;
-
-  @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
-  public Boolean syncAsSparkDataSourceTable;
-
-  @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
-  public String tableProperties;
-
-  @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
-  public String serdeProperties;
-
-  @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
-  public int sparkSchemaLengthThreshold;
-
-  @Parameter(names = {"--db-location"}, description = "Database location")
-  public String dbLocation;
-
-  @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
-  public Boolean autoCreateDatabase = true;
-
-  @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
-  public Boolean skipLastCommitTimeSync = false;
-
-  @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
-  public Boolean dropTableBeforeCreation = false;
-
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
+  public final AdbSyncConfigParams adbSyncConfigParams = new AdbSyncConfigParams();
 
   public static final ConfigProperty<String> ADB_SYNC_USER = ConfigProperty
       .key("hoodie.datasource.adb.sync.username")
@@ -159,48 +117,48 @@ public class AdbSyncConfig extends HoodieSyncConfig {
   public AdbSyncConfig(TypedProperties props) {
     super(props);
 
-    adbUser = getString(ADB_SYNC_USER);
-    adbPass = getString(ADB_SYNC_PASS);
-    jdbcUrl = getString(ADB_SYNC_JDBC_URL);
-    skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
-    skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
-    useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
-    supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
-    syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
-    tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
-    serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
-    sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
-    dbLocation = getString(ADB_SYNC_DB_LOCATION);
-    autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
-    skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
-    dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
+    adbSyncConfigParams.hiveSyncConfigParams.hiveUser = getString(ADB_SYNC_USER);
+    adbSyncConfigParams.hiveSyncConfigParams.hivePass = getString(ADB_SYNC_PASS);
+    adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = getString(ADB_SYNC_JDBC_URL);
+    adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
+    adbSyncConfigParams.skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
+    adbSyncConfigParams.useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
+    adbSyncConfigParams.supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
+    adbSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
+    adbSyncConfigParams.tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
+    adbSyncConfigParams.serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
+    adbSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+    adbSyncConfigParams.dbLocation = getString(ADB_SYNC_DB_LOCATION);
+    adbSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
+    adbSyncConfigParams.skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
+    adbSyncConfigParams.dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
   }
 
   public static TypedProperties toProps(AdbSyncConfig cfg) {
     TypedProperties properties = new TypedProperties();
-    properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName);
-    properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName);
-    properties.put(ADB_SYNC_USER.key(), cfg.adbUser);
-    properties.put(ADB_SYNC_PASS.key(), cfg.adbPass);
-    properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl);
-    properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
-    properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields));
-    properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass);
-    properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning));
-    properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix));
-    properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync));
-    properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning));
-    properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata));
-    properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp));
-    properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties);
-    properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties);
-    properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable));
-    properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold));
-    properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion);
-    properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation);
-    properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase));
-    properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync));
-    properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation));
+    properties.put(META_SYNC_DATABASE_NAME.key(), cfg.hoodieSyncConfigParams.databaseName);
+    properties.put(META_SYNC_TABLE_NAME.key(), cfg.hoodieSyncConfigParams.tableName);
+    properties.put(ADB_SYNC_USER.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+    properties.put(ADB_SYNC_PASS.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+    properties.put(ADB_SYNC_JDBC_URL.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+    properties.put(META_SYNC_BASE_PATH.key(), cfg.hoodieSyncConfigParams.basePath);
+    properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.hoodieSyncConfigParams.partitionFields));
+    properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.hoodieSyncConfigParams.partitionValueExtractorClass);
+    properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.hoodieSyncConfigParams.assumeDatePartitioning));
+    properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix));
+    properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipRTSync));
+    properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.adbSyncConfigParams.useHiveStylePartitioning));
+    properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.hoodieSyncConfigParams.useFileListingFromMetadata));
+    properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.adbSyncConfigParams.supportTimestamp));
+    properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.adbSyncConfigParams.tableProperties);
+    properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.adbSyncConfigParams.serdeProperties);
+    properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.adbSyncConfigParams.syncAsSparkDataSourceTable));
+    properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.adbSyncConfigParams.sparkSchemaLengthThreshold));
+    properties.put(META_SYNC_SPARK_VERSION.key(), cfg.hoodieSyncConfigParams.sparkVersion);
+    properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.adbSyncConfigParams.dbLocation);
+    properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.adbSyncConfigParams.autoCreateDatabase));
+    properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipLastCommitTimeSync));
+    properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.adbSyncConfigParams.dropTableBeforeCreation));
 
     return properties;
   }
@@ -208,33 +166,66 @@ public class AdbSyncConfig extends HoodieSyncConfig {
   @Override
   public String toString() {
     return "AdbSyncConfig{"
-        + "adbUser='" + adbUser + '\''
-        + ", adbPass='" + adbPass + '\''
-        + ", jdbcUrl='" + jdbcUrl + '\''
-        + ", skipROSuffix=" + skipROSuffix
-        + ", skipRTSync=" + skipRTSync
-        + ", useHiveStylePartitioning=" + useHiveStylePartitioning
-        + ", supportTimestamp=" + supportTimestamp
-        + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
-        + ", tableProperties='" + tableProperties + '\''
-        + ", serdeProperties='" + serdeProperties + '\''
-        + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
-        + ", dbLocation='" + dbLocation + '\''
-        + ", autoCreateDatabase=" + autoCreateDatabase
-        + ", skipLastCommitTimeSync=" + skipLastCommitTimeSync
-        + ", dropTableBeforeCreation=" + dropTableBeforeCreation
-        + ", help=" + help
-        + ", databaseName='" + databaseName + '\''
-        + ", tableName='" + tableName + '\''
-        + ", basePath='" + basePath + '\''
-        + ", baseFileFormat='" + baseFileFormat + '\''
-        + ", partitionFields=" + partitionFields
-        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-        + ", assumeDatePartitioning=" + assumeDatePartitioning
-        + ", decodePartition=" + decodePartition
-        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
-        + ", isConditionalSync=" + isConditionalSync
-        + ", sparkVersion='" + sparkVersion + '\''
+        + "adbUser='" + adbSyncConfigParams.hiveSyncConfigParams.hiveUser + '\''
+        + ", adbPass='" + adbSyncConfigParams.hiveSyncConfigParams.hivePass + '\''
+        + ", jdbcUrl='" + adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl + '\''
+        + ", skipROSuffix=" + adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix
+        + ", skipRTSync=" + adbSyncConfigParams.skipRTSync
+        + ", useHiveStylePartitioning=" + adbSyncConfigParams.useHiveStylePartitioning
+        + ", supportTimestamp=" + adbSyncConfigParams.supportTimestamp
+        + ", syncAsSparkDataSourceTable=" + adbSyncConfigParams.syncAsSparkDataSourceTable
+        + ", tableProperties='" + adbSyncConfigParams.tableProperties + '\''
+        + ", serdeProperties='" + adbSyncConfigParams.serdeProperties + '\''
+        + ", sparkSchemaLengthThreshold=" + adbSyncConfigParams.sparkSchemaLengthThreshold
+        + ", dbLocation='" + adbSyncConfigParams.dbLocation + '\''
+        + ", autoCreateDatabase=" + adbSyncConfigParams.autoCreateDatabase
+        + ", skipLastCommitTimeSync=" + adbSyncConfigParams.skipLastCommitTimeSync
+        + ", dropTableBeforeCreation=" + adbSyncConfigParams.dropTableBeforeCreation
+        + ", help=" + adbSyncConfigParams.help
+        + ", databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+        + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+        + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+        + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+        + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+        + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+        + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+        + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+        + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+        + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+        + ", sparkVersion='" + hoodieSyncConfigParams.sparkVersion + '\''
         + '}';
   }
+
+  public static class AdbSyncConfigParams implements Serializable {
+    @ParametersDelegate()
+    public HiveSyncConfig.HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfig.HiveSyncConfigParams();
+
+    @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
+    public Boolean supportTimestamp;
+    @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
+    public Boolean syncAsSparkDataSourceTable;
+    @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
+    public String tableProperties;
+    @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
+    public String serdeProperties;
+    @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
+    public int sparkSchemaLengthThreshold;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+    @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
+    public Boolean useHiveStylePartitioning;
+    @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
+    public Boolean skipRTSync;
+    @Parameter(names = {"--db-location"}, description = "Database location")
+    public String dbLocation;
+    @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
+    public Boolean autoCreateDatabase = true;
+    @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
+    public Boolean skipLastCommitTimeSync = false;
+    @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
+    public Boolean dropTableBeforeCreation = false;
+
+    public AdbSyncConfigParams() {
+    }
+  }
 }
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 8c2f9e2045..fb9911ace5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 
 import com.beust.jcommander.JCommander;
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ import java.util.stream.Collectors;
  * incremental partitions will be synced as well.
  */
 @SuppressWarnings("WeakerAccess")
-public class AdbSyncTool extends AbstractSyncTool {
+public class AdbSyncTool extends HoodieSyncTool {
   private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
 
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -72,13 +73,13 @@ public class AdbSyncTool extends AbstractSyncTool {
     this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs);
     switch (hoodieAdbClient.getTableType()) {
       case COPY_ON_WRITE:
-        this.snapshotTableName = adbSyncConfig.tableName;
+        this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName;
         this.roTableTableName = Option.empty();
         break;
       case MERGE_ON_READ:
-        this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName)
-            : Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix ? Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName)
+            : Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
         break;
       default:
         throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
@@ -101,7 +102,7 @@ public class AdbSyncTool extends AbstractSyncTool {
           // Sync a ro table for MOR table
           syncHoodieTable(roTableTableName.get(), false, true);
           // Sync a rt table for MOR table
-          if (!adbSyncConfig.skipRTSync) {
+          if (!adbSyncConfig.adbSyncConfigParams.skipRTSync) {
             syncHoodieTable(snapshotTableName, true, false);
           }
           break;
@@ -110,7 +111,7 @@ public class AdbSyncTool extends AbstractSyncTool {
               + ", basePath:" + hoodieAdbClient.getBasePath());
       }
     } catch (Exception re) {
-      throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re);
+      throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.hoodieSyncConfigParams.tableName, re);
     } finally {
       hoodieAdbClient.close();
     }
@@ -121,19 +122,19 @@ public class AdbSyncTool extends AbstractSyncTool {
     LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
         tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType());
 
-    if (adbSyncConfig.autoCreateDatabase) {
+    if (adbSyncConfig.adbSyncConfigParams.autoCreateDatabase) {
       try {
         synchronized (AdbSyncTool.class) {
-          if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
-            hoodieAdbClient.createDatabase(adbSyncConfig.databaseName);
+          if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+            hoodieAdbClient.createDatabase(adbSyncConfig.hoodieSyncConfigParams.databaseName);
           }
         }
       } catch (Exception e) {
-        throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName
+        throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.hoodieSyncConfigParams.databaseName
             + ", useRealtimeInputFormat = " + useRealtimeInputFormat, e);
       }
-    } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
-      throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName);
+    } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+      throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.hoodieSyncConfigParams.databaseName);
     }
 
     // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
@@ -144,11 +145,11 @@ public class AdbSyncTool extends AbstractSyncTool {
     if (hoodieAdbClient.isBootstrap()
         && hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ
         && !readAsOptimized) {
-      adbSyncConfig.syncAsSparkDataSourceTable = false;
+      adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable = false;
       LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName);
     }
 
-    if (adbSyncConfig.dropTableBeforeCreation) {
+    if (adbSyncConfig.adbSyncConfigParams.dropTableBeforeCreation) {
       LOG.info("Drop table before creation, tableName:{}", tableName);
       hoodieAdbClient.dropTable(tableName);
     }
@@ -171,7 +172,7 @@ public class AdbSyncTool extends AbstractSyncTool {
 
     // Scan synced partitions
     List<String> writtenPartitionsSince;
-    if (adbSyncConfig.partitionFields.isEmpty()) {
+    if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
       writtenPartitionsSince = new ArrayList<>();
     } else {
       writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
@@ -183,7 +184,7 @@ public class AdbSyncTool extends AbstractSyncTool {
 
     // Update sync commit time
     // whether to skip syncing commit time stored in tbl properties, since it is time consuming.
-    if (!adbSyncConfig.skipLastCommitTimeSync) {
+    if (!adbSyncConfig.adbSyncConfigParams.skipLastCommitTimeSync) {
       hoodieAdbClient.updateLastCommitTimeSynced(tableName);
     }
     LOG.info("Sync complete for table:{}", tableName);
@@ -202,12 +203,12 @@ public class AdbSyncTool extends AbstractSyncTool {
   private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) throws Exception {
     // Append spark table properties & serde properties
-    Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties);
-    Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties);
-    if (adbSyncConfig.syncAsSparkDataSourceTable) {
-      Map<String, String> sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields,
-          adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema);
-      Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath);
+    Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.tableProperties);
+    Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.serdeProperties);
+    if (adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable) {
+      Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+              adbSyncConfig.hoodieSyncConfigParams.sparkVersion, adbSyncConfig.adbSyncConfigParams.sparkSchemaLengthThreshold, schema);
+      Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, adbSyncConfig.hoodieSyncConfigParams.basePath);
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
       LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}",
@@ -227,8 +228,8 @@ public class AdbSyncTool extends AbstractSyncTool {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieAdbClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields,
-          adbSyncConfig.supportTimestamp);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+              adbSyncConfig.adbSyncConfigParams.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for table:{}", tableName);
         hoodieAdbClient.updateTableDefinition(tableName, schemaDiff);
@@ -244,7 +245,7 @@ public class AdbSyncTool extends AbstractSyncTool {
    */
   private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
     try {
-      if (adbSyncConfig.partitionFields.isEmpty()) {
+      if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
         LOG.info("Not a partitioned table.");
         return;
       }
@@ -271,13 +272,13 @@ public class AdbSyncTool extends AbstractSyncTool {
     // parse the params
     final AdbSyncConfig cfg = new AdbSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.adbSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
 
     Configuration hadoopConf = new Configuration();
-    FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, hadoopConf);
     new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
   }
 }
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index a347ba7011..81bd34ffa3 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -69,7 +69,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) {
     super(syncConfig, fs);
     createAdbConnection();
-    LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl);
+    LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
   }
 
   private void createAdbConnection() {
@@ -82,7 +82,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
       }
       try {
         this.connection = DriverManager.getConnection(
-            adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass);
+                adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl,
+            adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
       } catch (SQLException e) {
         throw new HoodieException("Cannot create adb connection ", e);
       }
@@ -106,7 +107,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   @Override
   public void dropTable(String tableName) {
     LOG.info("Dropping table:{}", tableName);
-    String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`";
+    String dropTable = "drop table if exists `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`.`" + tableName + "`";
     executeAdbSql(dropTable);
   }
 
@@ -115,8 +116,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
     ResultSet result = null;
     try {
       DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData.getColumns(adbSyncConfig.databaseName,
-          adbSyncConfig.databaseName, tableName, null);
+      result = databaseMetaData.getColumns(adbSyncConfig.hoodieSyncConfigParams.databaseName,
+              adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, null);
       while (result.next()) {
         String columnName = result.getString(4);
         String columnType = result.getString(6);
@@ -261,18 +262,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   }
 
   @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet");
+  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+    throw new UnsupportedOperationException("Not support updateTableProperties yet");
   }
 
   @Override
@@ -304,7 +295,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
             String str = resultSet.getString(1);
             if (!StringUtils.isNullOrEmpty(str)) {
               List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
-              Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values));
+              Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, String.join("/", values));
               String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
               partitions.put(values, fullStoragePartitionPath);
             }
@@ -332,11 +323,11 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private String constructAddPartitionsSql(String tableName, List<String> partitions) {
     StringBuilder sqlBuilder = new StringBuilder("alter table `");
-    sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`")
+    sqlBuilder.append(adbSyncConfig.hoodieSyncConfigParams.databaseName).append("`").append(".`")
         .append(tableName).append("`").append(" add if not exists ");
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
       String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
       sqlBuilder.append("  partition (").append(partitionClause).append(") location '")
           .append(fullPartitionPathStr).append("' ");
@@ -347,13 +338,13 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private List<String> constructChangePartitionsSql(String tableName, List<String> partitions) {
     List<String> changePartitions = new ArrayList<>();
-    String useDatabase = "use `" + adbSyncConfig.databaseName + "`";
+    String useDatabase = "use `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`";
     changePartitions.add(useDatabase);
 
     String alterTable = "alter table `" + tableName + "`";
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
       String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
       String changePartition = alterTable + " add if not exists partition (" + partitionClause
           + ") location '" + fullPartitionPathStr + "'";
@@ -371,32 +362,32 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
    */
   private String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + adbSyncConfig.partitionFields
+    ValidationUtils.checkArgument(adbSyncConfig.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + adbSyncConfig.hoodieSyncConfigParams.partitionFields
             + " does not match with partition values " + partitionValues + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) {
-      partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+    for (int i = 0; i < adbSyncConfig.hoodieSyncConfigParams.partitionFields.size(); i++) {
+      partBuilder.add(adbSyncConfig.hoodieSyncConfigParams.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
     }
 
     return String.join(",", partBuilder);
   }
 
   private String constructShowPartitionSql(String tableName) {
-    return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+    return String.format("show partitions `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructShowCreateTableSql(String tableName) {
-    return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+    return String.format("show create table `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructShowLikeTableSql(String tableName) {
-    return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName);
+    return String.format("show tables from `%s` like '%s'", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructCreateDatabaseSql(String rootPath) {
     return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')",
-        adbSyncConfig.databaseName, rootPath);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, rootPath);
   }
 
   private String constructShowCreateDatabaseSql(String databaseName) {
@@ -405,25 +396,25 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
     return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
-        adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
   }
 
   private String constructAddColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
-        adbSyncConfig.databaseName, tableName, columnName, columnType);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnType);
   }
 
   private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
-        adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnName, columnType);
   }
 
   private HiveSyncConfig getHiveSyncConfig() {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
-    hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
-    Path basePath = new Path(adbSyncConfig.basePath);
-    hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = adbSyncConfig.hoodieSyncConfigParams.partitionFields;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = adbSyncConfig.hoodieSyncConfigParams.databaseName;
+    Path basePath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = generateAbsolutePathStr(basePath);
     return hiveSyncConfig;
   }
 
diff --git a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
index f4eb8fc7fc..0a3d937496 100644
--- a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
@@ -30,36 +30,36 @@ public class TestAdbSyncConfig {
   @Test
   public void testCopy() {
     AdbSyncConfig adbSyncConfig = new AdbSyncConfig();
-    adbSyncConfig.partitionFields = Arrays.asList("a", "b");
-    adbSyncConfig.basePath = "/tmp";
-    adbSyncConfig.assumeDatePartitioning = true;
-    adbSyncConfig.databaseName = "test";
-    adbSyncConfig.tableName = "test";
-    adbSyncConfig.adbUser = "adb";
-    adbSyncConfig.adbPass = "adb";
-    adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
-    adbSyncConfig.skipROSuffix = false;
-    adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
-        + "spark.sql.sources.schema.numParts = '1'\\n "
-        + "spark.sql.sources.schema.part.0 ='xx'\\n "
-        + "spark.sql.sources.schema.numPartCols = '1'\\n"
-        + "spark.sql.sources.schema.partCol.0 = 'dt'";
-    adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'";
-    adbSyncConfig.dbLocation = "file://tmp/test_db";
+    adbSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList("a", "b");
+    adbSyncConfig.hoodieSyncConfigParams.basePath = "/tmp";
+    adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    adbSyncConfig.hoodieSyncConfigParams.databaseName = "test";
+    adbSyncConfig.hoodieSyncConfigParams.tableName = "test";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser = "adb";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass = "adb";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = "jdbc:mysql://localhost:3306";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = false;
+    adbSyncConfig.adbSyncConfigParams.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
+            + "spark.sql.sources.schema.numParts = '1'\\n "
+            + "spark.sql.sources.schema.part.0 ='xx'\\n "
+            + "spark.sql.sources.schema.numPartCols = '1'\\n"
+            + "spark.sql.sources.schema.partCol.0 = 'dt'";
+    adbSyncConfig.adbSyncConfigParams.serdeProperties = "'path'='/tmp/test_db/tbl'";
+    adbSyncConfig.adbSyncConfigParams.dbLocation = "file://tmp/test_db";
 
     TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig);
     AdbSyncConfig copied = new AdbSyncConfig(props);
 
-    assertEquals(copied.partitionFields, adbSyncConfig.partitionFields);
-    assertEquals(copied.basePath, adbSyncConfig.basePath);
-    assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning);
-    assertEquals(copied.databaseName, adbSyncConfig.databaseName);
-    assertEquals(copied.tableName, adbSyncConfig.tableName);
-    assertEquals(copied.adbUser, adbSyncConfig.adbUser);
-    assertEquals(copied.adbPass, adbSyncConfig.adbPass);
-    assertEquals(copied.basePath, adbSyncConfig.basePath);
-    assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl);
-    assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix);
-    assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp);
+    assertEquals(copied.hoodieSyncConfigParams.partitionFields, adbSyncConfig.hoodieSyncConfigParams.partitionFields);
+    assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+    assertEquals(copied.hoodieSyncConfigParams.assumeDatePartitioning, adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning);
+    assertEquals(copied.hoodieSyncConfigParams.databaseName, adbSyncConfig.hoodieSyncConfigParams.databaseName);
+    assertEquals(copied.hoodieSyncConfigParams.tableName, adbSyncConfig.hoodieSyncConfigParams.tableName);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hivePass, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+    assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix);
+    assertEquals(copied.adbSyncConfigParams.supportTimestamp, adbSyncConfig.adbSyncConfigParams.supportTimestamp);
   }
 }
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 68569822cc..e784d591d6 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -23,8 +23,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
 import com.linkedin.common.urn.DatasetUrn;
@@ -53,14 +54,13 @@ import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.parquet.schema.MessageType;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class DataHubSyncClient extends AbstractSyncHoodieClient {
+public class DataHubSyncClient extends HoodieSyncClient implements TblPropertiesSync {
 
   private final HoodieTimeline activeTimeline;
   private final DataHubSyncConfig syncConfig;
@@ -68,34 +68,13 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
   private final DatasetUrn datasetUrn;
 
   public DataHubSyncClient(DataHubSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, false, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning, syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
     this.syncConfig = syncConfig;
     this.hadoopConf = hadoopConf;
     this.datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
     this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   }
 
-  @Override
-  public void createTable(String tableName,
-      MessageType storageSchema,
-      String inputFormatClass,
-      String outputFormatClass,
-      String serdeClass,
-      Map<String, String> serdeProperties,
-      Map<String, String> tableProperties) {
-    throw new UnsupportedOperationException("Not supported: `createTable`");
-  }
-
-  @Override
-  public boolean doesTableExist(String tableName) {
-    return tableExists(tableName);
-  }
-
-  @Override
-  public boolean tableExists(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `tableExists`");
-  }
-
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
@@ -106,36 +85,6 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
     updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, activeTimeline.lastInstant().get().getTimestamp()));
   }
 
-  @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
-  }
-
-  @Override
-  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
-    throw new UnsupportedOperationException("Not supported: `addPartitionsToTable`");
-  }
-
-  @Override
-  public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
-    throw new UnsupportedOperationException("Not supported: `updatePartitionsToTable`");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    throw new UnsupportedOperationException("Not supported: `dropPartitions`");
-  }
-
   @Override
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
     MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 9633d6b089..6502dcd1ce 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -21,7 +21,7 @@ package org.apache.hudi.sync.datahub;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
 import com.beust.jcommander.JCommander;
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
  * @Experimental
  * @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
  */
-public class DataHubSyncTool extends AbstractSyncTool {
+public class DataHubSyncTool extends HoodieSyncTool {
 
   private final DataHubSyncConfig config;
 
@@ -56,8 +56,8 @@ public class DataHubSyncTool extends AbstractSyncTool {
   @Override
   public void syncHoodieTable() {
     try (DataHubSyncClient syncClient = new DataHubSyncClient(config, conf, fs)) {
-      syncClient.updateTableDefinition(config.tableName);
-      syncClient.updateLastCommitTimeSynced(config.tableName);
+      syncClient.updateTableDefinition(config.hoodieSyncConfigParams.tableName);
+      syncClient.updateLastCommitTimeSynced(config.hoodieSyncConfigParams.tableName);
     }
   }
 
@@ -68,7 +68,7 @@ public class DataHubSyncTool extends AbstractSyncTool {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     new DataHubSyncTool(cfg, fs.getConf(), fs).syncHoodieTable();
   }
 }
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index e3c1ad486c..0f9f9ece9e 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -43,6 +43,6 @@ public class HoodieDataHubDatasetIdentifier {
   public DatasetUrn getDatasetUrn() {
     DataPlatformUrn dataPlatformUrn = new DataPlatformUrn(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME);
     DataHubSyncConfig config = new DataHubSyncConfig(props);
-    return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.databaseName, config.tableName), FabricType.DEV);
+    return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName), FabricType.DEV);
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
index f0641b6fc0..b52237c698 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.common.model.Partition;
 
@@ -31,6 +31,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.ReplicatedTimeSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.ArrayList;
@@ -41,7 +45,8 @@ import java.util.Map;
 /**
  * Base class to sync Hudi tables with Hive based metastores, such as Hive server, HMS or managed Hive services.
  */
-public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractHiveSyncHoodieClient extends HoodieSyncClient implements ReplicatedTimeSync,
+    PartitionsSync, CatalogSync, TblPropertiesSync {
 
   protected final HoodieTimeline activeTimeline;
   protected final HiveSyncConfig syncConfig;
@@ -49,10 +54,11 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
   protected final PartitionValueExtractor partitionValueExtractor;
 
   public AbstractHiveSyncHoodieClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, syncConfig.withOperationField, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+        syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, syncConfig.hiveSyncConfigParams.withOperationField, fs);
     this.syncConfig = syncConfig;
     this.hadoopConf = hadoopConf;
-    this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.partitionValueExtractorClass);
+    this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass);
     this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   }
 
@@ -75,7 +81,7 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
 
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : partitionStoragePartitions) {
-      Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
+      Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, storagePartition);
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 36dba81a33..66d17661a8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -24,79 +24,14 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
+
 /**
  * Configs needed to sync data into the Hive Metastore.
  */
 public class HiveSyncConfig extends HoodieSyncConfig {
 
-  @Parameter(names = {"--user"}, description = "Hive username")
-  public String hiveUser;
-
-  @Parameter(names = {"--pass"}, description = "Hive password")
-  public String hivePass;
-
-  @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
-  public String jdbcUrl;
-
-  @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
-  public String metastoreUris;
-
-  @Parameter(names = {"--use-pre-apache-input-format"},
-      description = "Use InputFormat under com.uber.hoodie package "
-          + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
-          + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
-          + "org.apache.hudi input format.")
-  public Boolean usePreApacheInputFormat;
-
-  @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
-  public String bucketSpec;
-
-  @Deprecated
-  @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
-  public Boolean useJdbc;
-
-  @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
-  public String syncMode;
-
-  @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
-  public Boolean autoCreateDatabase;
-
-  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
-  public Boolean ignoreExceptions;
-
-  @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
-  public Boolean skipROSuffix;
-
-  @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
-  public String tableProperties;
-
-  @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
-  public String serdeProperties;
-
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
-      + "Disabled by default for backward compatibility.")
-  public Boolean supportTimestamp;
-
-  @Parameter(names = {"--managed-table"}, description = "Create a managed table")
-  public Boolean createManagedTable;
-
-  @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
-  public Integer batchSyncNum;
-
-  @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
-  public Boolean syncAsSparkDataSourceTable;
-
-  @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
-  public int sparkSchemaLengthThreshold;
-
-  @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
-  public Boolean withOperationField = false;
-
-  @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
-  public boolean syncComment = false;
+  public final HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfigParams();
 
   // HIVE SYNC SPECIFIC CONFIGS
   // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
@@ -223,64 +158,118 @@ public class HiveSyncConfig extends HoodieSyncConfig {
 
   public HiveSyncConfig(TypedProperties props) {
     super(props);
-    this.hiveUser = getStringOrDefault(HIVE_USER);
-    this.hivePass = getStringOrDefault(HIVE_PASS);
-    this.jdbcUrl = getStringOrDefault(HIVE_URL);
-    this.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
-    this.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
-    this.metastoreUris = getStringOrDefault(METASTORE_URIS);
-    this.syncMode = getString(HIVE_SYNC_MODE);
-    this.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
-    this.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
-    this.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
-    this.tableProperties = getString(HIVE_TABLE_PROPERTIES);
-    this.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
-    this.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
-    this.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
-    this.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
-    this.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
-    this.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
-    this.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
-    this.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
+    this.hiveSyncConfigParams.hiveUser = getStringOrDefault(HIVE_USER);
+    this.hiveSyncConfigParams.hivePass = getStringOrDefault(HIVE_PASS);
+    this.hiveSyncConfigParams.jdbcUrl = getStringOrDefault(HIVE_URL);
+    this.hiveSyncConfigParams.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
+    this.hiveSyncConfigParams.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
+    this.hiveSyncConfigParams.metastoreUris = getStringOrDefault(METASTORE_URIS);
+    this.hiveSyncConfigParams.syncMode = getString(HIVE_SYNC_MODE);
+    this.hiveSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
+    this.hiveSyncConfigParams.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
+    this.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
+    this.hiveSyncConfigParams.tableProperties = getString(HIVE_TABLE_PROPERTIES);
+    this.hiveSyncConfigParams.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
+    this.hiveSyncConfigParams.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
+    this.hiveSyncConfigParams.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+    this.hiveSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
+    this.hiveSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+    this.hiveSyncConfigParams.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
+    this.hiveSyncConfigParams.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
+    this.hiveSyncConfigParams.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
   }
 
   @Override
   public String toString() {
     return "HiveSyncConfig{"
-      + "databaseName='" + databaseName + '\''
-      + ", tableName='" + tableName + '\''
-      + ", bucketSpec='" + bucketSpec + '\''
-      + ", baseFileFormat='" + baseFileFormat + '\''
-      + ", hiveUser='" + hiveUser + '\''
-      + ", hivePass='" + hivePass + '\''
-      + ", jdbcUrl='" + jdbcUrl + '\''
-      + ", metastoreUris='" + metastoreUris + '\''
-      + ", basePath='" + basePath + '\''
-      + ", partitionFields=" + partitionFields
-      + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-      + ", assumeDatePartitioning=" + assumeDatePartitioning
-      + ", usePreApacheInputFormat=" + usePreApacheInputFormat
-      + ", useJdbc=" + useJdbc
-      + ", autoCreateDatabase=" + autoCreateDatabase
-      + ", ignoreExceptions=" + ignoreExceptions
-      + ", skipROSuffix=" + skipROSuffix
-      + ", useFileListingFromMetadata=" + useFileListingFromMetadata
-      + ", tableProperties='" + tableProperties + '\''
-      + ", serdeProperties='" + serdeProperties + '\''
-      + ", help=" + help
-      + ", supportTimestamp=" + supportTimestamp
-      + ", decodePartition=" + decodePartition
-      + ", createManagedTable=" + createManagedTable
-      + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
-      + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
-      + ", withOperationField=" + withOperationField
-      + ", isConditionalSync=" + isConditionalSync
-      + ", sparkVersion=" + sparkVersion
-      + ", syncComment=" + syncComment
+      + "databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+      + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+      + ", bucketSpec='" + hiveSyncConfigParams.bucketSpec + '\''
+      + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+      + ", hiveUser='" + hiveSyncConfigParams.hiveUser + '\''
+      + ", hivePass='" + hiveSyncConfigParams.hivePass + '\''
+      + ", jdbcUrl='" + hiveSyncConfigParams.jdbcUrl + '\''
+      + ", metastoreUris='" + hiveSyncConfigParams.metastoreUris + '\''
+      + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+      + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+      + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+      + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+      + ", usePreApacheInputFormat=" + hiveSyncConfigParams.usePreApacheInputFormat
+      + ", useJdbc=" + hiveSyncConfigParams.useJdbc
+      + ", autoCreateDatabase=" + hiveSyncConfigParams.autoCreateDatabase
+      + ", ignoreExceptions=" + hiveSyncConfigParams.ignoreExceptions
+      + ", skipROSuffix=" + hiveSyncConfigParams.skipROSuffix
+      + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+      + ", tableProperties='" + hiveSyncConfigParams.tableProperties + '\''
+      + ", serdeProperties='" + hiveSyncConfigParams.serdeProperties + '\''
+      + ", help=" + hiveSyncConfigParams.help
+      + ", supportTimestamp=" + hiveSyncConfigParams.supportTimestamp
+      + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+      + ", createManagedTable=" + hiveSyncConfigParams.createManagedTable
+      + ", syncAsSparkDataSourceTable=" + hiveSyncConfigParams.syncAsSparkDataSourceTable
+      + ", sparkSchemaLengthThreshold=" + hiveSyncConfigParams.sparkSchemaLengthThreshold
+      + ", withOperationField=" + hiveSyncConfigParams.withOperationField
+      + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+      + ", sparkVersion=" + hoodieSyncConfigParams.sparkVersion
+      + ", syncComment=" + hiveSyncConfigParams.syncComment
       + '}';
   }
 
   public static String getBucketSpec(String bucketCols, int bucketNum) {
     return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
   }
+
+  public static class HiveSyncConfigParams implements Serializable {
+    @Parameter(names = {"--user"}, description = "Hive username")
+    public String hiveUser;
+    @Parameter(names = {"--pass"}, description = "Hive password")
+    public String hivePass;
+    @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
+    public String jdbcUrl;
+    @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
+    public String metastoreUris;
+    @Parameter(names = {"--use-pre-apache-input-format"},
+            description = "Use InputFormat under com.uber.hoodie package "
+                    + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+                    + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+                    + "org.apache.hudi input format.")
+    public Boolean usePreApacheInputFormat;
+    @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
+    public String bucketSpec;
+    @Deprecated
+    @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
+    public Boolean useJdbc;
+    @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
+    public String syncMode;
+    @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
+    public Boolean autoCreateDatabase;
+    @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+    public Boolean ignoreExceptions;
+    @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
+    public Boolean skipROSuffix;
+    @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
+    public String tableProperties;
+    @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
+    public String serdeProperties;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+    @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+            + "Disabled by default for backward compatibility.")
+    public Boolean supportTimestamp;
+    @Parameter(names = {"--managed-table"}, description = "Create a managed table")
+    public Boolean createManagedTable;
+    @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
+    public Integer batchSyncNum;
+    @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
+    public Boolean syncAsSparkDataSourceTable;
+    @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
+    public int sparkSchemaLengthThreshold;
+    @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+    public Boolean withOperationField = false;
+    @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
+    public boolean syncComment = false;
+
+    public HiveSyncConfigParams() {
+    }
+  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 4d6fad033b..ded0756f8e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -29,9 +29,9 @@ import org.apache.hudi.exception.InvalidTableException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.model.Partition;
 
 import com.beust.jcommander.JCommander;
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -57,7 +58,7 @@ import java.util.stream.Collectors;
  * partitions incrementally (all the partitions modified since the last commit)
  */
 @SuppressWarnings("WeakerAccess")
-public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
+public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -76,7 +77,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     super(hiveSyncConfig.getProps(), hiveConf, fs);
     // TODO: reconcile the way to set METASTOREURIS
     if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
+      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.hiveSyncConfigParams.metastoreUris);
     }
     // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
     hiveConf.addResource(fs.getConf());
@@ -88,7 +89,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     try {
       this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);
     } catch (RuntimeException e) {
-      if (hiveSyncConfig.ignoreExceptions) {
+      if (hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions) {
         LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
       } else {
         throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
@@ -99,21 +100,21 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   private void initConfig(HiveSyncConfig hiveSyncConfig) {
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     // TODO: HiveSyncConfig should be responsible for inferring config value
-    if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
+    if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
-      hiveSyncConfig.partitionFields = new ArrayList<>();
+      hiveSyncConfig.hoodieSyncConfigParams.partitionFields = new ArrayList<>();
     }
     this.hiveSyncConfig = hiveSyncConfig;
     if (hoodieHiveClient != null) {
       switch (hoodieHiveClient.getTableType()) {
         case COPY_ON_WRITE:
-          this.snapshotTableName = hiveSyncConfig.tableName;
+          this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName;
           this.roTableName = Option.empty();
           break;
         case MERGE_ON_READ:
-          this.snapshotTableName = hiveSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
-          this.roTableName = hiveSyncConfig.skipROSuffix ? Option.of(hiveSyncConfig.tableName) :
-              Option.of(hiveSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+          this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = hiveSyncConfig.hiveSyncConfigParams.skipROSuffix ? Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName) :
+              Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
           break;
         default:
           LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -126,13 +127,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   public void syncHoodieTable() {
     try {
       if (hoodieHiveClient != null) {
-        LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
-            + hiveSyncConfig.jdbcUrl + ", basePath :" + hiveSyncConfig.basePath);
+        LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.hoodieSyncConfigParams.tableName + "). Hive metastore URL :"
+            + hiveSyncConfig.hiveSyncConfigParams.jdbcUrl + ", basePath :" + hiveSyncConfig.hoodieSyncConfigParams.basePath);
 
         doSync();
       }
     } catch (RuntimeException re) {
-      throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re);
+      throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.hoodieSyncConfigParams.tableName, re);
     } finally {
       close();
     }
@@ -172,19 +173,19 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
         + " of type " + hoodieHiveClient.getTableType());
 
     // check if the database exists else create it
-    if (hiveSyncConfig.autoCreateDatabase) {
+    if (hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase) {
       try {
-        if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
-          hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName);
+        if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+          hoodieHiveClient.createDatabase(hiveSyncConfig.hoodieSyncConfigParams.databaseName);
         }
       } catch (Exception e) {
         // this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
         LOG.warn("Unable to create database", e);
       }
     } else {
-      if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
-        LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName);
-        throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName);
+      if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+        LOG.error("Hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+        throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
       }
     }
 
@@ -204,7 +205,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     if (hoodieHiveClient.isBootstrap()
             && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
             && !readAsOptimized) {
-      hiveSyncConfig.syncAsSparkDataSourceTable = false;
+      hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable = false;
     }
 
     // Sync schema if needed
@@ -223,7 +224,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // Sync the partitions if needed
     boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
-    if (!hiveSyncConfig.isConditionalSync || meetSyncConditions) {
+    if (!hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync || meetSyncConditions) {
       hoodieHiveClient.updateLastCommitTimeSynced(tableName);
     }
     LOG.info("Sync complete for " + tableName);
@@ -239,12 +240,12 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) {
     // Append spark table properties & serde properties
-    Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
-    Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
-    if (hiveSyncConfig.syncAsSparkDataSourceTable) {
-      Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields,
-          hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema);
-      Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath);
+    Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.tableProperties);
+    Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.serdeProperties);
+    if (hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
+      Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+              hiveSyncConfig.hoodieSyncConfigParams.sparkVersion, hiveSyncConfig.hiveSyncConfigParams.sparkSchemaLengthThreshold, schema);
+      Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.hoodieSyncConfigParams.basePath);
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
     }
@@ -252,10 +253,10 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
-      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.baseFileFormat.toUpperCase());
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat.toUpperCase());
       String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
 
-      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.usePreApacheInputFormat) {
+      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat) {
         // Parquet input format had an InputFormat class visible under the old naming scheme.
         inputFormatClassName = useRealTimeInputFormat
             ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
@@ -274,12 +275,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.partitionFields, hiveSyncConfig.supportTimestamp);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+          hiveSyncConfig.hiveSyncConfigParams.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieHiveClient.updateTableDefinition(tableName, schema);
         // Sync the table properties if the schema has changed
-        if (hiveSyncConfig.tableProperties != null || hiveSyncConfig.syncAsSparkDataSourceTable) {
+        if (hiveSyncConfig.hiveSyncConfigParams.tableProperties != null || hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
           hoodieHiveClient.updateTableProperties(tableName, tableProperties);
           LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties);
         }
@@ -289,7 +291,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
       }
     }
 
-    if (hiveSyncConfig.syncComment) {
+    if (hiveSyncConfig.hiveSyncConfigParams.syncComment) {
       Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
       Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
               .stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
@@ -349,11 +351,11 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // parse the params
     final HiveSyncConfig cfg = new HiveSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
     new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 539d18a213..a14f4f8a89 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -66,8 +66,8 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     try {
-      if (!StringUtils.isNullOrEmpty(cfg.syncMode)) {
-        HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode);
+      if (!StringUtils.isNullOrEmpty(cfg.hiveSyncConfigParams.syncMode)) {
+        HiveSyncMode syncMode = HiveSyncMode.of(cfg.hiveSyncConfigParams.syncMode);
         switch (syncMode) {
           case HMS:
             ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs);
@@ -79,10 +79,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
             ddlExecutor = new JDBCExecutor(cfg, fs);
             break;
           default:
-            throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.syncMode);
+            throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.hiveSyncConfigParams.syncMode);
         }
       } else {
-        ddlExecutor = cfg.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
+        ddlExecutor = cfg.hiveSyncConfigParams.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
       }
       this.client = Hive.get(configuration).getMSC();
     } catch (Exception e) {
@@ -123,11 +123,11 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
       return;
     }
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
         table.putToParameters(entry.getKey(), entry.getValue());
       }
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to update table properties for table: "
           + tableName, e);
@@ -141,7 +141,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
    */
   @Deprecated
   public List<org.apache.hadoop.hive.metastore.api.Partition> scanTablePartitions(String tableName) throws TException {
-    return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
+    return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1);
   }
 
   @Override
@@ -152,12 +152,12 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public List<Partition> getAllPartitions(String tableName) {
     try {
-      return client.listPartitions(syncConfig.databaseName, tableName, (short) -1)
+      return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1)
           .stream()
           .map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
           .collect(Collectors.toList());
     } catch (TException e) {
-      throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.databaseName, tableName), e);
+      throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.hoodieSyncConfigParams.databaseName, tableName), e);
     }
   }
 
@@ -189,7 +189,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public boolean tableExists(String tableName) {
     try {
-      return client.tableExists(syncConfig.databaseName, tableName);
+      return client.tableExists(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
     } catch (TException e) {
       throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
     }
@@ -222,7 +222,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   public Option<String> getLastCommitTimeSynced(String tableName) {
     // Get the last commit time from the TBLproperties
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       return Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table " + tableName, e);
@@ -232,10 +232,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   public Option<String> getLastReplicatedTime(String tableName) {
     // Get the last replicated time from the TBLproperties
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
     } catch (NoSuchObjectException e) {
-      LOG.warn("the said table not found in hms " + syncConfig.databaseName + "." + tableName);
+      LOG.warn("the said table not found in hms " + syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName);
       return Option.empty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last replicated time from the table " + tableName, e);
@@ -249,9 +249,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
           "Not a valid completed timestamp " + timeStamp + " for table " + tableName);
     }
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
           "Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
@@ -260,9 +260,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
 
   public void deleteLastReplicatedTimeStamp(String tableName) {
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       String timestamp = table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
       if (timestamp != null) {
         LOG.info("deleted last replicated timestamp " + timestamp + " for table " + tableName);
       }
@@ -293,9 +293,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
     Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
     if (lastCommitSynced.isPresent()) {
       try {
-        Table table = client.getTable(syncConfig.databaseName, tableName);
+        Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
         table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
-        client.alter_table(syncConfig.databaseName, tableName, table);
+        client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
       } catch (Exception e) {
         throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
       }
@@ -305,7 +305,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
     try {
-      return client.getSchema(syncConfig.databaseName, tableName);
+      return client.getSchema(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get table comments for : " + tableName, e);
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 868f59b4fe..73fd31a51a 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -71,10 +71,10 @@ public class HMSDDLExecutor implements DDLExecutor {
     this.fs = fs;
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(syncConfig.partitionValueExtractorClass).newInstance();
+          (PartitionValueExtractor) Class.forName(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + syncConfig.partitionValueExtractorClass, e);
+          "Failed to initialize PartitionValueExtractor class " + syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass, e);
     }
   }
 
@@ -93,16 +93,16 @@ public class HMSDDLExecutor implements DDLExecutor {
   public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
                           Map<String, String> tableProperties) {
     try {
-      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
 
       List<FieldSchema> fieldSchema = HiveSchemaUtil.convertMapSchemaToHiveFieldSchema(mapSchema, syncConfig);
 
-      List<FieldSchema> partitionSchema = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<FieldSchema> partitionSchema = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
         String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
         return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), "");
       }).collect(Collectors.toList());
       Table newTb = new Table();
-      newTb.setDbName(syncConfig.databaseName);
+      newTb.setDbName(syncConfig.hoodieSyncConfigParams.databaseName);
       newTb.setTableName(tableName);
       newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
       newTb.setCreateTime((int) System.currentTimeMillis());
@@ -110,13 +110,13 @@ public class HMSDDLExecutor implements DDLExecutor {
       storageDescriptor.setCols(fieldSchema);
       storageDescriptor.setInputFormat(inputFormatClass);
       storageDescriptor.setOutputFormat(outputFormatClass);
-      storageDescriptor.setLocation(syncConfig.basePath);
+      storageDescriptor.setLocation(syncConfig.hoodieSyncConfigParams.basePath);
       serdeProperties.put("serialization.format", "1");
       storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
       newTb.setSd(storageDescriptor);
       newTb.setPartitionKeys(partitionSchema);
 
-      if (!syncConfig.createManagedTable) {
+      if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
         newTb.putToParameters("EXTERNAL", "TRUE");
       }
 
@@ -134,9 +134,9 @@ public class HMSDDLExecutor implements DDLExecutor {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      boolean cascade = syncConfig.partitionFields.size() > 0;
+      boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
       List<FieldSchema> fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig);
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       StorageDescriptor sd = table.getSd();
       sd.setCols(fieldSchema);
       table.setSd(sd);
@@ -145,7 +145,7 @@ public class HMSDDLExecutor implements DDLExecutor {
         LOG.info("partition table,need cascade");
         environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
       }
-      client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+      client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
     } catch (Exception e) {
       LOG.error("Failed to update table for " + tableName, e);
       throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
@@ -158,7 +158,7 @@ public class HMSDDLExecutor implements DDLExecutor {
       // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
       // get the Schema of the table.
       final long start = System.currentTimeMillis();
-      Table table = this.client.getTable(syncConfig.databaseName, tableName);
+      Table table = this.client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       Map<String, String> partitionKeysMap =
           table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
 
@@ -184,22 +184,22 @@ public class HMSDDLExecutor implements DDLExecutor {
     }
     LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
     try {
-      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
       List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = new StorageDescriptor();
         partitionSd.setCols(sd.getCols());
         partitionSd.setInputFormat(sd.getInputFormat());
         partitionSd.setOutputFormat(sd.getOutputFormat());
         partitionSd.setSerdeInfo(sd.getSerdeInfo());
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
-        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, partitionSd, null);
+        return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, partitionSd, null);
       }).collect(Collectors.toList());
       client.add_partitions(partitionList, true, false);
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " add partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " add partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
     }
   }
 
@@ -211,20 +211,20 @@ public class HMSDDLExecutor implements DDLExecutor {
     }
     LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
     try {
-      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
       List<Partition> partitionList = changedPartitions.stream().map(partition -> {
-        Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
+        Path partitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition);
         String partitionScheme = partitionPath.toUri().getScheme();
         String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
             ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
-        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, sd, null);
+        return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, sd, null);
       }).collect(Collectors.toList());
-      client.alter_partitions(syncConfig.databaseName, tableName, partitionList, null);
+      client.alter_partitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionList, null);
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " update partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " update partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
     }
   }
 
@@ -241,20 +241,20 @@ public class HMSDDLExecutor implements DDLExecutor {
         if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
-          client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
+          client.dropPartition(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
     }
   }
 
   @Override
   public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> alterSchema) {
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       StorageDescriptor sd = new StorageDescriptor(table.getSd());
       for (FieldSchema fieldSchema : sd.getCols()) {
         if (alterSchema.containsKey(fieldSchema.getName())) {
@@ -264,7 +264,7 @@ public class HMSDDLExecutor implements DDLExecutor {
       }
       table.setSd(sd);
       EnvironmentContext environmentContext = new EnvironmentContext();
-      client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+      client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
       sd.clear();
     } catch (Exception e) {
       LOG.error("Failed to update table comments for " + tableName, e);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 4b8ceec952..f1cafbffdf 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -64,7 +64,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
       this.sessionState = new SessionState(configuration,
           UserGroupInformation.getCurrentUser().getShortUserName());
       SessionState.start(this.sessionState);
-      this.sessionState.setCurrentDatabase(config.databaseName);
+      this.sessionState.setCurrentDatabase(config.hoodieSyncConfigParams.databaseName);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
     } catch (Exception e) {
       if (sessionState != null) {
@@ -109,7 +109,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
       // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
       // get the Schema of the table.
       final long start = System.currentTimeMillis();
-      Table table = metaStoreClient.getTable(config.databaseName, tableName);
+      Table table = metaStoreClient.getTable(config.hoodieSyncConfigParams.databaseName, tableName);
       Map<String, String> partitionKeysMap =
           table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
 
@@ -141,13 +141,13 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
             config)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
-          metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
+          metaStoreClient.dropPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (Exception e) {
-      LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
-      throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
     }
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 997d6e087c..e1b33c93f5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -49,11 +49,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   public JDBCExecutor(HiveSyncConfig config, FileSystem fs) {
     super(config, fs);
-    Objects.requireNonNull(config.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
-    Objects.requireNonNull(config.hiveUser, "--user option is required for jdbc sync mode");
-    Objects.requireNonNull(config.hivePass, "--pass option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.hiveUser, "--user option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.hivePass, "--pass option is required for jdbc sync mode");
     this.config = config;
-    createHiveConnection(config.jdbcUrl, config.hiveUser, config.hivePass);
+    createHiveConnection(config.hiveSyncConfigParams.jdbcUrl, config.hiveSyncConfigParams.hiveUser, config.hiveSyncConfigParams.hivePass);
   }
 
   @Override
@@ -126,7 +126,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
     ResultSet result = null;
     try {
       DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData.getColumns(null, config.databaseName, tableName, null);
+      result = databaseMetaData.getColumns(null, config.hoodieSyncConfigParams.databaseName, tableName, null);
       while (result.next()) {
         String columnName = result.getString(4);
         String columnType = result.getString(6);
@@ -157,11 +157,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
   }
 
   private List<String> constructDropPartitions(String tableName, List<String> partitions) {
-    if (config.batchSyncNum <= 0) {
+    if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
       throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
     }
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.batchSyncNum;
+    int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
     StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
 
     for (int i = 0; i < partitions.size(); i++) {
@@ -186,7 +186,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   public StringBuilder getAlterTableDropPrefix(String tableName) {
     StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
-    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
         .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
         .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS ");
     return alterSQL;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index d9b663ccb0..70b671595b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -55,10 +55,10 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
     this.config = config;
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(config.partitionValueExtractorClass).newInstance();
+          (PartitionValueExtractor) Class.forName(config.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + config.partitionValueExtractorClass, e);
+          "Failed to initialize PartitionValueExtractor class " + config.hoodieSyncConfigParams.partitionValueExtractorClass, e);
     }
   }
 
@@ -90,11 +90,11 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.partitionFields, config.supportTimestamp);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
       // Cascade clause should not be present for non-partitioned tables
-      String cascadeClause = config.partitionFields.size() > 0 ? " cascade" : "";
+      String cascadeClause = config.hoodieSyncConfigParams.partitionFields.size() > 0 ? " cascade" : "";
       StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
-          .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+          .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
           .append(HIVE_ESCAPE_CHARACTER).append(tableName)
           .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
           .append(newSchemaStr).append(" )").append(cascadeClause);
@@ -138,7 +138,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
       String comment = field.getValue().getRight();
       comment = comment.replace("'","");
       sql.append("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
-              .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+              .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
               .append(HIVE_ESCAPE_CHARACTER).append(tableName)
               .append(HIVE_ESCAPE_CHARACTER)
               .append(" CHANGE COLUMN `").append(name).append("` `").append(name)
@@ -148,15 +148,15 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   }
 
   private List<String> constructAddPartitions(String tableName, List<String> partitions) {
-    if (config.batchSyncNum <= 0) {
+    if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
       throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
     }
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.batchSyncNum;
+    int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
     StringBuilder alterSQL = getAlterTablePrefix(tableName);
     for (int i = 0; i < partitions.size(); i++) {
       String partitionClause = getPartitionClause(partitions.get(i));
-      String fullPartitionPath = FSUtils.getPartitionPath(config.basePath, partitions.get(i)).toString();
+      String fullPartitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partitions.get(i)).toString();
       alterSQL.append("  PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
           .append("' ");
       if ((i + 1) % batchSyncPartitionNum == 0) {
@@ -173,7 +173,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
 
   private StringBuilder getAlterTablePrefix(String tableName) {
     StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
-    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
         .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
         .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
     return alterSQL;
@@ -181,18 +181,18 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
 
   public String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+    ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
             + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < config.partitionFields.size(); i++) {
+    for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
       String partitionValue = partitionValues.get(i);
       // decode the partition before sync to hive to prevent multiple escapes of HIVE
-      if (config.decodePartition) {
+      if (config.hoodieSyncConfigParams.decodePartition) {
         // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
         partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
       }
-      partBuilder.add("`" + config.partitionFields.get(i) + "`='" + partitionValue + "'");
+      partBuilder.add("`" + config.hoodieSyncConfigParams.partitionFields.get(i) + "`='" + partitionValue + "'");
     }
     return String.join(",", partBuilder);
   }
@@ -200,12 +200,12 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   private List<String> constructChangePartitions(String tableName, List<String> partitions) {
     List<String> changePartitions = new ArrayList<>();
     // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
-    String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.databaseName + HIVE_ESCAPE_CHARACTER;
+    String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.hoodieSyncConfigParams.databaseName + HIVE_ESCAPE_CHARACTER;
     changePartitions.add(useDatabase);
     String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(config.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partition);
       String partitionScheme = partitionPath.toUri().getScheme();
       String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
           ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
index 16c30a16aa..6ec4a586b8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -36,20 +36,20 @@ public class GlobalHiveSyncConfig extends HiveSyncConfig {
 
   public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
     GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(cfg.getProps());
-    newConfig.basePath = cfg.basePath;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.databaseName = cfg.databaseName;
-    newConfig.hivePass = cfg.hivePass;
-    newConfig.hiveUser = cfg.hiveUser;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
-    newConfig.jdbcUrl = cfg.jdbcUrl;
-    newConfig.tableName = cfg.tableName;
-    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.supportTimestamp = cfg.supportTimestamp;
-    newConfig.decodePartition = cfg.decodePartition;
-    newConfig.batchSyncNum = cfg.batchSyncNum;
+    newConfig.hoodieSyncConfigParams.basePath = cfg.hoodieSyncConfigParams.basePath;
+    newConfig.hoodieSyncConfigParams.assumeDatePartitioning = cfg.hoodieSyncConfigParams.assumeDatePartitioning;
+    newConfig.hoodieSyncConfigParams.databaseName = cfg.hoodieSyncConfigParams.databaseName;
+    newConfig.hiveSyncConfigParams.hivePass = cfg.hiveSyncConfigParams.hivePass;
+    newConfig.hiveSyncConfigParams.hiveUser = cfg.hiveSyncConfigParams.hiveUser;
+    newConfig.hoodieSyncConfigParams.partitionFields = cfg.hoodieSyncConfigParams.partitionFields;
+    newConfig.hoodieSyncConfigParams.partitionValueExtractorClass = cfg.hoodieSyncConfigParams.partitionValueExtractorClass;
+    newConfig.hiveSyncConfigParams.jdbcUrl = cfg.hiveSyncConfigParams.jdbcUrl;
+    newConfig.hoodieSyncConfigParams.tableName = cfg.hoodieSyncConfigParams.tableName;
+    newConfig.hiveSyncConfigParams.usePreApacheInputFormat = cfg.hiveSyncConfigParams.usePreApacheInputFormat;
+    newConfig.hoodieSyncConfigParams.useFileListingFromMetadata = cfg.hoodieSyncConfigParams.useFileListingFromMetadata;
+    newConfig.hiveSyncConfigParams.supportTimestamp = cfg.hiveSyncConfigParams.supportTimestamp;
+    newConfig.hoodieSyncConfigParams.decodePartition = cfg.hoodieSyncConfigParams.decodePartition;
+    newConfig.hiveSyncConfigParams.batchSyncNum = cfg.hiveSyncConfigParams.batchSyncNum;
     newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
     return newConfig;
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
index a7d205962e..92c9631ab2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -80,7 +80,7 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
   }
 
   public static GlobalHiveSyncTool buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     hiveConf.addResource(fs.getConf());
     return new GlobalHiveSyncTool(cfg, hiveConf, fs);
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
index c3dd2af346..917bc1e123 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
@@ -74,12 +74,12 @@ public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
 
   GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
     GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
-    cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
-        : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
-    cfg.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
-        : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
-    LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.jdbcUrl + " "
-        + cfg.basePath);
+    cfg.hoodieSyncConfigParams.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
+            : properties.getProperty(LOCAL_BASE_PATH, cfg.hoodieSyncConfigParams.basePath);
+    cfg.hiveSyncConfigParams.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
+            : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.hiveSyncConfigParams.jdbcUrl);
+    LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.hiveSyncConfigParams.jdbcUrl + " "
+        + cfg.hoodieSyncConfigParams.basePath);
     return cfg;
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
index a194eeb2e9..4d15d8db1e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -104,7 +104,7 @@ public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, AutoClose
       throws IOException {
     HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
index 0258cfc5ef..e17fe8af22 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -40,18 +40,18 @@ public class HivePartitionUtil {
    */
   public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+    ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
             + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < config.partitionFields.size(); i++) {
+    for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
       String partitionValue = partitionValues.get(i);
       // decode the partition before sync to hive to prevent multiple escapes of HIVE
-      if (config.decodePartition) {
+      if (config.hoodieSyncConfigParams.decodePartition) {
         // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
         partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
       }
-      partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+      partBuilder.add(config.hoodieSyncConfigParams.partitionFields.get(i) + "=" + partitionValue);
     }
     return String.join("/", partBuilder);
   }
@@ -61,7 +61,7 @@ public class HivePartitionUtil {
     Partition newPartition;
     try {
       List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath);
-      newPartition = client.getPartition(config.databaseName, tableName, partitionValues);
+      newPartition = client.getPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionValues);
     } catch (NoSuchObjectException ignored) {
       newPartition = null;
     } catch (TException e) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 2d700596f0..85faa012d2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -156,7 +156,7 @@ public class HiveSchemaUtil {
    * @return : Hive Table schema read from parquet file List[FieldSchema] without partitionField
    */
   public static List<FieldSchema> convertParquetSchemaToHiveFieldSchema(MessageType messageType, HiveSyncConfig syncConfig) throws IOException {
-    return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.supportTimestamp, false), syncConfig);
+    return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.hiveSyncConfigParams.supportTimestamp, false), syncConfig);
   }
 
   /**
@@ -202,7 +202,7 @@ public class HiveSchemaUtil {
   public static List<FieldSchema> convertMapSchemaToHiveFieldSchema(LinkedHashMap<String, String> schema, HiveSyncConfig syncConfig) throws IOException {
     return schema.keySet().stream()
         .map(key -> new FieldSchema(key, schema.get(key).toLowerCase(), ""))
-        .filter(field -> !syncConfig.partitionFields.contains(field.getName()))
+        .filter(field -> !syncConfig.hoodieSyncConfigParams.partitionFields.contains(field.getName()))
         .collect(Collectors.toList());
   }
 
@@ -448,11 +448,11 @@ public class HiveSchemaUtil {
   public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
                                          String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
                                          Map<String, String> tableProperties) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
-    String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.hiveSyncConfigParams.supportTimestamp);
+    String columns = generateSchemaString(storageSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
 
     List<String> partitionFields = new ArrayList<>();
-    for (String partitionKey : config.partitionFields) {
+    for (String partitionKey : config.hoodieSyncConfigParams.partitionFields) {
       String partitionKeyWithTicks = tickSurround(partitionKey);
       partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
           .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
@@ -460,26 +460,26 @@ public class HiveSchemaUtil {
 
     String partitionsStr = String.join(",", partitionFields);
     StringBuilder sb = new StringBuilder();
-    if (config.createManagedTable) {
+    if (config.hiveSyncConfigParams.createManagedTable) {
       sb.append("CREATE TABLE IF NOT EXISTS ");
     } else {
       sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
     }
-    sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
+    sb.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER)
             .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
     sb.append("( ").append(columns).append(")");
-    if (!config.partitionFields.isEmpty()) {
+    if (!config.hoodieSyncConfigParams.partitionFields.isEmpty()) {
       sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
     }
-    if (config.bucketSpec != null) {
-      sb.append(' ' + config.bucketSpec + ' ');
+    if (config.hiveSyncConfigParams.bucketSpec != null) {
+      sb.append(' ' + config.hiveSyncConfigParams.bucketSpec + ' ');
     }
     sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
     if (serdeProperties != null && !serdeProperties.isEmpty()) {
       sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
     }
     sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
-    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'");
+    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.hoodieSyncConfigParams.basePath).append("'");
 
     if (tableProperties != null && !tableProperties.isEmpty()) {
       sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
index 937243393f..aa1c606205 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
@@ -57,22 +57,22 @@ public class TestHiveSyncGlobalCommitTool {
     config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
     config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
     config.globallyReplicatedTimeStamp = commitTime;
-    config.hiveUser = System.getProperty("user.name");
-    config.hivePass = "";
-    config.databaseName = dbName;
-    config.tableName = tblName;
-    config.basePath = localCluster.tablePath(dbName, tblName);
-    config.assumeDatePartitioning = true;
-    config.usePreApacheInputFormat = false;
-    config.partitionFields = Collections.singletonList("datestr");
+    config.hiveSyncConfigParams.hiveUser = System.getProperty("user.name");
+    config.hiveSyncConfigParams.hivePass = "";
+    config.hoodieSyncConfigParams.databaseName = dbName;
+    config.hoodieSyncConfigParams.tableName = tblName;
+    config.hoodieSyncConfigParams.basePath = localCluster.tablePath(dbName, tblName);
+    config.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    config.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    config.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
     return config;
   }
 
   private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
     Assertions.assertEquals(localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
   }
 
@@ -116,11 +116,11 @@ public class TestHiveSyncGlobalCommitTool {
     remoteCluster.stopHiveServer2();
     Assertions.assertFalse(tool.commit());
     Assertions.assertEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
     Assertions.assertTrue(tool.rollback()); // do a rollback
     Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
     Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
     remoteCluster.startHiveServer2();
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 167c35a124..3377917ffe 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
index b6adcb2982..0de27d3267 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
@@ -80,32 +80,32 @@ public class HiveSyncFunctionalTestHarness {
 
   public HiveSyncConfig hiveSyncConf() throws IOException {
     HiveSyncConfig conf = new HiveSyncConfig();
-    conf.jdbcUrl = hiveTestService.getJdbcHive2Url();
-    conf.hiveUser = "";
-    conf.hivePass = "";
-    conf.databaseName = "hivesynctestdb";
-    conf.tableName = "hivesynctesttable";
-    conf.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
-    conf.assumeDatePartitioning = true;
-    conf.usePreApacheInputFormat = false;
-    conf.partitionFields = Collections.singletonList("datestr");
+    conf.hiveSyncConfigParams.jdbcUrl = hiveTestService.getJdbcHive2Url();
+    conf.hiveSyncConfigParams.hiveUser = "";
+    conf.hiveSyncConfigParams.hivePass = "";
+    conf.hoodieSyncConfigParams.databaseName = "hivesynctestdb";
+    conf.hoodieSyncConfigParams.tableName = "hivesynctesttable";
+    conf.hoodieSyncConfigParams.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
+    conf.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    conf.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    conf.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
     return conf;
   }
 
   public HoodieHiveClient hiveClient(HiveSyncConfig hiveSyncConfig) throws IOException {
     HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.COPY_ON_WRITE)
-        .setTableName(hiveSyncConfig.tableName)
+        .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
         .setPayloadClass(HoodieAvroPayload.class)
-        .initTable(hadoopConf, hiveSyncConfig.basePath);
+        .initTable(hadoopConf, hiveSyncConfig.hoodieSyncConfigParams.basePath);
     return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs());
   }
 
   public void dropTables(String database, String... tables) throws IOException, HiveException, MetaException {
     HiveSyncConfig hiveSyncConfig = hiveSyncConf();
-    hiveSyncConfig.databaseName = database;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
     for (String table : tables) {
-      hiveSyncConfig.tableName = table;
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = table;
       new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop table if exists " + table);
     }
   }
@@ -113,7 +113,7 @@ public class HiveSyncFunctionalTestHarness {
   public void dropDatabases(String... databases) throws IOException, HiveException, MetaException {
     HiveSyncConfig hiveSyncConfig = hiveSyncConf();
     for (String database : databases) {
-      hiveSyncConfig.databaseName = database;
+      hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
       new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop database if exists " + database);
     }
   }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
similarity index 58%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 8eec327890..54e151dd69 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -18,15 +18,12 @@
 
 package org.apache.hudi.sync.common;
 
-import java.io.Serializable;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,21 +31,18 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
 
-public abstract class AbstractSyncHoodieClient implements AutoCloseable {
+public abstract class HoodieSyncClient implements AutoCloseable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieSyncClient.class);
 
   public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
-  public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {};
 
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
@@ -59,13 +53,13 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
   private final boolean withOperationField;
 
   @Deprecated
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
+  public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                          boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
     this(basePath, assumeDatePartitioning, useFileListingFromMetadata, withOperationField, fs);
   }
 
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean withOperationField, FileSystem fs) {
+  public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                          boolean withOperationField, FileSystem fs) {
     this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
@@ -75,47 +69,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
     this.fs = fs;
   }
 
-  /**
-   * Create the table.
-   * @param tableName The table name.
-   * @param storageSchema The table schema.
-   * @param inputFormatClass The input format class of this table.
-   * @param outputFormatClass The output format class of this table.
-   * @param serdeClass The serde class of this table.
-   * @param serdeProperties The serde properties of this table.
-   * @param tableProperties The table properties for this table.
-   */
-  public abstract void createTable(String tableName, MessageType storageSchema,
-                                   String inputFormatClass, String outputFormatClass,
-                                   String serdeClass, Map<String, String> serdeProperties,
-                                   Map<String, String> tableProperties);
-
-  /**
-   * @deprecated Use {@link #tableExists} instead.
-   */
-  @Deprecated
-  public abstract boolean doesTableExist(String tableName);
-
-  public abstract boolean tableExists(String tableName);
-
-  public abstract Option<String> getLastCommitTimeSynced(String tableName);
-
-  public abstract void updateLastCommitTimeSynced(String tableName);
-
-  public abstract Option<String> getLastReplicatedTime(String tableName);
-
-  public abstract void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
-
-  public abstract void deleteLastReplicatedTimeStamp(String tableName);
-
-  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
-
-  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
-
-  public abstract void dropPartitions(String tableName, List<String> partitionsToDrop);
-
-  public  void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
-
   public abstract Map<String, String> getTableSchema(String tableName);
 
   public HoodieTableType getTableType() {
@@ -194,56 +147,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
     }
   }
 
-  public abstract static class TypeConverter implements Serializable {
-
-    static final String DEFAULT_TARGET_TYPE = "DECIMAL";
-
-    protected String targetType;
-
-    public TypeConverter() {
-      this.targetType = DEFAULT_TARGET_TYPE;
-    }
-
-    public TypeConverter(String targetType) {
-      ValidationUtils.checkArgument(Objects.nonNull(targetType));
-      this.targetType = targetType;
-    }
-
-    public void doConvert(ResultSet resultSet, Map<String, String> schema) throws SQLException {
-      schema.put(getColumnName(resultSet), targetType.equalsIgnoreCase(getColumnType(resultSet))
-                ? convert(resultSet) : getColumnType(resultSet));
-    }
-
-    public String convert(ResultSet resultSet) throws SQLException {
-      String columnType = getColumnType(resultSet);
-      int columnSize = resultSet.getInt("COLUMN_SIZE");
-      int decimalDigits = resultSet.getInt("DECIMAL_DIGITS");
-      return columnType + String.format("(%s,%s)", columnSize, decimalDigits);
-    }
-
-    public String getColumnName(ResultSet resultSet) throws SQLException {
-      return resultSet.getString(4);
-    }
-
-    public String getColumnType(ResultSet resultSet) throws SQLException {
-      return resultSet.getString(6);
-    }
-  }
-
-  /**
-   * Read the schema from the log file on path.
-   */
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
-    MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);
-    // Fall back to read the schema from last compaction
-    if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
-      return new TableSchemaResolver(this.metaClient).readSchemaFromLastCompaction(lastCompactionCommitOpt);
-    }
-    return messageType;
-  }
-
   /**
    * Partition Event captures any partition that needs to be added or updated.
    */
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index dc2b21ba45..6296958907 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
@@ -37,40 +38,7 @@ import java.util.function.Function;
  */
 public class HoodieSyncConfig extends HoodieConfig {
 
-  @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
-  public String databaseName;
-
-  @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
-  public String tableName;
-
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
-  public String basePath;
-
-  @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
-  public String baseFileFormat;
-
-  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
-  public List<String> partitionFields;
-
-  @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
-      + "to extract the partition values from HDFS path")
-  public String partitionValueExtractorClass;
-
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
-  public Boolean assumeDatePartitioning;
-
-  @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
-  public Boolean decodePartition;
-
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata;
-
-  @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
-  public Boolean isConditionalSync;
-
-  @Parameter(names = {"--spark-version"}, description = "The spark version")
-  public String sparkVersion;
+  public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
 
   public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
       .key("hoodie.datasource.meta.sync.base.path")
@@ -169,20 +137,50 @@ public class HoodieSyncConfig extends HoodieConfig {
     super(props);
     setDefaults();
 
-    this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
-    this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
-    this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
-    this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
-    this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
-    this.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
-    this.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
-    this.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
-    this.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
-    this.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
-    this.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
+    this.hoodieSyncConfigParams.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
+    this.hoodieSyncConfigParams.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
+    this.hoodieSyncConfigParams.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
+    this.hoodieSyncConfigParams.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
+    this.hoodieSyncConfigParams.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
+    this.hoodieSyncConfigParams.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
+    this.hoodieSyncConfigParams.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
+    this.hoodieSyncConfigParams.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
+    this.hoodieSyncConfigParams.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
+    this.hoodieSyncConfigParams.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
+    this.hoodieSyncConfigParams.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
   }
 
   protected void setDefaults() {
     this.setDefaultValue(META_SYNC_TABLE_NAME);
   }
+
+  public static class HoodieSyncConfigParams implements Serializable {
+    @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
+    public String databaseName;
+    @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
+    public String tableName;
+    @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
+    public String basePath;
+    @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+    public String baseFileFormat;
+    @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+    public List<String> partitionFields;
+    @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+            + "to extract the partition values from HDFS path")
+    public String partitionValueExtractorClass;
+    @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+            + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+    public Boolean assumeDatePartitioning;
+    @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
+    public Boolean decodePartition;
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata;
+    @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+    public Boolean isConditionalSync;
+    @Parameter(names = {"--spark-version"}, description = "The spark version")
+    public String sparkVersion;
+
+    public HoodieSyncConfigParams() {
+    }
+  }
 }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
new file mode 100644
index 0000000000..754d142695
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Properties;
+
+/**
+ * Base class to sync Hudi meta data with Metastores to make
+ * Hudi table queryable through external systems.
+ */
+public abstract class HoodieSyncTool {
+  protected final Configuration conf;
+  protected final FileSystem fs;
+  protected TypedProperties props;
+
+  public HoodieSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
+    this.props = props;
+    this.conf = conf;
+    this.fs = fs;
+  }
+
+  @Deprecated
+  public HoodieSyncTool(Properties props, FileSystem fileSystem) {
+    this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
+  }
+
+  public abstract void syncHoodieTable();
+
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
new file mode 100644
index 0000000000..4e29a57a08
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
@@ -0,0 +1,7 @@
+package org.apache.hudi.sync.common;
+
+public interface SupportMetaSync {
+  default void runMetaSync() {
+
+  }
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
new file mode 100644
index 0000000000..fc5e093d7e
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
@@ -0,0 +1,32 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Map;
+
+public interface CatalogSync {
+  /**
+   * Create the table.
+   *
+   * @param tableName         The table name.
+   * @param storageSchema     The table schema.
+   * @param inputFormatClass  The input format class of this table.
+   * @param outputFormatClass The output format class of this table.
+   * @param serdeClass        The serde class of this table.
+   * @param serdeProperties   The serde properties of this table.
+   * @param tableProperties   The table properties for this table.
+   */
+  void createTable(String tableName, MessageType storageSchema,
+                   String inputFormatClass, String outputFormatClass,
+                   String serdeClass, Map<String, String> serdeProperties,
+                   Map<String, String> tableProperties);
+
+  /**
+   * @deprecated Use {@link #tableExists} instead.
+   */
+  @Deprecated
+  boolean doesTableExist(String tableName);
+
+  boolean tableExists(String tableName);
+
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
new file mode 100644
index 0000000000..c5b1edd4cd
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import java.util.List;
+
+public interface PartitionsSync {
+  void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  void dropPartitions(String tableName, List<String> partitionsToDrop);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
new file mode 100644
index 0000000000..a93a9481ce
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+public interface ReplicatedTimeSync {
+  Option<String> getLastReplicatedTime(String tableName);
+
+  void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
+
+  void deleteLastReplicatedTimeStamp(String tableName);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
new file mode 100644
index 0000000000..c145e95532
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
@@ -0,0 +1,13 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.Map;
+
+public interface TblPropertiesSync {
+  Option<String> getLastCommitTimeSynced(String tableName);
+
+  void updateLastCommitTimeSynced(String tableName);
+
+  void updateTableProperties(String tableName, Map<String, String> tableProperties);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
similarity index 63%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
index 972ae1f96c..bcf572e088 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
@@ -1,29 +1,6 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+package org.apache.hudi.sync.common.util;
 
-package org.apache.hudi.sync.common;
-
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -33,40 +10,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.apache.parquet.schema.OriginalType.UTF8;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
-/**
- * Base class to sync Hudi meta data with Metastores to make
- * Hudi table queryable through external systems.
- */
-public abstract class AbstractSyncTool {
-  protected final Configuration conf;
-  protected final FileSystem fs;
-  protected TypedProperties props;
-
-  public AbstractSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
-    this.props = props;
-    this.conf = conf;
-    this.fs = fs;
-  }
-
-  @Deprecated
-  public AbstractSyncTool(Properties props, FileSystem fileSystem) {
-    this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
-  }
-
-  public abstract void syncHoodieTable();
-
+public class SparkDataSourceTableUtils {
   /**
    * Get Spark Sql related table properties. This is used for spark datasource table.
    * @param schema  The schema to write to the table.
    * @return A new parameters added the spark's table properties.
    */
-  protected Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
-                                                      int schemaLengthThreshold, MessageType schema)  {
+  public static Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
+                                                            int schemaLengthThreshold, MessageType schema)  {
     // Convert the schema and partition info used by spark sql to hive table properties.
     // The following code refers to the spark code in
     // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -122,7 +77,7 @@ public abstract class AbstractSyncTool {
     return sparkProperties;
   }
 
-  protected Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
+  public static Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
     Map<String, String> sparkSerdeProperties = new HashMap<>();
     sparkSerdeProperties.put("path", basePath);
     sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index def85c5b80..39cc56c04c 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -22,7 +22,7 @@ package org.apache.hudi.sync.common.util;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +39,7 @@ public class SyncUtilHelpers {
   private static final Logger LOG = LogManager.getLogger(SyncUtilHelpers.class);
 
   /**
-   * Create an instance of an implementation of {@link AbstractSyncTool} that will sync all the relevant meta information
+   * Create an instance of an implementation of {@link HoodieSyncTool} that will sync all the relevant meta information
    * with an external metastore such as Hive etc. to ensure Hoodie tables can be queried or read via external systems.
    *
    * @param metaSyncFQCN  The class that implements the sync of the metadata.
@@ -62,12 +62,12 @@ public class SyncUtilHelpers {
     }
   }
 
-  static AbstractSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
-                                                 TypedProperties props,
-                                                 Configuration hadoopConfig,
-                                                 FileSystem fs,
-                                                 String targetBasePath,
-                                                 String baseFileFormat) {
+  static HoodieSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
+                                                TypedProperties props,
+                                                Configuration hadoopConfig,
+                                                FileSystem fs,
+                                                String targetBasePath,
+                                                String baseFileFormat) {
     TypedProperties properties = new TypedProperties();
     properties.putAll(props);
     properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
@@ -75,13 +75,13 @@ public class SyncUtilHelpers {
 
     if (ReflectionUtils.hasConstructor(metaSyncFQCN,
         new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
-      return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+      return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
           new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
           properties, hadoopConfig, fs));
     } else {
       LOG.warn("Falling back to deprecated constructor for class: " + metaSyncFQCN);
       try {
-        return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+        return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
             new Class<?>[] {Properties.class, FileSystem.class}, properties, fs));
       } catch (Throwable t) {
         throw new HoodieException("Could not load meta sync class " + metaSyncFQCN, t);
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
index dc9dee8b42..4e8757d142 100644
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
@@ -20,7 +20,7 @@ package org.apache.hudi.sync.common.util;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +48,7 @@ public class TestSyncUtilHelpers {
 
   @Test
   public void testCreateValidSyncClass() {
-    AbstractSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
+    HoodieSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
         ValidMetaSyncClass.class.getName(),
         new TypedProperties(),
         hadoopConf,
@@ -60,13 +60,13 @@ public class TestSyncUtilHelpers {
   }
 
   /**
-   * Ensure it still works for the deprecated constructor of {@link AbstractSyncTool}
+   * Ensure it still works for the deprecated constructor of {@link HoodieSyncTool}
    * as we implemented the fallback.
    */
   @Test
   public void testCreateDeprecatedSyncClass() {
     Properties properties = new Properties();
-    AbstractSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
+    HoodieSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
         DeprecatedMetaSyncClass.class.getName(),
         new TypedProperties(properties),
         hadoopConf,
@@ -95,7 +95,7 @@ public class TestSyncUtilHelpers {
 
   }
 
-  public static class ValidMetaSyncClass extends AbstractSyncTool {
+  public static class ValidMetaSyncClass extends HoodieSyncTool {
     public ValidMetaSyncClass(TypedProperties props, Configuration conf, FileSystem fs) {
       super(props, conf, fs);
     }
@@ -106,7 +106,7 @@ public class TestSyncUtilHelpers {
     }
   }
 
-  public static class DeprecatedMetaSyncClass extends AbstractSyncTool {
+  public static class DeprecatedMetaSyncClass extends HoodieSyncTool {
     public DeprecatedMetaSyncClass(Properties props, FileSystem fileSystem) {
       super(props, fileSystem);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 736e416162..ef03079d05 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -60,6 +60,7 @@ import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.sync.common.SupportMetaSync;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -122,7 +123,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
 /**
  * Sync's one batch of data to hoodie table.
  */
-public class DeltaSync implements Serializable {
+public class DeltaSync implements SupportMetaSync, Serializable {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -629,7 +630,7 @@ public class DeltaSync implements Serializable {
         }
 
         if (!isEmpty) {
-          syncMeta(metrics);
+          runMetaSync();
         }
       } else {
         LOG.info("Commit " + instantTime + " failed!");
@@ -690,7 +691,7 @@ public class DeltaSync implements Serializable {
     return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
   }
 
-  private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+  public void runMetaSync() {
     Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
     // for backward compatibility
     if (cfg.enableHiveSync) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index ae38968187..bd979b5f62 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1355,13 +1355,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Test Hive integration
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
-    hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
-    assertTrue(hiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
-    assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.tableName).size(),
+    assertTrue(hiveClient.tableExists(hiveSyncConfig.hoodieSyncConfigParams.tableName), "Table " + hiveSyncConfig.hoodieSyncConfigParams.tableName + " should exist");
+    assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.hoodieSyncConfigParams.tableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(lastInstantForUpstreamTable,
-        hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        hiveClient.getLastCommitTimeSynced(hiveSyncConfig.hoodieSyncConfigParams.tableName).get(),
         "The last commit that was synced should be updated in the TBLPROPERTIES");
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 7df6e11014..fec1ed6511 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -184,15 +184,15 @@ public class UtilitiesTestBase {
    */
   protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
-    hiveSyncConfig.hiveUser = "";
-    hiveSyncConfig.hivePass = "";
-    hiveSyncConfig.databaseName = "testdb1";
-    hiveSyncConfig.tableName = tableName;
-    hiveSyncConfig.basePath = basePath;
-    hiveSyncConfig.assumeDatePartitioning = false;
-    hiveSyncConfig.usePreApacheInputFormat = false;
-    hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("datestr");
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = "";
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = "";
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = "testdb1";
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = tableName;
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+    hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = false;
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("datestr");
     return hiveSyncConfig;
   }
 
@@ -208,12 +208,12 @@ public class UtilitiesTestBase {
     hiveConf.addResource(hiveServer.getHiveConf());
     HoodieTableMetaClient.withPropertyBuilder()
       .setTableType(HoodieTableType.COPY_ON_WRITE)
-      .setTableName(hiveSyncConfig.tableName)
-      .initTable(dfs.getConf(), hiveSyncConfig.basePath);
+      .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
+      .initTable(dfs.getConf(), hiveSyncConfig.hoodieSyncConfigParams.basePath);
 
     QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig, dfs);
-    ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName);
-    ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName);
+    ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+    ddlExecutor.runSQL("create database " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
     ddlExecutor.close();
   }