You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by xu...@apache.org on 2022/06/12 17:55:17 UTC

[hudi] branch release-feature-rfc55 updated: [HUDI-3730] Improve meta sync class design and hierarchies (#5754)

This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-feature-rfc55
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc55 by this push:
     new d65555b106 [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
d65555b106 is described below

commit d65555b1067b809a16fdfbb105f28348415b3945
Author: 冯健 <fe...@gmail.com>
AuthorDate: Mon Jun 13 01:55:09 2022 +0800

    [HUDI-3730] Improve meta sync class design and hierarchies (#5754)
    
    
    
    Co-authored-by: jian.feng <fe...@gmail.com>
    Co-authored-by: jian.feng <ji...@shopee.com>
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |  20 +-
 .../hudi/aws/sync/AwsGlueCatalogSyncTool.java      |   4 +-
 .../apache/hudi/sink/utils/HiveSyncContext.java    |  46 ++---
 .../hudi/sink/utils/TestHiveSyncContext.java       |   4 +-
 .../apache/hudi/gcp/bigquery/BigQuerySyncTool.java |   4 +-
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     |  41 +---
 .../integ/testsuite/dag/nodes/HiveQueryNode.java   |   4 +-
 .../writers/KafkaConnectTransactionServices.java   |   3 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  67 +++----
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  78 +++----
 .../hudi/command/MergeIntoHoodieTableCommand.scala |   6 +-
 .../java/org/apache/hudi/TestDataSourceUtils.java  |  12 +-
 .../hudi/sync/adb/AbstractAdbSyncHoodieClient.java |  27 +--
 .../org/apache/hudi/sync/adb/AdbSyncConfig.java    | 215 ++++++++++----------
 .../java/org/apache/hudi/sync/adb/AdbSyncTool.java |  63 +++---
 .../apache/hudi/sync/adb/HoodieAdbJdbcClient.java  |  65 +++---
 .../apache/hudi/sync/adb/TestAdbSyncConfig.java    |  54 ++---
 .../hudi/sync/datahub/DataHubSyncClient.java       |  59 +-----
 .../apache/hudi/sync/datahub/DataHubSyncTool.java  |  10 +-
 .../config/HoodieDataHubDatasetIdentifier.java     |   2 +-
 .../hudi/hive/AbstractHiveSyncHoodieClient.java    |  16 +-
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  | 223 ++++++++++-----------
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  74 +++----
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  40 ++--
 .../org/apache/hudi/hive/ddl/HMSDDLExecutor.java   |  54 ++---
 .../apache/hudi/hive/ddl/HiveQueryDDLExecutor.java |  10 +-
 .../org/apache/hudi/hive/ddl/JDBCExecutor.java     |  16 +-
 .../hudi/hive/ddl/QueryBasedDDLExecutor.java       |  34 ++--
 .../hive/replication/GlobalHiveSyncConfig.java     |  28 +--
 .../hudi/hive/replication/GlobalHiveSyncTool.java  |   2 +-
 .../replication/HiveSyncGlobalCommitConfig.java    |  12 +-
 .../hive/replication/HiveSyncGlobalCommitTool.java |   2 +-
 .../apache/hudi/hive/util/HivePartitionUtil.java   |  12 +-
 .../org/apache/hudi/hive/util/HiveSchemaUtil.java  |  22 +-
 .../hudi/hive/TestHiveSyncGlobalCommitTool.java    |  24 +--
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |   4 +-
 .../testutils/HiveSyncFunctionalTestHarness.java   |  28 +--
 ...SyncHoodieClient.java => HoodieSyncClient.java} | 109 +---------
 .../apache/hudi/sync/common/HoodieSyncConfig.java  |  88 ++++----
 .../apache/hudi/sync/common/HoodieSyncTool.java    |  49 +++++
 .../apache/hudi/sync/common/SupportMetaSync.java   |   7 +
 .../hudi/sync/common/operation/CatalogSync.java    |  32 +++
 .../hudi/sync/common/operation/PartitionsSync.java |  11 +
 .../sync/common/operation/ReplicatedTimeSync.java  |  11 +
 .../sync/common/operation/TblPropertiesSync.java   |  13 ++
 .../SparkDataSourceTableUtils.java}                |  55 +----
 .../hudi/sync/common/util/SyncUtilHelpers.java     |  20 +-
 .../hudi/sync/common/util/TestSyncUtilHelpers.java |  12 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   7 +-
 .../functional/TestHoodieDeltaStreamer.java        |   8 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  26 +--
 51 files changed, 855 insertions(+), 978 deletions(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index e15a698f27..e269908a4f 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -90,7 +90,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
   public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
     super(syncConfig, hadoopConf, fs);
     this.awsGlue = AWSGlueClientBuilder.standard().build();
-    this.databaseName = syncConfig.databaseName;
+    this.databaseName = syncConfig.hoodieSyncConfigParams.databaseName;
   }
 
   @Override
@@ -126,7 +126,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       StorageDescriptor sd = table.getStorageDescriptor();
       List<PartitionInput> partitionInputs = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
         return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -160,7 +160,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       StorageDescriptor sd = table.getStorageDescriptor();
       List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = changedPartitions.stream().map(partition -> {
         StorageDescriptor partitionSd = sd.clone();
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
         PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd);
@@ -206,10 +206,10 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     // ToDo Cascade is set in Hive meta sync, but need to investigate how to configure it for Glue meta
-    boolean cascade = syncConfig.partitionFields.size() > 0;
+    boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
-      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
       List<Column> newColumns = newSchemaMap.keySet().stream().map(key -> {
         String keyType = getPartitionKeyType(newSchemaMap, key);
         return new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
@@ -265,26 +265,26 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
     }
     CreateTableRequest request = new CreateTableRequest();
     Map<String, String> params = new HashMap<>();
-    if (!syncConfig.createManagedTable) {
+    if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
       params.put("EXTERNAL", "TRUE");
     }
     params.putAll(tableProperties);
 
     try {
-      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      Map<String, String> mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
 
       List<Column> schemaWithoutPartitionKeys = new ArrayList<>();
       for (String key : mapSchema.keySet()) {
         String keyType = getPartitionKeyType(mapSchema, key);
         Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment("");
         // In Glue, the full schema should exclude the partition keys
-        if (!syncConfig.partitionFields.contains(key)) {
+        if (!syncConfig.hoodieSyncConfigParams.partitionFields.contains(key)) {
           schemaWithoutPartitionKeys.add(column);
         }
       }
 
       // now create the schema partition
-      List<Column> schemaPartitionKeys = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<Column> schemaPartitionKeys = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
         String keyType = getPartitionKeyType(mapSchema, partitionKey);
         return new Column().withName(partitionKey).withType(keyType.toLowerCase()).withComment("");
       }).collect(Collectors.toList());
@@ -293,7 +293,7 @@ public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient {
       serdeProperties.put("serialization.format", "1");
       storageDescriptor
           .withSerdeInfo(new SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties))
-          .withLocation(s3aToS3(syncConfig.basePath))
+          .withLocation(s3aToS3(syncConfig.hoodieSyncConfigParams.basePath))
           .withInputFormat(inputFormatClass)
           .withOutputFormat(outputFormatClass)
           .withColumns(schemaWithoutPartitionKeys);
diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index bb1be377c9..ed5a49dbdb 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -58,11 +58,11 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
     // parse the params
     final HiveSyncConfig cfg = new HiveSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
     new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
index 9fc5323d46..dee87e9966 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/HiveSyncContext.java
@@ -52,7 +52,7 @@ public class HiveSyncContext {
   }
 
   public HiveSyncTool hiveSyncTool() {
-    HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.syncMode);
+    HiveSyncMode syncMode = HiveSyncMode.of(syncConfig.hiveSyncConfigParams.syncMode);
     if (syncMode == HiveSyncMode.GLUE) {
       return new AwsGlueCatalogSyncTool(this.syncConfig, this.hiveConf, this.fs);
     }
@@ -76,28 +76,28 @@ public class HiveSyncContext {
   @VisibleForTesting
   public static HiveSyncConfig buildSyncConfig(Configuration conf) {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.basePath = conf.getString(FlinkOptions.PATH);
-    hiveSyncConfig.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
-    hiveSyncConfig.usePreApacheInputFormat = false;
-    hiveSyncConfig.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
-    hiveSyncConfig.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
-    hiveSyncConfig.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
-    hiveSyncConfig.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
-    hiveSyncConfig.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
-    hiveSyncConfig.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
-    hiveSyncConfig.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
-    hiveSyncConfig.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
-    hiveSyncConfig.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
-    hiveSyncConfig.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
-    hiveSyncConfig.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
-    hiveSyncConfig.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
-    hiveSyncConfig.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
-    hiveSyncConfig.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
-    hiveSyncConfig.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
-    hiveSyncConfig.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
-    hiveSyncConfig.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
-    hiveSyncConfig.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
-    hiveSyncConfig.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = conf.getString(FlinkOptions.PATH);
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = conf.getString(FlinkOptions.HIVE_SYNC_FILE_FORMAT);
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = conf.getString(FlinkOptions.HIVE_SYNC_DB);
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = conf.getString(FlinkOptions.HIVE_SYNC_TABLE);
+    hiveSyncConfig.hiveSyncConfigParams.syncMode = conf.getString(FlinkOptions.HIVE_SYNC_MODE);
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = conf.getString(FlinkOptions.HIVE_SYNC_USERNAME);
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = conf.getString(FlinkOptions.HIVE_SYNC_PASSWORD);
+    hiveSyncConfig.hiveSyncConfigParams.tableProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_PROPERTIES);
+    hiveSyncConfig.hiveSyncConfigParams.serdeProperties = conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES);
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = conf.getString(FlinkOptions.HIVE_SYNC_JDBC_URL);
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList(FilePathUtils.extractHivePartitionFields(conf));
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME);
+    hiveSyncConfig.hiveSyncConfigParams.useJdbc = conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC);
+    hiveSyncConfig.hoodieSyncConfigParams.useFileListingFromMetadata = conf.getBoolean(FlinkOptions.METADATA_ENABLED);
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS);
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP);
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = conf.getBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB);
+    hiveSyncConfig.hoodieSyncConfigParams.decodePartition = conf.getBoolean(FlinkOptions.URL_ENCODE_PARTITIONING);
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = conf.getBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX);
+    hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = conf.getBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION);
+    hiveSyncConfig.hiveSyncConfigParams.withOperationField = conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED);
     return hiveSyncConfig;
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
index 7bfaade59e..2e8cc00bb1 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestHiveSyncContext.java
@@ -55,8 +55,8 @@ public class TestHiveSyncContext {
     HiveSyncConfig hiveSyncConfig1 = HiveSyncContext.buildSyncConfig(configuration1);
     HiveSyncConfig hiveSyncConfig2 = HiveSyncContext.buildSyncConfig(configuration2);
 
-    assertTrue(hiveSyncConfig1.partitionFields.get(0).equals(hiveSyncPartitionField));
-    assertTrue(hiveSyncConfig2.partitionFields.get(0).equals(partitionPathField));
+    assertTrue(hiveSyncConfig1.hoodieSyncConfigParams.partitionFields.get(0).equals(hiveSyncPartitionField));
+    assertTrue(hiveSyncConfig2.hoodieSyncConfigParams.partitionFields.get(0).equals(partitionPathField));
 
   }
 }
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
index 0cb75eea89..b6904a6887 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.util.ManifestFileWriter;
 
 import com.beust.jcommander.JCommander;
@@ -40,7 +40,7 @@ import org.apache.log4j.Logger;
  *
  * @Experimental
  */
-public class BigQuerySyncTool extends AbstractSyncTool {
+public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
 
diff --git a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index cb41ca2272..c5222aca91 100644
--- a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -20,7 +20,7 @@
 package org.apache.hudi.gcp.bigquery;
 
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
@@ -39,6 +39,8 @@ import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.ViewDefinition;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -47,7 +49,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
+public class HoodieBigQuerySyncClient extends HoodieSyncClient implements CatalogSync, TblPropertiesSync {
   private static final Logger LOG = LogManager.getLogger(HoodieBigQuerySyncClient.class);
 
   private final BigQuerySyncConfig syncConfig;
@@ -171,12 +173,6 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
     return Collections.emptyMap();
   }
 
-  @Override
-  public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
-  }
-
   public boolean datasetExists() {
     Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
     return dataset != null;
@@ -207,33 +203,8 @@ public class HoodieBigQuerySyncClient extends AbstractSyncHoodieClient {
   }
 
   @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void updatePartitionsToTable(final String tableName, final List<String> changedPartitions) {
-    // bigQuery updates the partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for dropPartitions yet.");
+  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+    throw new UnsupportedOperationException("No support for updateTableProperties yet.");
   }
 
   @Override
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
index 4736133f2c..8aef7bdf81 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java
@@ -57,8 +57,8 @@ public class HiveQueryNode extends DagNode<Boolean> {
         .getDeltaSyncService().getDeltaSync().getCfg().baseFileFormat);
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(properties);
     this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter());
-    Connection con = DriverManager.getConnection(hiveSyncConfig.jdbcUrl, hiveSyncConfig.hiveUser,
-        hiveSyncConfig.hivePass);
+    Connection con = DriverManager.getConnection(hiveSyncConfig.hiveSyncConfigParams.jdbcUrl, hiveSyncConfig.hiveSyncConfigParams.hiveUser,
+        hiveSyncConfig.hiveSyncConfigParams.hivePass);
     Statement stmt = con.createStatement();
     stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
     for (String hiveProperty : this.config.getHiveProperties()) {
diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
index 934dbadf1c..e351fe14c3 100644
--- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
+++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java
@@ -37,6 +37,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
+import org.apache.hudi.sync.common.SupportMetaSync;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 
 import org.apache.hadoop.conf.Configuration;
@@ -56,7 +57,7 @@ import java.util.Set;
  * {@link TransactionCoordinator}
  * using {@link HoodieJavaWriteClient}.
  */
-public class KafkaConnectTransactionServices implements ConnectTransactionServices {
+public class KafkaConnectTransactionServices implements ConnectTransactionServices, SupportMetaSync {
 
   private static final Logger LOG = LogManager.getLogger(KafkaConnectTransactionServices.class);
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7..64b8079a64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -279,50 +279,43 @@ public class DataSourceUtils {
   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {
     checkRequiredProperties(props, Collections.singletonList(DataSourceWriteOptions.HIVE_TABLE().key()));
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.basePath = basePath;
-    hiveSyncConfig.usePreApacheInputFormat =
-        props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().key(),
             Boolean.parseBoolean(DataSourceWriteOptions.HIVE_USE_PRE_APACHE_INPUT_FORMAT().defaultValue()));
-    hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
-        DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
-    hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
-    hiveSyncConfig.baseFileFormat = baseFileFormat;
-    hiveSyncConfig.hiveUser =
-        props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
-    hiveSyncConfig.hivePass =
-        props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
-    hiveSyncConfig.jdbcUrl =
-        props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
-    hiveSyncConfig.metastoreUris =
-            props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
-    hiveSyncConfig.partitionFields =
-        props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
-    hiveSyncConfig.partitionValueExtractorClass =
-        props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE().key(),
+            DataSourceWriteOptions.HIVE_DATABASE().defaultValue());
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE().key());
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = baseFileFormat;
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(DataSourceWriteOptions.HIVE_USER().key(), DataSourceWriteOptions.HIVE_USER().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(DataSourceWriteOptions.HIVE_PASS().key(), DataSourceWriteOptions.HIVE_PASS().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(DataSourceWriteOptions.HIVE_URL().key(), DataSourceWriteOptions.HIVE_URL().defaultValue());
+    hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(DataSourceWriteOptions.METASTORE_URIS().key(), DataSourceWriteOptions.METASTORE_URIS().defaultValue());
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), ",", new ArrayList<>());
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
             SlashEncodedDayPartitionValueExtractor.class.getName());
-    hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
-        DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
+            DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
     if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) {
-      hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
+      hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
     }
-    hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
-        DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
-    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
-        DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
-    hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
-        DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
-    hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
-        DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
-    hiveSyncConfig.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
-        DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
-    hiveSyncConfig.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
-        DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
-        ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
+            DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
+            DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().key(),
+            DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().key(),
+            DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP_TYPE().defaultValue()));
+    hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().key(),
+            DataSourceWriteOptions.HIVE_CONDITIONAL_SYNC().defaultValue()));
+    hiveSyncConfig.hiveSyncConfigParams.bucketSpec = props.getBoolean(DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().key(),
+            DataSourceWriteOptions.HIVE_SYNC_BUCKET_SYNC().defaultValue())
+            ? HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
             props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())) : null;
     if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION())) {
-      hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
+      hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION());
     }
-    hiveSyncConfig.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
+    hiveSyncConfig.hiveSyncConfigParams.syncComment = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT().key(),
             DataSourceWriteOptions.HIVE_SYNC_COMMENT().defaultValue()));
     return hiveSyncConfig;
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 3f67d5017f..cb2057a995 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -77,12 +77,12 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
       )
@@ -195,11 +195,11 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
@@ -232,12 +232,12 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> partitionFields,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.databaseName,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.tableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.databaseName,
+        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.hoodieSyncConfigParams.tableName,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass
       )
         .filter { case (_, v) => v != null }
     }
@@ -274,8 +274,8 @@ trait ProvidesHoodieConfig extends Logging {
         PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
       )
@@ -290,32 +290,32 @@ trait ProvidesHoodieConfig extends Logging {
 
   def buildHiveSyncConfig(props: TypedProperties, hoodieCatalogTable: HoodieCatalogTable): HiveSyncConfig = {
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig
-    hiveSyncConfig.basePath = hoodieCatalogTable.tableLocation
-    hiveSyncConfig.baseFileFormat = hoodieCatalogTable.baseFileFormat
-    hiveSyncConfig.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
-    hiveSyncConfig.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = hoodieCatalogTable.tableLocation
+    hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat = hoodieCatalogTable.baseFileFormat
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = props.getBoolean(HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key, HiveSyncConfig.HIVE_USE_PRE_APACHE_INPUT_FORMAT.defaultValue.toBoolean)
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = hoodieCatalogTable.table.identifier.database.getOrElse("default")
     if (props.containsKey(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)) {
-      hiveSyncConfig.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = props.getString(HoodieSyncConfig.META_SYNC_TABLE_NAME.key)
     } else {
-      hiveSyncConfig.tableName = hoodieCatalogTable.table.identifier.table
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = hoodieCatalogTable.table.identifier.table
     }
-    hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
-    hiveSyncConfig.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
-    hiveSyncConfig.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
-    hiveSyncConfig.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
-    hiveSyncConfig.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
-    hiveSyncConfig.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
-    hiveSyncConfig.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
-    if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
-    hiveSyncConfig.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
-    hiveSyncConfig.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
-    hiveSyncConfig.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
-    hiveSyncConfig.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
-    hiveSyncConfig.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
-    hiveSyncConfig.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
+    hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = props.getString(HiveSyncConfig.HIVE_USER.key, HiveSyncConfig.HIVE_USER.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = props.getString(HiveSyncConfig.HIVE_PASS.key, HiveSyncConfig.HIVE_PASS.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = props.getString(HiveSyncConfig.HIVE_URL.key, HiveSyncConfig.HIVE_URL.defaultValue)
+    hiveSyncConfig.hiveSyncConfigParams.metastoreUris = props.getString(HiveSyncConfig.METASTORE_URIS.key, HiveSyncConfig.METASTORE_URIS.defaultValue)
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = props.getStringList(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key, ",", new util.ArrayList[String])
+    hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass = props.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key, classOf[MultiPartKeysValueExtractor].getName)
+    if (props.containsKey(HiveSyncConfig.HIVE_SYNC_MODE.key)) hiveSyncConfig.hiveSyncConfigParams.syncMode = props.getString(HiveSyncConfig.HIVE_SYNC_MODE.key)
+    hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase = props.getString(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key, HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions = props.getString(HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.key, HiveSyncConfig.HIVE_IGNORE_EXCEPTIONS.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.skipROSuffix = props.getString(HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.key, HiveSyncConfig.HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.supportTimestamp = props.getString(HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key, "true").toBoolean
+    hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync = props.getString(HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.key, HoodieSyncConfig.META_SYNC_CONDITIONAL_SYNC.defaultValue).toBoolean
+    hiveSyncConfig.hiveSyncConfigParams.bucketSpec = if (props.getBoolean(HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.key, HiveSyncConfig.HIVE_SYNC_BUCKET_SYNC.defaultValue)) HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key), props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key))
     else null
-    if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
-    hiveSyncConfig.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
+    if (props.containsKey(HiveExternalCatalog.CREATED_SPARK_VERSION)) hiveSyncConfig.hoodieSyncConfigParams.sparkVersion = props.getString(HiveExternalCatalog.CREATED_SPARK_VERSION)
+    hiveSyncConfig.hiveSyncConfigParams.syncComment = props.getString(DataSourceWriteOptions.HIVE_SYNC_COMMENT.key, DataSourceWriteOptions.HIVE_SYNC_COMMENT.defaultValue).toBoolean
     hiveSyncConfig
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 636599ce0c..296b359e90 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -467,13 +467,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
         SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> enableHive.toString,
+        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.hiveSyncConfigParams.syncMode,
         HiveSyncConfig.HIVE_SYNC_ENABLED.key -> enableHive.toString,
-        HiveSyncConfig.HIVE_SYNC_MODE.key -> hiveSyncConfig.syncMode,
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.supportTimestamp.toString,
+        HiveSyncConfig.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.hiveSyncConfigParams.supportTimestamp.toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.partitionValueExtractorClass,
+        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass,
         HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), // set the default parallelism to 200 for sql
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index af5bbe7717..0a1f08d048 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -261,14 +261,14 @@ public class TestDataSourceUtils {
     HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name());
 
     if (useSyncMode) {
-      assertFalse(hiveSyncConfig.useJdbc);
-      assertEquals(HMS.name(), hiveSyncConfig.syncMode);
+      assertFalse(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+      assertEquals(HMS.name(), hiveSyncConfig.hiveSyncConfigParams.syncMode);
     } else {
-      assertTrue(hiveSyncConfig.useJdbc);
-      assertNull(hiveSyncConfig.syncMode);
+      assertTrue(hiveSyncConfig.hiveSyncConfigParams.useJdbc);
+      assertNull(hiveSyncConfig.hiveSyncConfigParams.syncMode);
     }
-    assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName);
-    assertEquals(HIVE_TABLE, hiveSyncConfig.tableName);
+    assertEquals(HIVE_DATABASE, hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+    assertEquals(HIVE_TABLE, hiveSyncConfig.hoodieSyncConfigParams.tableName);
   }
 
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
index 84316ddb11..b8dec82b7b 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AbstractAdbSyncHoodieClient.java
@@ -24,26 +24,29 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hive.PartitionValueExtractor;
 import org.apache.hudi.hive.SchemaDifference;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractAdbSyncHoodieClient extends HoodieSyncClient implements PartitionsSync, CatalogSync, TblPropertiesSync {
   protected AdbSyncConfig adbSyncConfig;
   protected PartitionValueExtractor partitionValueExtractor;
   protected HoodieTimeline activeTimeline;
 
   public AbstractAdbSyncHoodieClient(AdbSyncConfig syncConfig, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning,
-        syncConfig.useFileListingFromMetadata, false, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+            syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
     this.adbSyncConfig = syncConfig;
-    final String clazz = adbSyncConfig.partitionValueExtractorClass;
+    final String clazz = adbSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass;
     try {
       this.partitionValueExtractor = (PartitionValueExtractor) Class.forName(clazz).newInstance();
     } catch (Exception e) {
@@ -64,13 +67,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
     }
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : partitionStoragePartitions) {
-      Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, storagePartition);
+      Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, storagePartition);
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      if (adbSyncConfig.useHiveStylePartitioning) {
+      if (adbSyncConfig.adbSyncConfigParams.useHiveStylePartitioning) {
         String partition = String.join("/", storagePartitionValues);
-        storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+        storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
         fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       }
       if (!storagePartitionValues.isEmpty()) {
@@ -100,13 +103,13 @@ public abstract class AbstractAdbSyncHoodieClient extends AbstractSyncHoodieClie
   public abstract void dropTable(String tableName);
 
   protected String getDatabasePath() {
-    String dbLocation = adbSyncConfig.dbLocation;
+    String dbLocation = adbSyncConfig.adbSyncConfigParams.dbLocation;
     Path dbLocationPath;
     if (StringUtils.isNullOrEmpty(dbLocation)) {
-      if (new Path(adbSyncConfig.basePath).isRoot()) {
-        dbLocationPath = new Path(adbSyncConfig.basePath);
+      if (new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).isRoot()) {
+        dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
       } else {
-        dbLocationPath = new Path(adbSyncConfig.basePath).getParent();
+        dbLocationPath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath).getParent();
       }
     } else {
       dbLocationPath = new Path(dbLocation);
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
index ae2e7024e5..a9a1c847d5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java
@@ -18,64 +18,22 @@
 
 package org.apache.hudi.sync.adb;
 
+import com.beust.jcommander.ParametersDelegate;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
+
 /**
  * Configs needed to sync data into Alibaba Cloud AnalyticDB(ADB).
  */
 public class AdbSyncConfig extends HoodieSyncConfig {
 
-  @Parameter(names = {"--user"}, description = "Adb username", required = true)
-  public String adbUser;
-
-  @Parameter(names = {"--pass"}, description = "Adb password", required = true)
-  public String adbPass;
-
-  @Parameter(names = {"--jdbc-url"}, description = "Adb jdbc connect url", required = true)
-  public String jdbcUrl;
-
-  @Parameter(names = {"--skip-ro-suffix"}, description = "Whether skip the `_ro` suffix for read optimized table when syncing")
-  public Boolean skipROSuffix;
-
-  @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
-  public Boolean skipRTSync;
-
-  @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
-  public Boolean useHiveStylePartitioning;
-
-  @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
-  public Boolean supportTimestamp;
-
-  @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
-  public Boolean syncAsSparkDataSourceTable;
-
-  @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
-  public String tableProperties;
-
-  @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
-  public String serdeProperties;
-
-  @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
-  public int sparkSchemaLengthThreshold;
-
-  @Parameter(names = {"--db-location"}, description = "Database location")
-  public String dbLocation;
-
-  @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
-  public Boolean autoCreateDatabase = true;
-
-  @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
-  public Boolean skipLastCommitTimeSync = false;
-
-  @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
-  public Boolean dropTableBeforeCreation = false;
-
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
+  public final AdbSyncConfigParams adbSyncConfigParams = new AdbSyncConfigParams();
 
   public static final ConfigProperty<String> ADB_SYNC_USER = ConfigProperty
       .key("hoodie.datasource.adb.sync.username")
@@ -159,48 +117,48 @@ public class AdbSyncConfig extends HoodieSyncConfig {
   public AdbSyncConfig(TypedProperties props) {
     super(props);
 
-    adbUser = getString(ADB_SYNC_USER);
-    adbPass = getString(ADB_SYNC_PASS);
-    jdbcUrl = getString(ADB_SYNC_JDBC_URL);
-    skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
-    skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
-    useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
-    supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
-    syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
-    tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
-    serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
-    sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
-    dbLocation = getString(ADB_SYNC_DB_LOCATION);
-    autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
-    skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
-    dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
+    adbSyncConfigParams.hiveSyncConfigParams.hiveUser = getString(ADB_SYNC_USER);
+    adbSyncConfigParams.hiveSyncConfigParams.hivePass = getString(ADB_SYNC_PASS);
+    adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = getString(ADB_SYNC_JDBC_URL);
+    adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(ADB_SYNC_SKIP_RO_SUFFIX);
+    adbSyncConfigParams.skipRTSync = getBooleanOrDefault(ADB_SYNC_SKIP_RT_SYNC);
+    adbSyncConfigParams.useHiveStylePartitioning = getBooleanOrDefault(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING);
+    adbSyncConfigParams.supportTimestamp = getBooleanOrDefault(ADB_SYNC_SUPPORT_TIMESTAMP);
+    adbSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE);
+    adbSyncConfigParams.tableProperties = getString(ADB_SYNC_TABLE_PROPERTIES);
+    adbSyncConfigParams.serdeProperties = getString(ADB_SYNC_SERDE_PROPERTIES);
+    adbSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+    adbSyncConfigParams.dbLocation = getString(ADB_SYNC_DB_LOCATION);
+    adbSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(ADB_SYNC_AUTO_CREATE_DATABASE);
+    adbSyncConfigParams.skipLastCommitTimeSync = getBooleanOrDefault(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC);
+    adbSyncConfigParams.dropTableBeforeCreation = getBooleanOrDefault(ADB_SYNC_DROP_TABLE_BEFORE_CREATION);
   }
 
   public static TypedProperties toProps(AdbSyncConfig cfg) {
     TypedProperties properties = new TypedProperties();
-    properties.put(META_SYNC_DATABASE_NAME.key(), cfg.databaseName);
-    properties.put(META_SYNC_TABLE_NAME.key(), cfg.tableName);
-    properties.put(ADB_SYNC_USER.key(), cfg.adbUser);
-    properties.put(ADB_SYNC_PASS.key(), cfg.adbPass);
-    properties.put(ADB_SYNC_JDBC_URL.key(), cfg.jdbcUrl);
-    properties.put(META_SYNC_BASE_PATH.key(), cfg.basePath);
-    properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.partitionFields));
-    properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.partitionValueExtractorClass);
-    properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.assumeDatePartitioning));
-    properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.skipROSuffix));
-    properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.skipRTSync));
-    properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.useHiveStylePartitioning));
-    properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.useFileListingFromMetadata));
-    properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.supportTimestamp));
-    properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.tableProperties);
-    properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.serdeProperties);
-    properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.syncAsSparkDataSourceTable));
-    properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.sparkSchemaLengthThreshold));
-    properties.put(META_SYNC_SPARK_VERSION.key(), cfg.sparkVersion);
-    properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.dbLocation);
-    properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.autoCreateDatabase));
-    properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.skipLastCommitTimeSync));
-    properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.dropTableBeforeCreation));
+    properties.put(META_SYNC_DATABASE_NAME.key(), cfg.hoodieSyncConfigParams.databaseName);
+    properties.put(META_SYNC_TABLE_NAME.key(), cfg.hoodieSyncConfigParams.tableName);
+    properties.put(ADB_SYNC_USER.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+    properties.put(ADB_SYNC_PASS.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+    properties.put(ADB_SYNC_JDBC_URL.key(), cfg.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+    properties.put(META_SYNC_BASE_PATH.key(), cfg.hoodieSyncConfigParams.basePath);
+    properties.put(META_SYNC_PARTITION_FIELDS.key(), String.join(",", cfg.hoodieSyncConfigParams.partitionFields));
+    properties.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), cfg.hoodieSyncConfigParams.partitionValueExtractorClass);
+    properties.put(META_SYNC_ASSUME_DATE_PARTITION.key(), String.valueOf(cfg.hoodieSyncConfigParams.assumeDatePartitioning));
+    properties.put(ADB_SYNC_SKIP_RO_SUFFIX.key(), String.valueOf(cfg.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix));
+    properties.put(ADB_SYNC_SKIP_RT_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipRTSync));
+    properties.put(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING.key(), String.valueOf(cfg.adbSyncConfigParams.useHiveStylePartitioning));
+    properties.put(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(cfg.hoodieSyncConfigParams.useFileListingFromMetadata));
+    properties.put(ADB_SYNC_SUPPORT_TIMESTAMP.key(), String.valueOf(cfg.adbSyncConfigParams.supportTimestamp));
+    properties.put(ADB_SYNC_TABLE_PROPERTIES.key(), cfg.adbSyncConfigParams.tableProperties);
+    properties.put(ADB_SYNC_SERDE_PROPERTIES.key(), cfg.adbSyncConfigParams.serdeProperties);
+    properties.put(ADB_SYNC_SYNC_AS_SPARK_DATA_SOURCE_TABLE.key(), String.valueOf(cfg.adbSyncConfigParams.syncAsSparkDataSourceTable));
+    properties.put(ADB_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key(), String.valueOf(cfg.adbSyncConfigParams.sparkSchemaLengthThreshold));
+    properties.put(META_SYNC_SPARK_VERSION.key(), cfg.hoodieSyncConfigParams.sparkVersion);
+    properties.put(ADB_SYNC_DB_LOCATION.key(), cfg.adbSyncConfigParams.dbLocation);
+    properties.put(ADB_SYNC_AUTO_CREATE_DATABASE.key(), String.valueOf(cfg.adbSyncConfigParams.autoCreateDatabase));
+    properties.put(ADB_SYNC_SKIP_LAST_COMMIT_TIME_SYNC.key(), String.valueOf(cfg.adbSyncConfigParams.skipLastCommitTimeSync));
+    properties.put(ADB_SYNC_DROP_TABLE_BEFORE_CREATION.key(), String.valueOf(cfg.adbSyncConfigParams.dropTableBeforeCreation));
 
     return properties;
   }
@@ -208,33 +166,66 @@ public class AdbSyncConfig extends HoodieSyncConfig {
   @Override
   public String toString() {
     return "AdbSyncConfig{"
-        + "adbUser='" + adbUser + '\''
-        + ", adbPass='" + adbPass + '\''
-        + ", jdbcUrl='" + jdbcUrl + '\''
-        + ", skipROSuffix=" + skipROSuffix
-        + ", skipRTSync=" + skipRTSync
-        + ", useHiveStylePartitioning=" + useHiveStylePartitioning
-        + ", supportTimestamp=" + supportTimestamp
-        + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
-        + ", tableProperties='" + tableProperties + '\''
-        + ", serdeProperties='" + serdeProperties + '\''
-        + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
-        + ", dbLocation='" + dbLocation + '\''
-        + ", autoCreateDatabase=" + autoCreateDatabase
-        + ", skipLastCommitTimeSync=" + skipLastCommitTimeSync
-        + ", dropTableBeforeCreation=" + dropTableBeforeCreation
-        + ", help=" + help
-        + ", databaseName='" + databaseName + '\''
-        + ", tableName='" + tableName + '\''
-        + ", basePath='" + basePath + '\''
-        + ", baseFileFormat='" + baseFileFormat + '\''
-        + ", partitionFields=" + partitionFields
-        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-        + ", assumeDatePartitioning=" + assumeDatePartitioning
-        + ", decodePartition=" + decodePartition
-        + ", useFileListingFromMetadata=" + useFileListingFromMetadata
-        + ", isConditionalSync=" + isConditionalSync
-        + ", sparkVersion='" + sparkVersion + '\''
+        + "adbUser='" + adbSyncConfigParams.hiveSyncConfigParams.hiveUser + '\''
+        + ", adbPass='" + (adbSyncConfigParams.hiveSyncConfigParams.hivePass == null ? "null" : "***") + '\'' // masked: never log the raw password
+        + ", jdbcUrl='" + adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl + '\''
+        + ", skipROSuffix=" + adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix
+        + ", skipRTSync=" + adbSyncConfigParams.skipRTSync
+        + ", useHiveStylePartitioning=" + adbSyncConfigParams.useHiveStylePartitioning
+        + ", supportTimestamp=" + adbSyncConfigParams.supportTimestamp
+        + ", syncAsSparkDataSourceTable=" + adbSyncConfigParams.syncAsSparkDataSourceTable
+        + ", tableProperties='" + adbSyncConfigParams.tableProperties + '\''
+        + ", serdeProperties='" + adbSyncConfigParams.serdeProperties + '\''
+        + ", sparkSchemaLengthThreshold=" + adbSyncConfigParams.sparkSchemaLengthThreshold
+        + ", dbLocation='" + adbSyncConfigParams.dbLocation + '\''
+        + ", autoCreateDatabase=" + adbSyncConfigParams.autoCreateDatabase
+        + ", skipLastCommitTimeSync=" + adbSyncConfigParams.skipLastCommitTimeSync
+        + ", dropTableBeforeCreation=" + adbSyncConfigParams.dropTableBeforeCreation
+        + ", help=" + adbSyncConfigParams.help
+        + ", databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+        + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+        + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+        + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+        + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+        + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+        + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+        + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+        + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+        + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+        + ", sparkVersion='" + hoodieSyncConfigParams.sparkVersion + '\''
         + '}';
   }
+
+  public static class AdbSyncConfigParams implements Serializable {
+    @ParametersDelegate()
+    public HiveSyncConfig.HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfig.HiveSyncConfigParams();
+
+    @Parameter(names = {"--support-timestamp"}, description = "If true, converts int64(timestamp_micros) to timestamp type")
+    public Boolean supportTimestamp;
+    @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table")
+    public Boolean syncAsSparkDataSourceTable;
+    @Parameter(names = {"--table-properties"}, description = "Table properties, to support read hoodie table as datasource table", required = true)
+    public String tableProperties;
+    @Parameter(names = {"--serde-properties"}, description = "Serde properties, to support read hoodie table as datasource table", required = true)
+    public String serdeProperties;
+    @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore")
+    public int sparkSchemaLengthThreshold;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+    @Parameter(names = {"--hive-style-partitioning"}, description = "Whether use hive style partitioning, true if like the following style: field1=value1/field2=value2")
+    public Boolean useHiveStylePartitioning;
+    @Parameter(names = {"--skip-rt-sync"}, description = "Whether skip the rt table when syncing")
+    public Boolean skipRTSync;
+    @Parameter(names = {"--db-location"}, description = "Database location")
+    public String dbLocation;
+    @Parameter(names = {"--auto-create-database"}, description = "Whether auto create adb database")
+    public Boolean autoCreateDatabase = true;
+    @Parameter(names = {"--skip-last-commit-time-sync"}, description = "Whether skip last commit time syncing")
+    public Boolean skipLastCommitTimeSync = false;
+    @Parameter(names = {"--drop-table-before-creation"}, description = "Whether drop table before creation")
+    public Boolean dropTableBeforeCreation = false;
+
+    public AdbSyncConfigParams() {
+    }
+  }
 }
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
index 8c2f9e2045..fb9911ace5 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncTool.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hive.SchemaDifference;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 
 import com.beust.jcommander.JCommander;
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
 import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ import java.util.stream.Collectors;
  * incremental partitions will be synced as well.
  */
 @SuppressWarnings("WeakerAccess")
-public class AdbSyncTool extends AbstractSyncTool {
+public class AdbSyncTool extends HoodieSyncTool {
   private static final Logger LOG = LoggerFactory.getLogger(AdbSyncTool.class);
 
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -72,13 +73,13 @@ public class AdbSyncTool extends AbstractSyncTool {
     this.hoodieAdbClient = getHoodieAdbClient(adbSyncConfig, fs);
     switch (hoodieAdbClient.getTableType()) {
       case COPY_ON_WRITE:
-        this.snapshotTableName = adbSyncConfig.tableName;
+        this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName;
         this.roTableTableName = Option.empty();
         break;
       case MERGE_ON_READ:
-        this.snapshotTableName = adbSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = adbSyncConfig.skipROSuffix ? Option.of(adbSyncConfig.tableName)
-            : Option.of(adbSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        this.snapshotTableName = adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix ? Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName)
+            : Option.of(adbSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
         break;
       default:
         throw new HoodieAdbSyncException("Unknown table type:" + hoodieAdbClient.getTableType()
@@ -101,7 +102,7 @@ public class AdbSyncTool extends AbstractSyncTool {
           // Sync a ro table for MOR table
           syncHoodieTable(roTableTableName.get(), false, true);
           // Sync a rt table for MOR table
-          if (!adbSyncConfig.skipRTSync) {
+          if (!adbSyncConfig.adbSyncConfigParams.skipRTSync) {
             syncHoodieTable(snapshotTableName, true, false);
           }
           break;
@@ -110,7 +111,7 @@ public class AdbSyncTool extends AbstractSyncTool {
               + ", basePath:" + hoodieAdbClient.getBasePath());
       }
     } catch (Exception re) {
-      throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.tableName, re);
+      throw new HoodieAdbSyncException("Sync hoodie table to ADB failed, tableName:" + adbSyncConfig.hoodieSyncConfigParams.tableName, re);
     } finally {
       hoodieAdbClient.close();
     }
@@ -121,19 +122,19 @@ public class AdbSyncTool extends AbstractSyncTool {
     LOG.info("Try to sync hoodie table, tableName:{}, path:{}, tableType:{}",
         tableName, hoodieAdbClient.getBasePath(), hoodieAdbClient.getTableType());
 
-    if (adbSyncConfig.autoCreateDatabase) {
+    if (adbSyncConfig.adbSyncConfigParams.autoCreateDatabase) {
       try {
         synchronized (AdbSyncTool.class) {
-          if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
-            hoodieAdbClient.createDatabase(adbSyncConfig.databaseName);
+          if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+            hoodieAdbClient.createDatabase(adbSyncConfig.hoodieSyncConfigParams.databaseName);
           }
         }
       } catch (Exception e) {
-        throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.databaseName
+        throw new HoodieAdbSyncException("Failed to create database:" + adbSyncConfig.hoodieSyncConfigParams.databaseName
             + ", useRealtimeInputFormat = " + useRealtimeInputFormat, e);
       }
-    } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.databaseName)) {
-      throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.databaseName);
+    } else if (!hoodieAdbClient.databaseExists(adbSyncConfig.hoodieSyncConfigParams.databaseName)) {
+      throw new HoodieAdbSyncException("ADB database does not exists:" + adbSyncConfig.hoodieSyncConfigParams.databaseName);
     }
 
     // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
@@ -144,11 +145,11 @@ public class AdbSyncTool extends AbstractSyncTool {
     if (hoodieAdbClient.isBootstrap()
         && hoodieAdbClient.getTableType() == HoodieTableType.MERGE_ON_READ
         && !readAsOptimized) {
-      adbSyncConfig.syncAsSparkDataSourceTable = false;
+      adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable = false;
       LOG.info("Disable sync as spark datasource table for mor rt table:{}", tableName);
     }
 
-    if (adbSyncConfig.dropTableBeforeCreation) {
+    if (adbSyncConfig.adbSyncConfigParams.dropTableBeforeCreation) {
       LOG.info("Drop table before creation, tableName:{}", tableName);
       hoodieAdbClient.dropTable(tableName);
     }
@@ -171,7 +172,7 @@ public class AdbSyncTool extends AbstractSyncTool {
 
     // Scan synced partitions
     List<String> writtenPartitionsSince;
-    if (adbSyncConfig.partitionFields.isEmpty()) {
+    if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
       writtenPartitionsSince = new ArrayList<>();
     } else {
       writtenPartitionsSince = hoodieAdbClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
@@ -183,7 +184,7 @@ public class AdbSyncTool extends AbstractSyncTool {
 
     // Update sync commit time
     // whether to skip syncing commit time stored in tbl properties, since it is time consuming.
-    if (!adbSyncConfig.skipLastCommitTimeSync) {
+    if (!adbSyncConfig.adbSyncConfigParams.skipLastCommitTimeSync) {
       hoodieAdbClient.updateLastCommitTimeSynced(tableName);
     }
     LOG.info("Sync complete for table:{}", tableName);
@@ -202,12 +203,12 @@ public class AdbSyncTool extends AbstractSyncTool {
   private void syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) throws Exception {
     // Append spark table properties & serde properties
-    Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.tableProperties);
-    Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.serdeProperties);
-    if (adbSyncConfig.syncAsSparkDataSourceTable) {
-      Map<String, String> sparkTableProperties = getSparkTableProperties(adbSyncConfig.partitionFields,
-          adbSyncConfig.sparkVersion, adbSyncConfig.sparkSchemaLengthThreshold, schema);
-      Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, adbSyncConfig.basePath);
+    Map<String, String> tableProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.tableProperties);
+    Map<String, String> serdeProperties = ConfigUtils.toMap(adbSyncConfig.adbSyncConfigParams.serdeProperties);
+    if (adbSyncConfig.adbSyncConfigParams.syncAsSparkDataSourceTable) {
+      Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+              adbSyncConfig.hoodieSyncConfigParams.sparkVersion, adbSyncConfig.adbSyncConfigParams.sparkSchemaLengthThreshold, schema);
+      Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, adbSyncConfig.hoodieSyncConfigParams.basePath);
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
       LOG.info("Sync as spark datasource table, tableName:{}, tableExists:{}, tableProperties:{}, sederProperties:{}",
@@ -227,8 +228,8 @@ public class AdbSyncTool extends AbstractSyncTool {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieAdbClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.partitionFields,
-          adbSyncConfig.supportTimestamp);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, adbSyncConfig.hoodieSyncConfigParams.partitionFields,
+              adbSyncConfig.adbSyncConfigParams.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for table:{}", tableName);
         hoodieAdbClient.updateTableDefinition(tableName, schemaDiff);
@@ -244,7 +245,7 @@ public class AdbSyncTool extends AbstractSyncTool {
    */
   private void syncPartitions(String tableName, List<String> writtenPartitionsSince) {
     try {
-      if (adbSyncConfig.partitionFields.isEmpty()) {
+      if (adbSyncConfig.hoodieSyncConfigParams.partitionFields.isEmpty()) {
         LOG.info("Not a partitioned table.");
         return;
       }
@@ -271,13 +272,13 @@ public class AdbSyncTool extends AbstractSyncTool {
     // parse the params
     final AdbSyncConfig cfg = new AdbSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.adbSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
 
     Configuration hadoopConf = new Configuration();
-    FileSystem fs = FSUtils.getFs(cfg.basePath, hadoopConf);
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, hadoopConf);
     new AdbSyncTool(AdbSyncConfig.toProps(cfg), hadoopConf, fs).syncHoodieTable();
   }
 }
diff --git a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
index a347ba7011..81bd34ffa3 100644
--- a/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
+++ b/hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java
@@ -69,7 +69,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   public HoodieAdbJdbcClient(AdbSyncConfig syncConfig, FileSystem fs) {
     super(syncConfig, fs);
     createAdbConnection();
-    LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.jdbcUrl);
+    LOG.info("Init adb jdbc client success, jdbcUrl:{}", syncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
   }
 
   private void createAdbConnection() {
@@ -82,7 +82,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
       }
       try {
         this.connection = DriverManager.getConnection(
-            adbSyncConfig.jdbcUrl, adbSyncConfig.adbUser, adbSyncConfig.adbPass);
+                adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl,
+            adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
       } catch (SQLException e) {
         throw new HoodieException("Cannot create adb connection ", e);
       }
@@ -106,7 +107,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   @Override
   public void dropTable(String tableName) {
     LOG.info("Dropping table:{}", tableName);
-    String dropTable = "drop table if exists `" + adbSyncConfig.databaseName + "`.`" + tableName + "`";
+    String dropTable = "drop table if exists `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`.`" + tableName + "`";
     executeAdbSql(dropTable);
   }
 
@@ -115,8 +116,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
     ResultSet result = null;
     try {
       DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData.getColumns(adbSyncConfig.databaseName,
-          adbSyncConfig.databaseName, tableName, null);
+      result = databaseMetaData.getColumns(adbSyncConfig.hoodieSyncConfigParams.databaseName,
+              adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, null);
       while (result.next()) {
         String columnName = result.getString(4);
         String columnType = result.getString(6);
@@ -261,18 +262,8 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
   }
 
   @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    throw new UnsupportedOperationException("Not support updateLastReplicatedTimeStamp yet");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    throw new UnsupportedOperationException("Not support deleteLastReplicatedTimeStamp yet");
+  public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
+    throw new UnsupportedOperationException("Not support updateTableProperties yet");
   }
 
   @Override
@@ -304,7 +295,7 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
             String str = resultSet.getString(1);
             if (!StringUtils.isNullOrEmpty(str)) {
               List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str);
-              Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, String.join("/", values));
+              Path storagePartitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, String.join("/", values));
               String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
               partitions.put(values, fullStoragePartitionPath);
             }
@@ -332,11 +323,11 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private String constructAddPartitionsSql(String tableName, List<String> partitions) {
     StringBuilder sqlBuilder = new StringBuilder("alter table `");
-    sqlBuilder.append(adbSyncConfig.databaseName).append("`").append(".`")
+    sqlBuilder.append(adbSyncConfig.hoodieSyncConfigParams.databaseName).append("`").append(".`")
         .append(tableName).append("`").append(" add if not exists ");
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
       String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
       sqlBuilder.append("  partition (").append(partitionClause).append(") location '")
           .append(fullPartitionPathStr).append("' ");
@@ -347,13 +338,13 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private List<String> constructChangePartitionsSql(String tableName, List<String> partitions) {
     List<String> changePartitions = new ArrayList<>();
-    String useDatabase = "use `" + adbSyncConfig.databaseName + "`";
+    String useDatabase = "use `" + adbSyncConfig.hoodieSyncConfigParams.databaseName + "`";
     changePartitions.add(useDatabase);
 
     String alterTable = "alter table `" + tableName + "`";
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(adbSyncConfig.hoodieSyncConfigParams.basePath, partition);
       String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
       String changePartition = alterTable + " add if not exists partition (" + partitionClause
           + ") location '" + fullPartitionPathStr + "'";
@@ -371,32 +362,32 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
    */
   private String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(adbSyncConfig.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + adbSyncConfig.partitionFields
+    ValidationUtils.checkArgument(adbSyncConfig.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + adbSyncConfig.hoodieSyncConfigParams.partitionFields
             + " does not match with partition values " + partitionValues + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < adbSyncConfig.partitionFields.size(); i++) {
-      partBuilder.add(adbSyncConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+    for (int i = 0; i < adbSyncConfig.hoodieSyncConfigParams.partitionFields.size(); i++) {
+      partBuilder.add(adbSyncConfig.hoodieSyncConfigParams.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
     }
 
     return String.join(",", partBuilder);
   }
 
   private String constructShowPartitionSql(String tableName) {
-    return String.format("show partitions `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+    return String.format("show partitions `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructShowCreateTableSql(String tableName) {
-    return String.format("show create table `%s`.`%s`", adbSyncConfig.databaseName, tableName);
+    return String.format("show create table `%s`.`%s`", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructShowLikeTableSql(String tableName) {
-    return String.format("show tables from `%s` like '%s'", adbSyncConfig.databaseName, tableName);
+    return String.format("show tables from `%s` like '%s'", adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName);
   }
 
   private String constructCreateDatabaseSql(String rootPath) {
     return String.format("create database if not exists `%s` with dbproperties(catalog = 'oss', location = '%s')",
-        adbSyncConfig.databaseName, rootPath);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, rootPath);
   }
 
   private String constructShowCreateDatabaseSql(String databaseName) {
@@ -405,25 +396,25 @@ public class HoodieAdbJdbcClient extends AbstractAdbSyncHoodieClient {
 
   private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
     return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
-        adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
   }
 
   private String constructAddColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
-        adbSyncConfig.databaseName, tableName, columnName, columnType);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnType);
   }
 
   private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
-        adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
+            adbSyncConfig.hoodieSyncConfigParams.databaseName, tableName, columnName, columnName, columnType);
   }
 
   private HiveSyncConfig getHiveSyncConfig() {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
-    hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
-    Path basePath = new Path(adbSyncConfig.basePath);
-    hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = adbSyncConfig.hoodieSyncConfigParams.partitionFields;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = adbSyncConfig.hoodieSyncConfigParams.databaseName;
+    Path basePath = new Path(adbSyncConfig.hoodieSyncConfigParams.basePath);
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = generateAbsolutePathStr(basePath);
     return hiveSyncConfig;
   }
 
diff --git a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
index f4eb8fc7fc..0a3d937496 100644
--- a/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
+++ b/hudi-sync/hudi-adb-sync/src/test/java/org/apache/hudi/sync/adb/TestAdbSyncConfig.java
@@ -30,36 +30,36 @@ public class TestAdbSyncConfig {
   @Test
   public void testCopy() {
     AdbSyncConfig adbSyncConfig = new AdbSyncConfig();
-    adbSyncConfig.partitionFields = Arrays.asList("a", "b");
-    adbSyncConfig.basePath = "/tmp";
-    adbSyncConfig.assumeDatePartitioning = true;
-    adbSyncConfig.databaseName = "test";
-    adbSyncConfig.tableName = "test";
-    adbSyncConfig.adbUser = "adb";
-    adbSyncConfig.adbPass = "adb";
-    adbSyncConfig.jdbcUrl = "jdbc:mysql://localhost:3306";
-    adbSyncConfig.skipROSuffix = false;
-    adbSyncConfig.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
-        + "spark.sql.sources.schema.numParts = '1'\\n "
-        + "spark.sql.sources.schema.part.0 ='xx'\\n "
-        + "spark.sql.sources.schema.numPartCols = '1'\\n"
-        + "spark.sql.sources.schema.partCol.0 = 'dt'";
-    adbSyncConfig.serdeProperties = "'path'='/tmp/test_db/tbl'";
-    adbSyncConfig.dbLocation = "file://tmp/test_db";
+    adbSyncConfig.hoodieSyncConfigParams.partitionFields = Arrays.asList("a", "b");
+    adbSyncConfig.hoodieSyncConfigParams.basePath = "/tmp";
+    adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    adbSyncConfig.hoodieSyncConfigParams.databaseName = "test";
+    adbSyncConfig.hoodieSyncConfigParams.tableName = "test";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser = "adb";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass = "adb";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl = "jdbc:mysql://localhost:3306";
+    adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix = false;
+    adbSyncConfig.adbSyncConfigParams.tableProperties = "spark.sql.sources.provider= 'hudi'\\n"
+            + "spark.sql.sources.schema.numParts = '1'\\n "
+            + "spark.sql.sources.schema.part.0 ='xx'\\n "
+            + "spark.sql.sources.schema.numPartCols = '1'\\n"
+            + "spark.sql.sources.schema.partCol.0 = 'dt'";
+    adbSyncConfig.adbSyncConfigParams.serdeProperties = "'path'='/tmp/test_db/tbl'";
+    adbSyncConfig.adbSyncConfigParams.dbLocation = "file://tmp/test_db";
 
     TypedProperties props = AdbSyncConfig.toProps(adbSyncConfig);
     AdbSyncConfig copied = new AdbSyncConfig(props);
 
-    assertEquals(copied.partitionFields, adbSyncConfig.partitionFields);
-    assertEquals(copied.basePath, adbSyncConfig.basePath);
-    assertEquals(copied.assumeDatePartitioning, adbSyncConfig.assumeDatePartitioning);
-    assertEquals(copied.databaseName, adbSyncConfig.databaseName);
-    assertEquals(copied.tableName, adbSyncConfig.tableName);
-    assertEquals(copied.adbUser, adbSyncConfig.adbUser);
-    assertEquals(copied.adbPass, adbSyncConfig.adbPass);
-    assertEquals(copied.basePath, adbSyncConfig.basePath);
-    assertEquals(copied.jdbcUrl, adbSyncConfig.jdbcUrl);
-    assertEquals(copied.skipROSuffix, adbSyncConfig.skipROSuffix);
-    assertEquals(copied.supportTimestamp, adbSyncConfig.supportTimestamp);
+    assertEquals(copied.hoodieSyncConfigParams.partitionFields, adbSyncConfig.hoodieSyncConfigParams.partitionFields);
+    assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+    assertEquals(copied.hoodieSyncConfigParams.assumeDatePartitioning, adbSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning);
+    assertEquals(copied.hoodieSyncConfigParams.databaseName, adbSyncConfig.hoodieSyncConfigParams.databaseName);
+    assertEquals(copied.hoodieSyncConfigParams.tableName, adbSyncConfig.hoodieSyncConfigParams.tableName);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hiveUser, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hiveUser);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.hivePass, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.hivePass);
+    assertEquals(copied.hoodieSyncConfigParams.basePath, adbSyncConfig.hoodieSyncConfigParams.basePath);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.jdbcUrl);
+    assertEquals(copied.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix, adbSyncConfig.adbSyncConfigParams.hiveSyncConfigParams.skipROSuffix);
+    assertEquals(copied.adbSyncConfigParams.supportTimestamp, adbSyncConfig.adbSyncConfigParams.supportTimestamp);
   }
 }
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 68569822cc..e784d591d6 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -23,8 +23,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
 import com.linkedin.common.urn.DatasetUrn;
@@ -53,14 +54,13 @@ import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.parquet.schema.MessageType;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class DataHubSyncClient extends AbstractSyncHoodieClient {
+public class DataHubSyncClient extends HoodieSyncClient implements TblPropertiesSync {
 
   private final HoodieTimeline activeTimeline;
   private final DataHubSyncConfig syncConfig;
@@ -68,34 +68,13 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
   private final DatasetUrn datasetUrn;
 
   public DataHubSyncClient(DataHubSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, false, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning, syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, false, fs);
     this.syncConfig = syncConfig;
     this.hadoopConf = hadoopConf;
     this.datasetUrn = syncConfig.datasetIdentifier.getDatasetUrn();
     this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   }
 
-  @Override
-  public void createTable(String tableName,
-      MessageType storageSchema,
-      String inputFormatClass,
-      String outputFormatClass,
-      String serdeClass,
-      Map<String, String> serdeProperties,
-      Map<String, String> tableProperties) {
-    throw new UnsupportedOperationException("Not supported: `createTable`");
-  }
-
-  @Override
-  public boolean doesTableExist(String tableName) {
-    return tableExists(tableName);
-  }
-
-  @Override
-  public boolean tableExists(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `tableExists`");
-  }
-
   @Override
   public Option<String> getLastCommitTimeSynced(String tableName) {
     throw new UnsupportedOperationException("Not supported: `getLastCommitTimeSynced`");
@@ -106,36 +85,6 @@ public class DataHubSyncClient extends AbstractSyncHoodieClient {
     updateTableProperties(tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, activeTimeline.lastInstant().get().getTimestamp()));
   }
 
-  @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`");
-  }
-
-  @Override
-  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
-    throw new UnsupportedOperationException("Not supported: `addPartitionsToTable`");
-  }
-
-  @Override
-  public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
-    throw new UnsupportedOperationException("Not supported: `updatePartitionsToTable`");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    throw new UnsupportedOperationException("Not supported: `dropPartitions`");
-  }
-
   @Override
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
     MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
index 9633d6b089..6502dcd1ce 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncTool.java
@@ -21,7 +21,7 @@ package org.apache.hudi.sync.datahub;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
 
 import com.beust.jcommander.JCommander;
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
  * @Experimental
  * @see <a href="https://datahubproject.io/">https://datahubproject.io/</a>
  */
-public class DataHubSyncTool extends AbstractSyncTool {
+public class DataHubSyncTool extends HoodieSyncTool {
 
   private final DataHubSyncConfig config;
 
@@ -56,8 +56,8 @@ public class DataHubSyncTool extends AbstractSyncTool {
   @Override
   public void syncHoodieTable() {
     try (DataHubSyncClient syncClient = new DataHubSyncClient(config, conf, fs)) {
-      syncClient.updateTableDefinition(config.tableName);
-      syncClient.updateLastCommitTimeSynced(config.tableName);
+      syncClient.updateTableDefinition(config.hoodieSyncConfigParams.tableName);
+      syncClient.updateLastCommitTimeSynced(config.hoodieSyncConfigParams.tableName);
     }
   }
 
@@ -68,7 +68,7 @@ public class DataHubSyncTool extends AbstractSyncTool {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     new DataHubSyncTool(cfg, fs.getConf(), fs).syncHoodieTable();
   }
 }
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index e3c1ad486c..0f9f9ece9e 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -43,6 +43,6 @@ public class HoodieDataHubDatasetIdentifier {
   public DatasetUrn getDatasetUrn() {
     DataPlatformUrn dataPlatformUrn = new DataPlatformUrn(DEFAULT_HOODIE_DATAHUB_PLATFORM_NAME);
     DataHubSyncConfig config = new DataHubSyncConfig(props);
-    return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.databaseName, config.tableName), FabricType.DEV);
+    return new DatasetUrn(dataPlatformUrn, String.format("%s.%s", config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName), FabricType.DEV);
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
index f0641b6fc0..b52237c698 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java
@@ -22,7 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.HoodieSyncClient;
 import org.apache.hudi.sync.common.HoodieSyncException;
 import org.apache.hudi.sync.common.model.Partition;
 
@@ -31,6 +31,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.operation.CatalogSync;
+import org.apache.hudi.sync.common.operation.PartitionsSync;
+import org.apache.hudi.sync.common.operation.ReplicatedTimeSync;
+import org.apache.hudi.sync.common.operation.TblPropertiesSync;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.ArrayList;
@@ -41,7 +45,8 @@ import java.util.Map;
 /**
  * Base class to sync Hudi tables with Hive based metastores, such as Hive server, HMS or managed Hive services.
  */
-public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieClient {
+public abstract class AbstractHiveSyncHoodieClient extends HoodieSyncClient implements ReplicatedTimeSync,
+    PartitionsSync, CatalogSync, TblPropertiesSync {
 
   protected final HoodieTimeline activeTimeline;
   protected final HiveSyncConfig syncConfig;
@@ -49,10 +54,11 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
   protected final PartitionValueExtractor partitionValueExtractor;
 
   public AbstractHiveSyncHoodieClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) {
-    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, syncConfig.withOperationField, fs);
+    super(syncConfig.hoodieSyncConfigParams.basePath, syncConfig.hoodieSyncConfigParams.assumeDatePartitioning,
+        syncConfig.hoodieSyncConfigParams.useFileListingFromMetadata, syncConfig.hiveSyncConfigParams.withOperationField, fs);
     this.syncConfig = syncConfig;
     this.hadoopConf = hadoopConf;
-    this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.partitionValueExtractorClass);
+    this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass);
     this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
   }
 
@@ -75,7 +81,7 @@ public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieCli
 
     List<PartitionEvent> events = new ArrayList<>();
     for (String storagePartition : partitionStoragePartitions) {
-      Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition);
+      Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, storagePartition);
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 36dba81a33..66d17661a8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -24,79 +24,14 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
+
 /**
  * Configs needed to sync data into the Hive Metastore.
  */
 public class HiveSyncConfig extends HoodieSyncConfig {
 
-  @Parameter(names = {"--user"}, description = "Hive username")
-  public String hiveUser;
-
-  @Parameter(names = {"--pass"}, description = "Hive password")
-  public String hivePass;
-
-  @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
-  public String jdbcUrl;
-
-  @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
-  public String metastoreUris;
-
-  @Parameter(names = {"--use-pre-apache-input-format"},
-      description = "Use InputFormat under com.uber.hoodie package "
-          + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
-          + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
-          + "org.apache.hudi input format.")
-  public Boolean usePreApacheInputFormat;
-
-  @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
-  public String bucketSpec;
-
-  @Deprecated
-  @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
-  public Boolean useJdbc;
-
-  @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
-  public String syncMode;
-
-  @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
-  public Boolean autoCreateDatabase;
-
-  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
-  public Boolean ignoreExceptions;
-
-  @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
-  public Boolean skipROSuffix;
-
-  @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
-  public String tableProperties;
-
-  @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
-  public String serdeProperties;
-
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
-      + "Disabled by default for backward compatibility.")
-  public Boolean supportTimestamp;
-
-  @Parameter(names = {"--managed-table"}, description = "Create a managed table")
-  public Boolean createManagedTable;
-
-  @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
-  public Integer batchSyncNum;
-
-  @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
-  public Boolean syncAsSparkDataSourceTable;
-
-  @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
-  public int sparkSchemaLengthThreshold;
-
-  @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
-  public Boolean withOperationField = false;
-
-  @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
-  public boolean syncComment = false;
+  public final HiveSyncConfigParams hiveSyncConfigParams = new HiveSyncConfigParams();
 
   // HIVE SYNC SPECIFIC CONFIGS
   // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
@@ -223,64 +158,118 @@ public class HiveSyncConfig extends HoodieSyncConfig {
 
   public HiveSyncConfig(TypedProperties props) {
     super(props);
-    this.hiveUser = getStringOrDefault(HIVE_USER);
-    this.hivePass = getStringOrDefault(HIVE_PASS);
-    this.jdbcUrl = getStringOrDefault(HIVE_URL);
-    this.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
-    this.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
-    this.metastoreUris = getStringOrDefault(METASTORE_URIS);
-    this.syncMode = getString(HIVE_SYNC_MODE);
-    this.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
-    this.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
-    this.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
-    this.tableProperties = getString(HIVE_TABLE_PROPERTIES);
-    this.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
-    this.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
-    this.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
-    this.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
-    this.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
-    this.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
-    this.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
-    this.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT);
+    this.hiveSyncConfigParams.hiveUser = getStringOrDefault(HIVE_USER); // Hive-specific values are copied from props onto the nested hiveSyncConfigParams holder
+    this.hiveSyncConfigParams.hivePass = getStringOrDefault(HIVE_PASS);
+    this.hiveSyncConfigParams.jdbcUrl = getStringOrDefault(HIVE_URL);
+    this.hiveSyncConfigParams.usePreApacheInputFormat = getBooleanOrDefault(HIVE_USE_PRE_APACHE_INPUT_FORMAT);
+    this.hiveSyncConfigParams.useJdbc = getBooleanOrDefault(HIVE_USE_JDBC);
+    this.hiveSyncConfigParams.metastoreUris = getStringOrDefault(METASTORE_URIS);
+    this.hiveSyncConfigParams.syncMode = getString(HIVE_SYNC_MODE);
+    this.hiveSyncConfigParams.autoCreateDatabase = getBooleanOrDefault(HIVE_AUTO_CREATE_DATABASE);
+    this.hiveSyncConfigParams.ignoreExceptions = getBooleanOrDefault(HIVE_IGNORE_EXCEPTIONS);
+    this.hiveSyncConfigParams.skipROSuffix = getBooleanOrDefault(HIVE_SKIP_RO_SUFFIX_FOR_READ_OPTIMIZED_TABLE);
+    this.hiveSyncConfigParams.tableProperties = getString(HIVE_TABLE_PROPERTIES);
+    this.hiveSyncConfigParams.serdeProperties = getString(HIVE_TABLE_SERDE_PROPERTIES);
+    this.hiveSyncConfigParams.supportTimestamp = getBooleanOrDefault(HIVE_SUPPORT_TIMESTAMP_TYPE);
+    this.hiveSyncConfigParams.batchSyncNum = getIntOrDefault(HIVE_BATCH_SYNC_PARTITION_NUM);
+    this.hiveSyncConfigParams.syncAsSparkDataSourceTable = getBooleanOrDefault(HIVE_SYNC_AS_DATA_SOURCE_TABLE);
+    this.hiveSyncConfigParams.sparkSchemaLengthThreshold = getIntOrDefault(HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD);
+    this.hiveSyncConfigParams.createManagedTable = getBooleanOrDefault(HIVE_CREATE_MANAGED_TABLE);
+    this.hiveSyncConfigParams.bucketSpec = getStringOrDefault(HIVE_SYNC_BUCKET_SYNC_SPEC);
+    this.hiveSyncConfigParams.syncComment = getBooleanOrDefault(HIVE_SYNC_COMMENT); // NOTE: help and withOperationField are not assigned in this constructor
   }
 
   @Override
   public String toString() {
     return "HiveSyncConfig{"
-      + "databaseName='" + databaseName + '\''
-      + ", tableName='" + tableName + '\''
-      + ", bucketSpec='" + bucketSpec + '\''
-      + ", baseFileFormat='" + baseFileFormat + '\''
-      + ", hiveUser='" + hiveUser + '\''
-      + ", hivePass='" + hivePass + '\''
-      + ", jdbcUrl='" + jdbcUrl + '\''
-      + ", metastoreUris='" + metastoreUris + '\''
-      + ", basePath='" + basePath + '\''
-      + ", partitionFields=" + partitionFields
-      + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
-      + ", assumeDatePartitioning=" + assumeDatePartitioning
-      + ", usePreApacheInputFormat=" + usePreApacheInputFormat
-      + ", useJdbc=" + useJdbc
-      + ", autoCreateDatabase=" + autoCreateDatabase
-      + ", ignoreExceptions=" + ignoreExceptions
-      + ", skipROSuffix=" + skipROSuffix
-      + ", useFileListingFromMetadata=" + useFileListingFromMetadata
-      + ", tableProperties='" + tableProperties + '\''
-      + ", serdeProperties='" + serdeProperties + '\''
-      + ", help=" + help
-      + ", supportTimestamp=" + supportTimestamp
-      + ", decodePartition=" + decodePartition
-      + ", createManagedTable=" + createManagedTable
-      + ", syncAsSparkDataSourceTable=" + syncAsSparkDataSourceTable
-      + ", sparkSchemaLengthThreshold=" + sparkSchemaLengthThreshold
-      + ", withOperationField=" + withOperationField
-      + ", isConditionalSync=" + isConditionalSync
-      + ", sparkVersion=" + sparkVersion
-      + ", syncComment=" + syncComment
+      + "databaseName='" + hoodieSyncConfigParams.databaseName + '\''
+      + ", tableName='" + hoodieSyncConfigParams.tableName + '\''
+      + ", bucketSpec='" + hiveSyncConfigParams.bucketSpec + '\''
+      + ", baseFileFormat='" + hoodieSyncConfigParams.baseFileFormat + '\''
+      + ", hiveUser='" + hiveSyncConfigParams.hiveUser + '\''
+      + ", hivePass='" + (hiveSyncConfigParams.hivePass == null ? null : "***") + '\'' // mask the credential: this string may end up in logs
+      + ", jdbcUrl='" + hiveSyncConfigParams.jdbcUrl + '\''
+      + ", metastoreUris='" + hiveSyncConfigParams.metastoreUris + '\''
+      + ", basePath='" + hoodieSyncConfigParams.basePath + '\''
+      + ", partitionFields=" + hoodieSyncConfigParams.partitionFields
+      + ", partitionValueExtractorClass='" + hoodieSyncConfigParams.partitionValueExtractorClass + '\''
+      + ", assumeDatePartitioning=" + hoodieSyncConfigParams.assumeDatePartitioning
+      + ", usePreApacheInputFormat=" + hiveSyncConfigParams.usePreApacheInputFormat
+      + ", useJdbc=" + hiveSyncConfigParams.useJdbc
+      + ", autoCreateDatabase=" + hiveSyncConfigParams.autoCreateDatabase
+      + ", ignoreExceptions=" + hiveSyncConfigParams.ignoreExceptions
+      + ", skipROSuffix=" + hiveSyncConfigParams.skipROSuffix
+      + ", useFileListingFromMetadata=" + hoodieSyncConfigParams.useFileListingFromMetadata
+      + ", tableProperties='" + hiveSyncConfigParams.tableProperties + '\''
+      + ", serdeProperties='" + hiveSyncConfigParams.serdeProperties + '\''
+      + ", help=" + hiveSyncConfigParams.help
+      + ", supportTimestamp=" + hiveSyncConfigParams.supportTimestamp
+      + ", decodePartition=" + hoodieSyncConfigParams.decodePartition
+      + ", createManagedTable=" + hiveSyncConfigParams.createManagedTable
+      + ", syncAsSparkDataSourceTable=" + hiveSyncConfigParams.syncAsSparkDataSourceTable
+      + ", sparkSchemaLengthThreshold=" + hiveSyncConfigParams.sparkSchemaLengthThreshold
+      + ", withOperationField=" + hiveSyncConfigParams.withOperationField
+      + ", isConditionalSync=" + hoodieSyncConfigParams.isConditionalSync
+      + ", sparkVersion=" + hoodieSyncConfigParams.sparkVersion
+      + ", syncComment=" + hiveSyncConfigParams.syncComment
       + '}';
   }
 
   public static String getBucketSpec(String bucketCols, int bucketNum) {
     return "CLUSTERED BY (" + bucketCols + " INTO " + bucketNum + " BUCKETS";
   }
+
+  /**
+   * Holder for the Hive-specific sync options; each field is bound to a command-line flag through JCommander's {@code @Parameter}.
+   */
+  public static class HiveSyncConfigParams implements Serializable {
+    @Parameter(names = {"--user"}, description = "Hive username")
+    public String hiveUser;
+    @Parameter(names = {"--pass"}, description = "Hive password")
+    public String hivePass;
+    @Parameter(names = {"--jdbc-url"}, description = "Hive jdbc connect url")
+    public String jdbcUrl;
+    @Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
+    public String metastoreUris;
+    @Parameter(names = {"--use-pre-apache-input-format"},
+            description = "Use InputFormat under com.uber.hoodie package "
+                    + "instead of org.apache.hudi package. Use this when you are in the process of migrating from "
+                    + "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+                    + "org.apache.hudi input format.")
+    public Boolean usePreApacheInputFormat;
+    @Parameter(names = {"--bucket-spec"}, description = "bucket spec stored in metastore", required = false)
+    public String bucketSpec;
+    @Deprecated
+    @Parameter(names = {"--use-jdbc"}, description = "Use JDBC when hive synchronization is enabled (deprecated, prefer --sync-mode)")
+    public Boolean useJdbc;
+    @Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
+    public String syncMode;
+    @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
+    public Boolean autoCreateDatabase;
+    @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+    public Boolean ignoreExceptions;
+    @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
+    public Boolean skipROSuffix;
+    @Parameter(names = {"--table-properties"}, description = "Table properties to hive table")
+    public String tableProperties;
+    @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table")
+    public String serdeProperties;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+    @Parameter(names = {"--support-timestamp"}, description = "'INT64' with original type TIMESTAMP_MICROS is converted to hive 'timestamp' type."
+            + " Disabled by default for backward compatibility.")
+    public Boolean supportTimestamp;
+    @Parameter(names = {"--managed-table"}, description = "Create a managed table")
+    public Boolean createManagedTable;
+    @Parameter(names = {"--batch-sync-num"}, description = "The number of partitions one batch when synchronous partitions to hive")
+    public Integer batchSyncNum;
+    @Parameter(names = {"--spark-datasource"}, description = "Whether sync this table as spark data source table.")
+    public Boolean syncAsSparkDataSourceTable;
+    @Parameter(names = {"--spark-schema-length-threshold"}, description = "The maximum length allowed in a single cell when storing additional schema information in Hive's metastore.")
+    public int sparkSchemaLengthThreshold;
+    @Parameter(names = {"--with-operation-field"}, description = "Whether to include the '_hoodie_operation' field in the metadata fields")
+    public Boolean withOperationField = false;
+    @Parameter(names = {"--sync-comment"}, description = "synchronize table comments to hive")
+    public boolean syncComment = false;
+  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 4d6fad033b..ded0756f8e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -29,9 +29,9 @@ import org.apache.hudi.exception.InvalidTableException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.model.Partition;
 
 import com.beust.jcommander.JCommander;
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -57,7 +58,7 @@ import java.util.stream.Collectors;
  * partitions incrementally (all the partitions modified since the last commit)
  */
 @SuppressWarnings("WeakerAccess")
-public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
+public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -76,7 +77,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     super(hiveSyncConfig.getProps(), hiveConf, fs);
     // TODO: reconcile the way to set METASTOREURIS
     if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) {
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris);
+      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.hiveSyncConfigParams.metastoreUris);
     }
     // HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
     hiveConf.addResource(fs.getConf());
@@ -88,7 +89,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     try {
       this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs);
     } catch (RuntimeException e) {
-      if (hiveSyncConfig.ignoreExceptions) {
+      if (hiveSyncConfig.hiveSyncConfigParams.ignoreExceptions) {
         LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
       } else {
         throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
@@ -99,21 +100,21 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   private void initConfig(HiveSyncConfig hiveSyncConfig) {
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     // TODO: HiveSyncConfig should be responsible for inferring config value
-    if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) {
+    if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.hoodieSyncConfigParams.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
-      hiveSyncConfig.partitionFields = new ArrayList<>();
+      hiveSyncConfig.hoodieSyncConfigParams.partitionFields = new ArrayList<>();
     }
     this.hiveSyncConfig = hiveSyncConfig;
     if (hoodieHiveClient != null) {
       switch (hoodieHiveClient.getTableType()) {
         case COPY_ON_WRITE:
-          this.snapshotTableName = hiveSyncConfig.tableName;
+          this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName;
           this.roTableName = Option.empty();
           break;
         case MERGE_ON_READ:
-          this.snapshotTableName = hiveSyncConfig.tableName + SUFFIX_SNAPSHOT_TABLE;
-          this.roTableName = hiveSyncConfig.skipROSuffix ? Option.of(hiveSyncConfig.tableName) :
-              Option.of(hiveSyncConfig.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+          this.snapshotTableName = hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = hiveSyncConfig.hiveSyncConfigParams.skipROSuffix ? Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName) :
+              Option.of(hiveSyncConfig.hoodieSyncConfigParams.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
           break;
         default:
           LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
@@ -126,13 +127,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   public void syncHoodieTable() {
     try {
       if (hoodieHiveClient != null) {
-        LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
-            + hiveSyncConfig.jdbcUrl + ", basePath :" + hiveSyncConfig.basePath);
+        LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.hoodieSyncConfigParams.tableName + "). Hive metastore URL :"
+            + hiveSyncConfig.hiveSyncConfigParams.jdbcUrl + ", basePath :" + hiveSyncConfig.hoodieSyncConfigParams.basePath);
 
         doSync();
       }
     } catch (RuntimeException re) {
-      throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re);
+      throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.hoodieSyncConfigParams.tableName, re);
     } finally {
       close();
     }
@@ -172,19 +173,19 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
         + " of type " + hoodieHiveClient.getTableType());
 
     // check if the database exists else create it
-    if (hiveSyncConfig.autoCreateDatabase) {
+    if (hiveSyncConfig.hiveSyncConfigParams.autoCreateDatabase) {
       try {
-        if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
-          hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName);
+        if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+          hoodieHiveClient.createDatabase(hiveSyncConfig.hoodieSyncConfigParams.databaseName);
         }
       } catch (Exception e) {
         // this is harmless since table creation will fail anyways, creation of DB is needed for in-memory testing
         LOG.warn("Unable to create database", e);
       }
     } else {
-      if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) {
-        LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName);
-        throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName);
+      if (!hoodieHiveClient.databaseExists(hiveSyncConfig.hoodieSyncConfigParams.databaseName)) {
+        LOG.error("Hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+        throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
       }
     }
 
@@ -204,7 +205,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     if (hoodieHiveClient.isBootstrap()
             && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
             && !readAsOptimized) {
-      hiveSyncConfig.syncAsSparkDataSourceTable = false;
+      hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable = false;
     }
 
     // Sync schema if needed
@@ -223,7 +224,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // Sync the partitions if needed
     boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
-    if (!hiveSyncConfig.isConditionalSync || meetSyncConditions) {
+    if (!hiveSyncConfig.hoodieSyncConfigParams.isConditionalSync || meetSyncConditions) {
       hoodieHiveClient.updateLastCommitTimeSynced(tableName);
     }
     LOG.info("Sync complete for " + tableName);
@@ -239,12 +240,12 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
   private boolean syncSchema(String tableName, boolean tableExists, boolean useRealTimeInputFormat,
                           boolean readAsOptimized, MessageType schema) {
     // Append spark table properties & serde properties
-    Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.tableProperties);
-    Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.serdeProperties);
-    if (hiveSyncConfig.syncAsSparkDataSourceTable) {
-      Map<String, String> sparkTableProperties = getSparkTableProperties(hiveSyncConfig.partitionFields,
-          hiveSyncConfig.sparkVersion, hiveSyncConfig.sparkSchemaLengthThreshold, schema);
-      Map<String, String> sparkSerdeProperties = getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.basePath);
+    Map<String, String> tableProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.tableProperties);
+    Map<String, String> serdeProperties = ConfigUtils.toMap(hiveSyncConfig.hiveSyncConfigParams.serdeProperties);
+    if (hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
+      Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+              hiveSyncConfig.hoodieSyncConfigParams.sparkVersion, hiveSyncConfig.hiveSyncConfigParams.sparkSchemaLengthThreshold, schema);
+      Map<String, String> sparkSerdeProperties = SparkDataSourceTableUtils.getSparkSerdeProperties(readAsOptimized, hiveSyncConfig.hoodieSyncConfigParams.basePath);
       tableProperties.putAll(sparkTableProperties);
       serdeProperties.putAll(sparkSerdeProperties);
     }
@@ -252,10 +253,10 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // Check and sync schema
     if (!tableExists) {
       LOG.info("Hive table " + tableName + " is not found. Creating it");
-      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.baseFileFormat.toUpperCase());
+      HoodieFileFormat baseFileFormat = HoodieFileFormat.valueOf(hiveSyncConfig.hoodieSyncConfigParams.baseFileFormat.toUpperCase());
       String inputFormatClassName = HoodieInputFormatUtils.getInputFormatClassName(baseFileFormat, useRealTimeInputFormat);
 
-      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.usePreApacheInputFormat) {
+      if (baseFileFormat.equals(HoodieFileFormat.PARQUET) && hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat) {
         // Parquet input format had an InputFormat class visible under the old naming scheme.
         inputFormatClassName = useRealTimeInputFormat
             ? com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat.class.getName()
@@ -274,12 +275,13 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     } else {
       // Check if the table schema has evolved
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName);
-      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.partitionFields, hiveSyncConfig.supportTimestamp);
+      SchemaDifference schemaDiff = HiveSchemaUtil.getSchemaDifference(schema, tableSchema, hiveSyncConfig.hoodieSyncConfigParams.partitionFields,
+          hiveSyncConfig.hiveSyncConfigParams.supportTimestamp);
       if (!schemaDiff.isEmpty()) {
         LOG.info("Schema difference found for " + tableName);
         hoodieHiveClient.updateTableDefinition(tableName, schema);
         // Sync the table properties if the schema has changed
-        if (hiveSyncConfig.tableProperties != null || hiveSyncConfig.syncAsSparkDataSourceTable) {
+        if (hiveSyncConfig.hiveSyncConfigParams.tableProperties != null || hiveSyncConfig.hiveSyncConfigParams.syncAsSparkDataSourceTable) {
           hoodieHiveClient.updateTableProperties(tableName, tableProperties);
           LOG.info("Sync table properties for " + tableName + ", table properties is: " + tableProperties);
         }
@@ -289,7 +291,7 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
       }
     }
 
-    if (hiveSyncConfig.syncComment) {
+    if (hiveSyncConfig.hiveSyncConfigParams.syncComment) {
       Schema avroSchemaWithoutMetadataFields = hoodieHiveClient.getAvroSchemaWithoutMetadataFields();
       Map<String, String> newComments = avroSchemaWithoutMetadataFields.getFields()
               .stream().collect(Collectors.toMap(Schema.Field::name, field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
@@ -349,11 +351,11 @@ public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable {
     // parse the params
     final HiveSyncConfig cfg = new HiveSyncConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     HiveConf hiveConf = new HiveConf();
     hiveConf.addResource(fs.getConf());
     new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 539d18a213..a14f4f8a89 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -66,8 +66,8 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
     // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     try {
-      if (!StringUtils.isNullOrEmpty(cfg.syncMode)) {
-        HiveSyncMode syncMode = HiveSyncMode.of(cfg.syncMode);
+      if (!StringUtils.isNullOrEmpty(cfg.hiveSyncConfigParams.syncMode)) {
+        HiveSyncMode syncMode = HiveSyncMode.of(cfg.hiveSyncConfigParams.syncMode);
         switch (syncMode) {
           case HMS:
             ddlExecutor = new HMSDDLExecutor(configuration, cfg, fs);
@@ -79,10 +79,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
             ddlExecutor = new JDBCExecutor(cfg, fs);
             break;
           default:
-            throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.syncMode);
+            throw new HoodieHiveSyncException("Invalid sync mode given " + cfg.hiveSyncConfigParams.syncMode);
         }
       } else {
-        ddlExecutor = cfg.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
+        ddlExecutor = cfg.hiveSyncConfigParams.useJdbc ? new JDBCExecutor(cfg, fs) : new HiveQueryDDLExecutor(cfg, fs, configuration);
       }
       this.client = Hive.get(configuration).getMSC();
     } catch (Exception e) {
@@ -123,11 +123,11 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
       return;
     }
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
         table.putToParameters(entry.getKey(), entry.getValue());
       }
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to update table properties for table: "
           + tableName, e);
@@ -141,7 +141,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
    */
   @Deprecated
   public List<org.apache.hadoop.hive.metastore.api.Partition> scanTablePartitions(String tableName) throws TException {
-    return client.listPartitions(syncConfig.databaseName, tableName, (short) -1);
+    return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1);
   }
 
   @Override
@@ -152,12 +152,12 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public List<Partition> getAllPartitions(String tableName) {
     try {
-      return client.listPartitions(syncConfig.databaseName, tableName, (short) -1)
+      return client.listPartitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, (short) -1)
           .stream()
           .map(p -> new Partition(p.getValues(), p.getSd().getLocation()))
           .collect(Collectors.toList());
     } catch (TException e) {
-      throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.databaseName, tableName), e);
+      throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.hoodieSyncConfigParams.databaseName, tableName), e);
     }
   }
 
@@ -189,7 +189,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public boolean tableExists(String tableName) {
     try {
-      return client.tableExists(syncConfig.databaseName, tableName);
+      return client.tableExists(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
     } catch (TException e) {
       throw new HoodieHiveSyncException("Failed to check if table exists " + tableName, e);
     }
@@ -222,7 +222,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   public Option<String> getLastCommitTimeSynced(String tableName) {
     // Get the last commit time from the TBLproperties
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       return Option.ofNullable(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table " + tableName, e);
@@ -232,10 +232,10 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   public Option<String> getLastReplicatedTime(String tableName) {
     // Get the last replicated time from the TBLproperties
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       return Option.ofNullable(table.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP, null));
     } catch (NoSuchObjectException e) {
-      LOG.warn("the said table not found in hms " + syncConfig.databaseName + "." + tableName);
+      LOG.warn("the said table not found in hms " + syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName);
       return Option.empty();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get the last replicated time from the table " + tableName, e);
@@ -249,9 +249,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
           "Not a valid completed timestamp " + timeStamp + " for table " + tableName);
     }
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
           "Failed to update last replicated time to " + timeStamp + " for " + tableName, e);
@@ -260,9 +260,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
 
   public void deleteLastReplicatedTimeStamp(String tableName) {
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       String timestamp = table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
-      client.alter_table(syncConfig.databaseName, tableName, table);
+      client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
       if (timestamp != null) {
         LOG.info("deleted last replicated timestamp " + timestamp + " for table " + tableName);
       }
@@ -293,9 +293,9 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
     Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
     if (lastCommitSynced.isPresent()) {
       try {
-        Table table = client.getTable(syncConfig.databaseName, tableName);
+        Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
         table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
-        client.alter_table(syncConfig.databaseName, tableName, table);
+        client.alter_table(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table);
       } catch (Exception e) {
         throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
       }
@@ -305,7 +305,7 @@ public class HoodieHiveClient extends AbstractHiveSyncHoodieClient {
   @Override
   public List<FieldSchema> getTableCommentUsingMetastoreClient(String tableName) {
     try {
-      return client.getSchema(syncConfig.databaseName, tableName);
+      return client.getSchema(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get table comments for : " + tableName, e);
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
index 868f59b4fe..73fd31a51a 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java
@@ -71,10 +71,10 @@ public class HMSDDLExecutor implements DDLExecutor {
     this.fs = fs;
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(syncConfig.partitionValueExtractorClass).newInstance();
+          (PartitionValueExtractor) Class.forName(syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + syncConfig.partitionValueExtractorClass, e);
+          "Failed to initialize PartitionValueExtractor class " + syncConfig.hoodieSyncConfigParams.partitionValueExtractorClass, e);
     }
   }
 
@@ -93,16 +93,16 @@ public class HMSDDLExecutor implements DDLExecutor {
   public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
                           Map<String, String> tableProperties) {
     try {
-      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false);
+      LinkedHashMap<String, String> mapSchema = HiveSchemaUtil.parquetSchemaToMapSchema(storageSchema, syncConfig.hiveSyncConfigParams.supportTimestamp, false);
 
       List<FieldSchema> fieldSchema = HiveSchemaUtil.convertMapSchemaToHiveFieldSchema(mapSchema, syncConfig);
 
-      List<FieldSchema> partitionSchema = syncConfig.partitionFields.stream().map(partitionKey -> {
+      List<FieldSchema> partitionSchema = syncConfig.hoodieSyncConfigParams.partitionFields.stream().map(partitionKey -> {
         String partitionKeyType = HiveSchemaUtil.getPartitionKeyType(mapSchema, partitionKey);
         return new FieldSchema(partitionKey, partitionKeyType.toLowerCase(), "");
       }).collect(Collectors.toList());
       Table newTb = new Table();
-      newTb.setDbName(syncConfig.databaseName);
+      newTb.setDbName(syncConfig.hoodieSyncConfigParams.databaseName);
       newTb.setTableName(tableName);
       newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
       newTb.setCreateTime((int) System.currentTimeMillis());
@@ -110,13 +110,13 @@ public class HMSDDLExecutor implements DDLExecutor {
       storageDescriptor.setCols(fieldSchema);
       storageDescriptor.setInputFormat(inputFormatClass);
       storageDescriptor.setOutputFormat(outputFormatClass);
-      storageDescriptor.setLocation(syncConfig.basePath);
+      storageDescriptor.setLocation(syncConfig.hoodieSyncConfigParams.basePath);
       serdeProperties.put("serialization.format", "1");
       storageDescriptor.setSerdeInfo(new SerDeInfo(null, serdeClass, serdeProperties));
       newTb.setSd(storageDescriptor);
       newTb.setPartitionKeys(partitionSchema);
 
-      if (!syncConfig.createManagedTable) {
+      if (!syncConfig.hiveSyncConfigParams.createManagedTable) {
         newTb.putToParameters("EXTERNAL", "TRUE");
       }
 
@@ -134,9 +134,9 @@ public class HMSDDLExecutor implements DDLExecutor {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      boolean cascade = syncConfig.partitionFields.size() > 0;
+      boolean cascade = syncConfig.hoodieSyncConfigParams.partitionFields.size() > 0;
       List<FieldSchema> fieldSchema = HiveSchemaUtil.convertParquetSchemaToHiveFieldSchema(newSchema, syncConfig);
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       StorageDescriptor sd = table.getSd();
       sd.setCols(fieldSchema);
       table.setSd(sd);
@@ -145,7 +145,7 @@ public class HMSDDLExecutor implements DDLExecutor {
         LOG.info("partition table,need cascade");
         environmentContext.putToProperties(StatsSetupConst.CASCADE, StatsSetupConst.TRUE);
       }
-      client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+      client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
     } catch (Exception e) {
       LOG.error("Failed to update table for " + tableName, e);
       throw new HoodieHiveSyncException("Failed to update table for " + tableName, e);
@@ -158,7 +158,7 @@ public class HMSDDLExecutor implements DDLExecutor {
       // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
       // get the Schema of the table.
       final long start = System.currentTimeMillis();
-      Table table = this.client.getTable(syncConfig.databaseName, tableName);
+      Table table = this.client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       Map<String, String> partitionKeysMap =
           table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
 
@@ -184,22 +184,22 @@ public class HMSDDLExecutor implements DDLExecutor {
     }
     LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
     try {
-      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
       List<Partition> partitionList = partitionsToAdd.stream().map(partition -> {
         StorageDescriptor partitionSd = new StorageDescriptor();
         partitionSd.setCols(sd.getCols());
         partitionSd.setInputFormat(sd.getInputFormat());
         partitionSd.setOutputFormat(sd.getOutputFormat());
         partitionSd.setSerdeInfo(sd.getSerdeInfo());
-        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString();
+        String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition).toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         partitionSd.setLocation(fullPartitionPath);
-        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, partitionSd, null);
+        return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, partitionSd, null);
       }).collect(Collectors.toList());
       client.add_partitions(partitionList, true, false);
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " add partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " add partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " add partition failed", e);
     }
   }
 
@@ -211,20 +211,20 @@ public class HMSDDLExecutor implements DDLExecutor {
     }
     LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName);
     try {
-      StorageDescriptor sd = client.getTable(syncConfig.databaseName, tableName).getSd();
+      StorageDescriptor sd = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName).getSd();
       List<Partition> partitionList = changedPartitions.stream().map(partition -> {
-        Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
+        Path partitionPath = FSUtils.getPartitionPath(syncConfig.hoodieSyncConfigParams.basePath, partition);
         String partitionScheme = partitionPath.toUri().getScheme();
         String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
             ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
         List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
         sd.setLocation(fullPartitionPath);
-        return new Partition(partitionValues, syncConfig.databaseName, tableName, 0, 0, sd, null);
+        return new Partition(partitionValues, syncConfig.hoodieSyncConfigParams.databaseName, tableName, 0, 0, sd, null);
       }).collect(Collectors.toList());
-      client.alter_partitions(syncConfig.databaseName, tableName, partitionList, null);
+      client.alter_partitions(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionList, null);
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " update partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " update partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " update partition failed", e);
     }
   }
 
@@ -241,20 +241,20 @@ public class HMSDDLExecutor implements DDLExecutor {
         if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
-          client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
+          client.dropPartition(syncConfig.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(syncConfig.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
     }
   }
 
   @Override
   public void updateTableComments(String tableName, Map<String, ImmutablePair<String,String>> alterSchema) {
     try {
-      Table table = client.getTable(syncConfig.databaseName, tableName);
+      Table table = client.getTable(syncConfig.hoodieSyncConfigParams.databaseName, tableName);
       StorageDescriptor sd = new StorageDescriptor(table.getSd());
       for (FieldSchema fieldSchema : sd.getCols()) {
         if (alterSchema.containsKey(fieldSchema.getName())) {
@@ -264,7 +264,7 @@ public class HMSDDLExecutor implements DDLExecutor {
       }
       table.setSd(sd);
       EnvironmentContext environmentContext = new EnvironmentContext();
-      client.alter_table_with_environmentContext(syncConfig.databaseName, tableName, table, environmentContext);
+      client.alter_table_with_environmentContext(syncConfig.hoodieSyncConfigParams.databaseName, tableName, table, environmentContext);
       sd.clear();
     } catch (Exception e) {
       LOG.error("Failed to update table comments for " + tableName, e);
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
index 4b8ceec952..f1cafbffdf 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java
@@ -64,7 +64,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
       this.sessionState = new SessionState(configuration,
           UserGroupInformation.getCurrentUser().getShortUserName());
       SessionState.start(this.sessionState);
-      this.sessionState.setCurrentDatabase(config.databaseName);
+      this.sessionState.setCurrentDatabase(config.hoodieSyncConfigParams.databaseName);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
     } catch (Exception e) {
       if (sessionState != null) {
@@ -109,7 +109,7 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
       // HiveMetastoreClient returns partition keys separate from Columns, hence get both and merge to
       // get the Schema of the table.
       final long start = System.currentTimeMillis();
-      Table table = metaStoreClient.getTable(config.databaseName, tableName);
+      Table table = metaStoreClient.getTable(config.hoodieSyncConfigParams.databaseName, tableName);
       Map<String, String> partitionKeysMap =
           table.getPartitionKeys().stream().collect(Collectors.toMap(FieldSchema::getName, f -> f.getType().toUpperCase()));
 
@@ -141,13 +141,13 @@ public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {
             config)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, config);
-          metaStoreClient.dropPartition(config.databaseName, tableName, partitionClause, false);
+          metaStoreClient.dropPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionClause, false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (Exception e) {
-      LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
-      throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(config.hoodieSyncConfigParams.databaseName + "." + tableName + " drop partition failed", e);
     }
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
index 997d6e087c..e1b33c93f5 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/JDBCExecutor.java
@@ -49,11 +49,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   public JDBCExecutor(HiveSyncConfig config, FileSystem fs) {
     super(config, fs);
-    Objects.requireNonNull(config.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
-    Objects.requireNonNull(config.hiveUser, "--user option is required for jdbc sync mode");
-    Objects.requireNonNull(config.hivePass, "--pass option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.jdbcUrl, "--jdbc-url option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.hiveUser, "--user option is required for jdbc sync mode");
+    Objects.requireNonNull(config.hiveSyncConfigParams.hivePass, "--pass option is required for jdbc sync mode");
     this.config = config;
-    createHiveConnection(config.jdbcUrl, config.hiveUser, config.hivePass);
+    createHiveConnection(config.hiveSyncConfigParams.jdbcUrl, config.hiveSyncConfigParams.hiveUser, config.hiveSyncConfigParams.hivePass);
   }
 
   @Override
@@ -126,7 +126,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
     ResultSet result = null;
     try {
       DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData.getColumns(null, config.databaseName, tableName, null);
+      result = databaseMetaData.getColumns(null, config.hoodieSyncConfigParams.databaseName, tableName, null);
       while (result.next()) {
         String columnName = result.getString(4);
         String columnType = result.getString(6);
@@ -157,11 +157,11 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
   }
 
   private List<String> constructDropPartitions(String tableName, List<String> partitions) {
-    if (config.batchSyncNum <= 0) {
+    if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
       throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
     }
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.batchSyncNum;
+    int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
     StringBuilder alterSQL = getAlterTableDropPrefix(tableName);
 
     for (int i = 0; i < partitions.size(); i++) {
@@ -186,7 +186,7 @@ public class JDBCExecutor extends QueryBasedDDLExecutor {
 
   public StringBuilder getAlterTableDropPrefix(String tableName) {
     StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
-    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
         .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
         .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" DROP IF EXISTS ");
     return alterSQL;
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
index d9b663ccb0..70b671595b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java
@@ -55,10 +55,10 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
     this.config = config;
     try {
       this.partitionValueExtractor =
-          (PartitionValueExtractor) Class.forName(config.partitionValueExtractorClass).newInstance();
+          (PartitionValueExtractor) Class.forName(config.hoodieSyncConfigParams.partitionValueExtractorClass).newInstance();
     } catch (Exception e) {
       throw new HoodieHiveSyncException(
-          "Failed to initialize PartitionValueExtractor class " + config.partitionValueExtractorClass, e);
+          "Failed to initialize PartitionValueExtractor class " + config.hoodieSyncConfigParams.partitionValueExtractorClass, e);
     }
   }
 
@@ -90,11 +90,11 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.partitionFields, config.supportTimestamp);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
       // Cascade clause should not be present for non-partitioned tables
-      String cascadeClause = config.partitionFields.size() > 0 ? " cascade" : "";
+      String cascadeClause = config.hoodieSyncConfigParams.partitionFields.size() > 0 ? " cascade" : "";
       StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
-          .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+          .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
           .append(HIVE_ESCAPE_CHARACTER).append(tableName)
           .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
           .append(newSchemaStr).append(" )").append(cascadeClause);
@@ -138,7 +138,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
       String comment = field.getValue().getRight();
       comment = comment.replace("'","");
       sql.append("ALTER TABLE ").append(HIVE_ESCAPE_CHARACTER)
-              .append(config.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
+              .append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER).append(".")
               .append(HIVE_ESCAPE_CHARACTER).append(tableName)
               .append(HIVE_ESCAPE_CHARACTER)
               .append(" CHANGE COLUMN `").append(name).append("` `").append(name)
@@ -148,15 +148,15 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   }
 
   private List<String> constructAddPartitions(String tableName, List<String> partitions) {
-    if (config.batchSyncNum <= 0) {
+    if (config.hiveSyncConfigParams.batchSyncNum <= 0) {
       throw new HoodieHiveSyncException("batch-sync-num for sync hive table must be greater than 0, pls check your parameter");
     }
     List<String> result = new ArrayList<>();
-    int batchSyncPartitionNum = config.batchSyncNum;
+    int batchSyncPartitionNum = config.hiveSyncConfigParams.batchSyncNum;
     StringBuilder alterSQL = getAlterTablePrefix(tableName);
     for (int i = 0; i < partitions.size(); i++) {
       String partitionClause = getPartitionClause(partitions.get(i));
-      String fullPartitionPath = FSUtils.getPartitionPath(config.basePath, partitions.get(i)).toString();
+      String fullPartitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partitions.get(i)).toString();
       alterSQL.append("  PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPath)
           .append("' ");
       if ((i + 1) % batchSyncPartitionNum == 0) {
@@ -173,7 +173,7 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
 
   private StringBuilder getAlterTablePrefix(String tableName) {
     StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
-    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName)
+    alterSQL.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName)
         .append(HIVE_ESCAPE_CHARACTER).append(".").append(HIVE_ESCAPE_CHARACTER)
         .append(tableName).append(HIVE_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
     return alterSQL;
@@ -181,18 +181,18 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
 
   public String getPartitionClause(String partition) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+    ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
             + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < config.partitionFields.size(); i++) {
+    for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
       String partitionValue = partitionValues.get(i);
       // decode the partition before sync to hive to prevent multiple escapes of HIVE
-      if (config.decodePartition) {
+      if (config.hoodieSyncConfigParams.decodePartition) {
         // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
         partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
       }
-      partBuilder.add("`" + config.partitionFields.get(i) + "`='" + partitionValue + "'");
+      partBuilder.add("`" + config.hoodieSyncConfigParams.partitionFields.get(i) + "`='" + partitionValue + "'");
     }
     return String.join(",", partBuilder);
   }
@@ -200,12 +200,12 @@ public abstract class QueryBasedDDLExecutor implements DDLExecutor {
   private List<String> constructChangePartitions(String tableName, List<String> partitions) {
     List<String> changePartitions = new ArrayList<>();
     // Hive 2.x doesn't like db.table name for operations, hence we need to change to using the database first
-    String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.databaseName + HIVE_ESCAPE_CHARACTER;
+    String useDatabase = "USE " + HIVE_ESCAPE_CHARACTER + config.hoodieSyncConfigParams.databaseName + HIVE_ESCAPE_CHARACTER;
     changePartitions.add(useDatabase);
     String alterTable = "ALTER TABLE " + HIVE_ESCAPE_CHARACTER + tableName + HIVE_ESCAPE_CHARACTER;
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
-      Path partitionPath = FSUtils.getPartitionPath(config.basePath, partition);
+      Path partitionPath = FSUtils.getPartitionPath(config.hoodieSyncConfigParams.basePath, partition);
       String partitionScheme = partitionPath.toUri().getScheme();
       String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
           ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
index 16c30a16aa..6ec4a586b8 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -36,20 +36,20 @@ public class GlobalHiveSyncConfig extends HiveSyncConfig {
 
   public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
     GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig(cfg.getProps());
-    newConfig.basePath = cfg.basePath;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.databaseName = cfg.databaseName;
-    newConfig.hivePass = cfg.hivePass;
-    newConfig.hiveUser = cfg.hiveUser;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
-    newConfig.jdbcUrl = cfg.jdbcUrl;
-    newConfig.tableName = cfg.tableName;
-    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.supportTimestamp = cfg.supportTimestamp;
-    newConfig.decodePartition = cfg.decodePartition;
-    newConfig.batchSyncNum = cfg.batchSyncNum;
+    newConfig.hoodieSyncConfigParams.basePath = cfg.hoodieSyncConfigParams.basePath;
+    newConfig.hoodieSyncConfigParams.assumeDatePartitioning = cfg.hoodieSyncConfigParams.assumeDatePartitioning;
+    newConfig.hoodieSyncConfigParams.databaseName = cfg.hoodieSyncConfigParams.databaseName;
+    newConfig.hiveSyncConfigParams.hivePass = cfg.hiveSyncConfigParams.hivePass;
+    newConfig.hiveSyncConfigParams.hiveUser = cfg.hiveSyncConfigParams.hiveUser;
+    newConfig.hoodieSyncConfigParams.partitionFields = cfg.hoodieSyncConfigParams.partitionFields;
+    newConfig.hoodieSyncConfigParams.partitionValueExtractorClass = cfg.hoodieSyncConfigParams.partitionValueExtractorClass;
+    newConfig.hiveSyncConfigParams.jdbcUrl = cfg.hiveSyncConfigParams.jdbcUrl;
+    newConfig.hoodieSyncConfigParams.tableName = cfg.hoodieSyncConfigParams.tableName;
+    newConfig.hiveSyncConfigParams.usePreApacheInputFormat = cfg.hiveSyncConfigParams.usePreApacheInputFormat;
+    newConfig.hoodieSyncConfigParams.useFileListingFromMetadata = cfg.hoodieSyncConfigParams.useFileListingFromMetadata;
+    newConfig.hiveSyncConfigParams.supportTimestamp = cfg.hiveSyncConfigParams.supportTimestamp;
+    newConfig.hoodieSyncConfigParams.decodePartition = cfg.hoodieSyncConfigParams.decodePartition;
+    newConfig.hiveSyncConfigParams.batchSyncNum = cfg.hiveSyncConfigParams.batchSyncNum;
     newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
     return newConfig;
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
index a7d205962e..92c9631ab2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -80,7 +80,7 @@ public class GlobalHiveSyncTool extends HiveSyncTool {
   }
 
   public static GlobalHiveSyncTool buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
-    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(cfg.hoodieSyncConfigParams.basePath, new Configuration());
     hiveConf.addResource(fs.getConf());
     return new GlobalHiveSyncTool(cfg, hiveConf, fs);
   }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
index c3dd2af346..917bc1e123 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
@@ -74,12 +74,12 @@ public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
 
   GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
     GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
-    cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
-        : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
-    cfg.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
-        : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
-    LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.jdbcUrl + " "
-        + cfg.basePath);
+    cfg.hoodieSyncConfigParams.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
+            : properties.getProperty(LOCAL_BASE_PATH, cfg.hoodieSyncConfigParams.basePath);
+    cfg.hiveSyncConfigParams.jdbcUrl = forRemote ? properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
+            : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.hiveSyncConfigParams.jdbcUrl);
+    LOG.info("building hivesync config forRemote: " + forRemote + " " + cfg.hiveSyncConfigParams.jdbcUrl + " "
+        + cfg.hoodieSyncConfigParams.basePath);
     return cfg;
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
index a194eeb2e9..4d15d8db1e 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -104,7 +104,7 @@ public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, AutoClose
       throws IOException {
     HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
     JCommander cmd = new JCommander(cfg, null, args);
-    if (cfg.help || args.length == 0) {
+    if (cfg.hiveSyncConfigParams.help || args.length == 0) {
       cmd.usage();
       System.exit(1);
     }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
index 0258cfc5ef..e17fe8af22 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HivePartitionUtil.java
@@ -40,18 +40,18 @@ public class HivePartitionUtil {
    */
   public static String getPartitionClauseForDrop(String partition, PartitionValueExtractor partitionValueExtractor, HiveSyncConfig config) {
     List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
-    ValidationUtils.checkArgument(config.partitionFields.size() == partitionValues.size(),
-        "Partition key parts " + config.partitionFields + " does not match with partition values " + partitionValues
+    ValidationUtils.checkArgument(config.hoodieSyncConfigParams.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + config.hoodieSyncConfigParams.partitionFields + " does not match with partition values " + partitionValues
             + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
-    for (int i = 0; i < config.partitionFields.size(); i++) {
+    for (int i = 0; i < config.hoodieSyncConfigParams.partitionFields.size(); i++) {
       String partitionValue = partitionValues.get(i);
       // decode the partition before sync to hive to prevent multiple escapes of HIVE
-      if (config.decodePartition) {
+      if (config.hoodieSyncConfigParams.decodePartition) {
         // This is a decode operator for encode in KeyGenUtils#getRecordPartitionPath
         partitionValue = PartitionPathEncodeUtils.unescapePathName(partitionValue);
       }
-      partBuilder.add(config.partitionFields.get(i) + "=" + partitionValue);
+      partBuilder.add(config.hoodieSyncConfigParams.partitionFields.get(i) + "=" + partitionValue);
     }
     return String.join("/", partBuilder);
   }
@@ -61,7 +61,7 @@ public class HivePartitionUtil {
     Partition newPartition;
     try {
       List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partitionPath);
-      newPartition = client.getPartition(config.databaseName, tableName, partitionValues);
+      newPartition = client.getPartition(config.hoodieSyncConfigParams.databaseName, tableName, partitionValues);
     } catch (NoSuchObjectException ignored) {
       newPartition = null;
     } catch (TException e) {
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
index 2d700596f0..85faa012d2 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java
@@ -156,7 +156,7 @@ public class HiveSchemaUtil {
    * @return : Hive Table schema read from parquet file List[FieldSchema] without partitionField
    */
   public static List<FieldSchema> convertParquetSchemaToHiveFieldSchema(MessageType messageType, HiveSyncConfig syncConfig) throws IOException {
-    return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.supportTimestamp, false), syncConfig);
+    return convertMapSchemaToHiveFieldSchema(parquetSchemaToMapSchema(messageType, syncConfig.hiveSyncConfigParams.supportTimestamp, false), syncConfig);
   }
 
   /**
@@ -202,7 +202,7 @@ public class HiveSchemaUtil {
   public static List<FieldSchema> convertMapSchemaToHiveFieldSchema(LinkedHashMap<String, String> schema, HiveSyncConfig syncConfig) throws IOException {
     return schema.keySet().stream()
         .map(key -> new FieldSchema(key, schema.get(key).toLowerCase(), ""))
-        .filter(field -> !syncConfig.partitionFields.contains(field.getName()))
+        .filter(field -> !syncConfig.hoodieSyncConfigParams.partitionFields.contains(field.getName()))
         .collect(Collectors.toList());
   }
 
@@ -448,11 +448,11 @@ public class HiveSchemaUtil {
   public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass,
                                          String outputFormatClass, String serdeClass, Map<String, String> serdeProperties,
                                          Map<String, String> tableProperties) throws IOException {
-    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp);
-    String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp);
+    Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.hiveSyncConfigParams.supportTimestamp);
+    String columns = generateSchemaString(storageSchema, config.hoodieSyncConfigParams.partitionFields, config.hiveSyncConfigParams.supportTimestamp);
 
     List<String> partitionFields = new ArrayList<>();
-    for (String partitionKey : config.partitionFields) {
+    for (String partitionKey : config.hoodieSyncConfigParams.partitionFields) {
       String partitionKeyWithTicks = tickSurround(partitionKey);
       partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ")
           .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString());
@@ -460,26 +460,26 @@ public class HiveSchemaUtil {
 
     String partitionsStr = String.join(",", partitionFields);
     StringBuilder sb = new StringBuilder();
-    if (config.createManagedTable) {
+    if (config.hiveSyncConfigParams.createManagedTable) {
       sb.append("CREATE TABLE IF NOT EXISTS ");
     } else {
       sb.append("CREATE EXTERNAL TABLE IF NOT EXISTS ");
     }
-    sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER)
+    sb.append(HIVE_ESCAPE_CHARACTER).append(config.hoodieSyncConfigParams.databaseName).append(HIVE_ESCAPE_CHARACTER)
             .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER);
     sb.append("( ").append(columns).append(")");
-    if (!config.partitionFields.isEmpty()) {
+    if (!config.hoodieSyncConfigParams.partitionFields.isEmpty()) {
       sb.append(" PARTITIONED BY (").append(partitionsStr).append(")");
     }
-    if (config.bucketSpec != null) {
-      sb.append(' ' + config.bucketSpec + ' ');
+    if (config.hiveSyncConfigParams.bucketSpec != null) {
+      sb.append(' ' + config.hiveSyncConfigParams.bucketSpec + ' ');
     }
     sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'");
     if (serdeProperties != null && !serdeProperties.isEmpty()) {
       sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")");
     }
     sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'");
-    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'");
+    sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.hoodieSyncConfigParams.basePath).append("'");
 
     if (tableProperties != null && !tableProperties.isEmpty()) {
       sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")");
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
index 937243393f..aa1c606205 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
@@ -57,22 +57,22 @@ public class TestHiveSyncGlobalCommitTool {
     config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
     config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
     config.globallyReplicatedTimeStamp = commitTime;
-    config.hiveUser = System.getProperty("user.name");
-    config.hivePass = "";
-    config.databaseName = dbName;
-    config.tableName = tblName;
-    config.basePath = localCluster.tablePath(dbName, tblName);
-    config.assumeDatePartitioning = true;
-    config.usePreApacheInputFormat = false;
-    config.partitionFields = Collections.singletonList("datestr");
+    config.hiveSyncConfigParams.hiveUser = System.getProperty("user.name");
+    config.hiveSyncConfigParams.hivePass = "";
+    config.hoodieSyncConfigParams.databaseName = dbName;
+    config.hoodieSyncConfigParams.tableName = tblName;
+    config.hoodieSyncConfigParams.basePath = localCluster.tablePath(dbName, tblName);
+    config.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    config.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    config.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
     return config;
   }
 
   private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
     Assertions.assertEquals(localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
   }
 
@@ -116,11 +116,11 @@ public class TestHiveSyncGlobalCommitTool {
     remoteCluster.stopHiveServer2();
     Assertions.assertFalse(tool.commit());
     Assertions.assertEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
     Assertions.assertTrue(tool.rollback()); // do a rollback
     Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
-        .getTable(config.databaseName, config.tableName).getParameters()
+        .getTable(config.hoodieSyncConfigParams.databaseName, config.hoodieSyncConfigParams.tableName).getParameters()
         .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
     Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
     remoteCluster.startHiveServer2();
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 167c35a124..3377917ffe 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent;
+import org.apache.hudi.sync.common.HoodieSyncClient.PartitionEvent.PartitionEventType;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
index b6adcb2982..0de27d3267 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveSyncFunctionalTestHarness.java
@@ -80,32 +80,32 @@ public class HiveSyncFunctionalTestHarness {
 
   public HiveSyncConfig hiveSyncConf() throws IOException {
     HiveSyncConfig conf = new HiveSyncConfig();
-    conf.jdbcUrl = hiveTestService.getJdbcHive2Url();
-    conf.hiveUser = "";
-    conf.hivePass = "";
-    conf.databaseName = "hivesynctestdb";
-    conf.tableName = "hivesynctesttable";
-    conf.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
-    conf.assumeDatePartitioning = true;
-    conf.usePreApacheInputFormat = false;
-    conf.partitionFields = Collections.singletonList("datestr");
+    conf.hiveSyncConfigParams.jdbcUrl = hiveTestService.getJdbcHive2Url();
+    conf.hiveSyncConfigParams.hiveUser = "";
+    conf.hiveSyncConfigParams.hivePass = "";
+    conf.hoodieSyncConfigParams.databaseName = "hivesynctestdb";
+    conf.hoodieSyncConfigParams.tableName = "hivesynctesttable";
+    conf.hoodieSyncConfigParams.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
+    conf.hoodieSyncConfigParams.assumeDatePartitioning = true;
+    conf.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    conf.hoodieSyncConfigParams.partitionFields = Collections.singletonList("datestr");
     return conf;
   }
 
   public HoodieHiveClient hiveClient(HiveSyncConfig hiveSyncConfig) throws IOException {
     HoodieTableMetaClient.withPropertyBuilder()
         .setTableType(HoodieTableType.COPY_ON_WRITE)
-        .setTableName(hiveSyncConfig.tableName)
+        .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
         .setPayloadClass(HoodieAvroPayload.class)
-        .initTable(hadoopConf, hiveSyncConfig.basePath);
+        .initTable(hadoopConf, hiveSyncConfig.hoodieSyncConfigParams.basePath);
     return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs());
   }
 
   public void dropTables(String database, String... tables) throws IOException, HiveException, MetaException {
     HiveSyncConfig hiveSyncConfig = hiveSyncConf();
-    hiveSyncConfig.databaseName = database;
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
     for (String table : tables) {
-      hiveSyncConfig.tableName = table;
+      hiveSyncConfig.hoodieSyncConfigParams.tableName = table;
       new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop table if exists " + table);
     }
   }
@@ -113,7 +113,7 @@ public class HiveSyncFunctionalTestHarness {
   public void dropDatabases(String... databases) throws IOException, HiveException, MetaException {
     HiveSyncConfig hiveSyncConfig = hiveSyncConf();
     for (String database : databases) {
-      hiveSyncConfig.databaseName = database;
+      hiveSyncConfig.hoodieSyncConfigParams.databaseName = database;
       new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop database if exists " + database);
     }
   }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
similarity index 58%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 8eec327890..54e151dd69 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -18,15 +18,12 @@
 
 package org.apache.hudi.sync.common;
 
-import java.io.Serializable;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -34,21 +31,18 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
 
-public abstract class AbstractSyncHoodieClient implements AutoCloseable {
+public abstract class HoodieSyncClient implements AutoCloseable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  private static final Logger LOG = LogManager.getLogger(HoodieSyncClient.class);
 
   public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
-  public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {};
 
   protected final HoodieTableMetaClient metaClient;
   protected final HoodieTableType tableType;
@@ -59,13 +53,13 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
   private final boolean withOperationField;
 
   @Deprecated
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
+  public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                          boolean verifyMetadataFileListing, boolean withOperationField, FileSystem fs) {
     this(basePath, assumeDatePartitioning, useFileListingFromMetadata, withOperationField, fs);
   }
 
-  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
-                                  boolean withOperationField, FileSystem fs) {
+  public HoodieSyncClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
+                          boolean withOperationField, FileSystem fs) {
     this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
@@ -75,47 +69,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
     this.fs = fs;
   }
 
-  /**
-   * Create the table.
-   * @param tableName The table name.
-   * @param storageSchema The table schema.
-   * @param inputFormatClass The input format class of this table.
-   * @param outputFormatClass The output format class of this table.
-   * @param serdeClass The serde class of this table.
-   * @param serdeProperties The serde properties of this table.
-   * @param tableProperties The table properties for this table.
-   */
-  public abstract void createTable(String tableName, MessageType storageSchema,
-                                   String inputFormatClass, String outputFormatClass,
-                                   String serdeClass, Map<String, String> serdeProperties,
-                                   Map<String, String> tableProperties);
-
-  /**
-   * @deprecated Use {@link #tableExists} instead.
-   */
-  @Deprecated
-  public abstract boolean doesTableExist(String tableName);
-
-  public abstract boolean tableExists(String tableName);
-
-  public abstract Option<String> getLastCommitTimeSynced(String tableName);
-
-  public abstract void updateLastCommitTimeSynced(String tableName);
-
-  public abstract Option<String> getLastReplicatedTime(String tableName);
-
-  public abstract void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
-
-  public abstract void deleteLastReplicatedTimeStamp(String tableName);
-
-  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
-
-  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
-
-  public abstract void dropPartitions(String tableName, List<String> partitionsToDrop);
-
-  public  void updateTableProperties(String tableName, Map<String, String> tableProperties) {}
-
   public abstract Map<String, String> getTableSchema(String tableName);
 
   public HoodieTableType getTableType() {
@@ -194,56 +147,6 @@ public abstract class AbstractSyncHoodieClient implements AutoCloseable {
     }
   }
 
-  public abstract static class TypeConverter implements Serializable {
-
-    static final String DEFAULT_TARGET_TYPE = "DECIMAL";
-
-    protected String targetType;
-
-    public TypeConverter() {
-      this.targetType = DEFAULT_TARGET_TYPE;
-    }
-
-    public TypeConverter(String targetType) {
-      ValidationUtils.checkArgument(Objects.nonNull(targetType));
-      this.targetType = targetType;
-    }
-
-    public void doConvert(ResultSet resultSet, Map<String, String> schema) throws SQLException {
-      schema.put(getColumnName(resultSet), targetType.equalsIgnoreCase(getColumnType(resultSet))
-                ? convert(resultSet) : getColumnType(resultSet));
-    }
-
-    public String convert(ResultSet resultSet) throws SQLException {
-      String columnType = getColumnType(resultSet);
-      int columnSize = resultSet.getInt("COLUMN_SIZE");
-      int decimalDigits = resultSet.getInt("DECIMAL_DIGITS");
-      return columnType + String.format("(%s,%s)", columnSize, decimalDigits);
-    }
-
-    public String getColumnName(ResultSet resultSet) throws SQLException {
-      return resultSet.getString(4);
-    }
-
-    public String getColumnType(ResultSet resultSet) throws SQLException {
-      return resultSet.getString(6);
-    }
-  }
-
-  /**
-   * Read the schema from the log file on path.
-   */
-  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-  private MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) throws Exception {
-    MessageType messageType = TableSchemaResolver.readSchemaFromLogFile(fs, path);
-    // Fall back to read the schema from last compaction
-    if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
-      return new TableSchemaResolver(this.metaClient).readSchemaFromLastCompaction(lastCompactionCommitOpt);
-    }
-    return messageType;
-  }
-
   /**
    * Partition Event captures any partition that needs to be added or updated.
    */
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index dc2b21ba45..6296958907 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import com.beust.jcommander.Parameter;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
@@ -37,40 +38,7 @@ import java.util.function.Function;
  */
 public class HoodieSyncConfig extends HoodieConfig {
 
-  @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
-  public String databaseName;
-
-  @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
-  public String tableName;
-
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
-  public String basePath;
-
-  @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
-  public String baseFileFormat;
-
-  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
-  public List<String> partitionFields;
-
-  @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
-      + "to extract the partition values from HDFS path")
-  public String partitionValueExtractorClass;
-
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
-  public Boolean assumeDatePartitioning;
-
-  @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
-  public Boolean decodePartition;
-
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata;
-
-  @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
-  public Boolean isConditionalSync;
-
-  @Parameter(names = {"--spark-version"}, description = "The spark version")
-  public String sparkVersion;
+  public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
 
   public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
       .key("hoodie.datasource.meta.sync.base.path")
@@ -169,20 +137,50 @@ public class HoodieSyncConfig extends HoodieConfig {
     super(props);
     setDefaults();
 
-    this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
-    this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
-    this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
-    this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
-    this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
-    this.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
-    this.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
-    this.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
-    this.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
-    this.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
-    this.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
+    this.hoodieSyncConfigParams.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
+    this.hoodieSyncConfigParams.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
+    this.hoodieSyncConfigParams.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
+    this.hoodieSyncConfigParams.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
+    this.hoodieSyncConfigParams.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
+    this.hoodieSyncConfigParams.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
+    this.hoodieSyncConfigParams.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
+    this.hoodieSyncConfigParams.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
+    this.hoodieSyncConfigParams.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
+    this.hoodieSyncConfigParams.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
+    this.hoodieSyncConfigParams.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
   }
 
   protected void setDefaults() {
     this.setDefaultValue(META_SYNC_TABLE_NAME);
   }
+
+  public static class HoodieSyncConfigParams implements Serializable {
+    @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
+    public String databaseName;
+    @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
+    public String tableName;
+    @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
+    public String basePath;
+    @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+    public String baseFileFormat;
+    @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+    public List<String> partitionFields;
+    @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+            + "to extract the partition values from HDFS path")
+    public String partitionValueExtractorClass;
+    @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+            + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+    public Boolean assumeDatePartitioning;
+    @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
+    public Boolean decodePartition;
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata;
+    @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+    public Boolean isConditionalSync;
+    @Parameter(names = {"--spark-version"}, description = "The spark version")
+    public String sparkVersion;
+
+    public HoodieSyncConfigParams() {
+    }
+  }
 }
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
new file mode 100644
index 0000000000..754d142695
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncTool.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Properties;
+
+/**
+ * Base class for tools that sync Hudi table metadata to a metastore so that
+ * the table can be queried through external systems.
+ */
+public abstract class HoodieSyncTool {
+  protected final Configuration conf;
+  protected final FileSystem fs;
+  protected TypedProperties props;
+
+  public HoodieSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
+    this.props = props;
+    this.conf = conf;
+    this.fs = fs;
+  }
+
+  @Deprecated
+  public HoodieSyncTool(Properties props, FileSystem fileSystem) {
+    this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
+  }
+
+  public abstract void syncHoodieTable();
+
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
new file mode 100644
index 0000000000..4e29a57a08
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/SupportMetaSync.java
@@ -0,0 +1,7 @@
+package org.apache.hudi.sync.common;
+
+public interface SupportMetaSync {
+  default void runMetaSync() {
+    // Intentionally a no-op by default; implementing classes override this to run their meta sync.
+  }
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
new file mode 100644
index 0000000000..fc5e093d7e
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/CatalogSync.java
@@ -0,0 +1,32 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Map;
+
+public interface CatalogSync {
+  /**
+   * Create the table.
+   *
+   * @param tableName         The table name.
+   * @param storageSchema     The table schema.
+   * @param inputFormatClass  The input format class of this table.
+   * @param outputFormatClass The output format class of this table.
+   * @param serdeClass        The serde class of this table.
+   * @param serdeProperties   The serde properties of this table.
+   * @param tableProperties   The table properties for this table.
+   */
+  void createTable(String tableName, MessageType storageSchema,
+                   String inputFormatClass, String outputFormatClass,
+                   String serdeClass, Map<String, String> serdeProperties,
+                   Map<String, String> tableProperties);
+
+  /**
+   * @deprecated Use {@link #tableExists} instead.
+   */
+  @Deprecated
+  boolean doesTableExist(String tableName);
+
+  boolean tableExists(String tableName);
+
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
new file mode 100644
index 0000000000..c5b1edd4cd
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/PartitionsSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import java.util.List;
+
+public interface PartitionsSync {
+  void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  void dropPartitions(String tableName, List<String> partitionsToDrop);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
new file mode 100644
index 0000000000..a93a9481ce
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/ReplicatedTimeSync.java
@@ -0,0 +1,11 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+public interface ReplicatedTimeSync {
+  Option<String> getLastReplicatedTime(String tableName);
+
+  void updateLastReplicatedTimeStamp(String tableName, String timeStamp);
+
+  void deleteLastReplicatedTimeStamp(String tableName);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
new file mode 100644
index 0000000000..c145e95532
--- /dev/null
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/operation/TblPropertiesSync.java
@@ -0,0 +1,13 @@
+package org.apache.hudi.sync.common.operation;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.Map;
+
+public interface TblPropertiesSync {
+  Option<String> getLastCommitTimeSynced(String tableName);
+
+  void updateLastCommitTimeSynced(String tableName);
+
+  void updateTableProperties(String tableName, Map<String, String> tableProperties);
+}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
similarity index 63%
rename from hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
rename to hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
index 972ae1f96c..bcf572e088 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SparkDataSourceTableUtils.java
@@ -1,29 +1,6 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+package org.apache.hudi.sync.common.util;
 
-package org.apache.hudi.sync.common;
-
-import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.sync.common.util.ConfigUtils;
-import org.apache.hudi.sync.common.util.Parquet2SparkSchemaUtils;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -33,40 +10,18 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 import static org.apache.parquet.schema.OriginalType.UTF8;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 
-/**
- * Base class to sync Hudi meta data with Metastores to make
- * Hudi table queryable through external systems.
- */
-public abstract class AbstractSyncTool {
-  protected final Configuration conf;
-  protected final FileSystem fs;
-  protected TypedProperties props;
-
-  public AbstractSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
-    this.props = props;
-    this.conf = conf;
-    this.fs = fs;
-  }
-
-  @Deprecated
-  public AbstractSyncTool(Properties props, FileSystem fileSystem) {
-    this(new TypedProperties(props), fileSystem.getConf(), fileSystem);
-  }
-
-  public abstract void syncHoodieTable();
-
+public class SparkDataSourceTableUtils {
   /**
    * Get Spark Sql related table properties. This is used for spark datasource table.
    * @param schema  The schema to write to the table.
    * @return A new parameters added the spark's table properties.
    */
-  protected Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
-                                                      int schemaLengthThreshold, MessageType schema)  {
+  public static Map<String, String> getSparkTableProperties(List<String> partitionNames, String sparkVersion,
+                                                            int schemaLengthThreshold, MessageType schema)  {
     // Convert the schema and partition info used by spark sql to hive table properties.
     // The following code refers to the spark code in
     // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -122,7 +77,7 @@ public abstract class AbstractSyncTool {
     return sparkProperties;
   }
 
-  protected Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
+  public static Map<String, String> getSparkSerdeProperties(boolean readAsOptimized, String basePath) {
     Map<String, String> sparkSerdeProperties = new HashMap<>();
     sparkSerdeProperties.put("path", basePath);
     sparkSerdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(readAsOptimized));
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index def85c5b80..39cc56c04c 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -22,7 +22,7 @@ package org.apache.hudi.sync.common.util;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +39,7 @@ public class SyncUtilHelpers {
   private static final Logger LOG = LogManager.getLogger(SyncUtilHelpers.class);
 
   /**
-   * Create an instance of an implementation of {@link AbstractSyncTool} that will sync all the relevant meta information
+   * Create an instance of an implementation of {@link HoodieSyncTool} that will sync all the relevant meta information
    * with an external metastore such as Hive etc. to ensure Hoodie tables can be queried or read via external systems.
    *
    * @param metaSyncFQCN  The class that implements the sync of the metadata.
@@ -62,12 +62,12 @@ public class SyncUtilHelpers {
     }
   }
 
-  static AbstractSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
-                                                 TypedProperties props,
-                                                 Configuration hadoopConfig,
-                                                 FileSystem fs,
-                                                 String targetBasePath,
-                                                 String baseFileFormat) {
+  static HoodieSyncTool instantiateMetaSyncTool(String metaSyncFQCN,
+                                                TypedProperties props,
+                                                Configuration hadoopConfig,
+                                                FileSystem fs,
+                                                String targetBasePath,
+                                                String baseFileFormat) {
     TypedProperties properties = new TypedProperties();
     properties.putAll(props);
     properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), targetBasePath);
@@ -75,13 +75,13 @@ public class SyncUtilHelpers {
 
     if (ReflectionUtils.hasConstructor(metaSyncFQCN,
         new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class})) {
-      return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+      return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
           new Class<?>[] {TypedProperties.class, Configuration.class, FileSystem.class},
           properties, hadoopConfig, fs));
     } else {
       LOG.warn("Falling back to deprecated constructor for class: " + metaSyncFQCN);
       try {
-        return ((AbstractSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
+        return ((HoodieSyncTool) ReflectionUtils.loadClass(metaSyncFQCN,
             new Class<?>[] {Properties.class, FileSystem.class}, properties, fs));
       } catch (Throwable t) {
         throw new HoodieException("Could not load meta sync class " + metaSyncFQCN, t);
diff --git a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
index dc9dee8b42..4e8757d142 100644
--- a/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
+++ b/hudi-sync/hudi-sync-common/src/test/java/org/apache/hudi/sync/common/util/TestSyncUtilHelpers.java
@@ -20,7 +20,7 @@ package org.apache.hudi.sync.common.util;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.hudi.sync.common.HoodieSyncTool;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,7 +48,7 @@ public class TestSyncUtilHelpers {
 
   @Test
   public void testCreateValidSyncClass() {
-    AbstractSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
+    HoodieSyncTool metaSyncTool = SyncUtilHelpers.instantiateMetaSyncTool(
         ValidMetaSyncClass.class.getName(),
         new TypedProperties(),
         hadoopConf,
@@ -60,13 +60,13 @@ public class TestSyncUtilHelpers {
   }
 
   /**
-   * Ensure it still works for the deprecated constructor of {@link AbstractSyncTool}
+   * Ensure it still works for the deprecated constructor of {@link HoodieSyncTool}
    * as we implemented the fallback.
    */
   @Test
   public void testCreateDeprecatedSyncClass() {
     Properties properties = new Properties();
-    AbstractSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
+    HoodieSyncTool deprecatedMetaSyncClass = SyncUtilHelpers.instantiateMetaSyncTool(
         DeprecatedMetaSyncClass.class.getName(),
         new TypedProperties(properties),
         hadoopConf,
@@ -95,7 +95,7 @@ public class TestSyncUtilHelpers {
 
   }
 
-  public static class ValidMetaSyncClass extends AbstractSyncTool {
+  public static class ValidMetaSyncClass extends HoodieSyncTool {
     public ValidMetaSyncClass(TypedProperties props, Configuration conf, FileSystem fs) {
       super(props, conf, fs);
     }
@@ -106,7 +106,7 @@ public class TestSyncUtilHelpers {
     }
   }
 
-  public static class DeprecatedMetaSyncClass extends AbstractSyncTool {
+  public static class DeprecatedMetaSyncClass extends HoodieSyncTool {
     public DeprecatedMetaSyncClass(Properties props, FileSystem fileSystem) {
       super(props, fileSystem);
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 736e416162..ef03079d05 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -60,6 +60,7 @@ import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.sync.common.SupportMetaSync;
 import org.apache.hudi.sync.common.util.SyncUtilHelpers;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -122,7 +123,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
 /**
  * Sync's one batch of data to hoodie table.
  */
-public class DeltaSync implements Serializable {
+public class DeltaSync implements SupportMetaSync, Serializable {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
@@ -629,7 +630,7 @@ public class DeltaSync implements Serializable {
         }
 
         if (!isEmpty) {
-          syncMeta(metrics);
+          runMetaSync();
         }
       } else {
         LOG.info("Commit " + instantTime + " failed!");
@@ -690,7 +691,7 @@ public class DeltaSync implements Serializable {
     return syncClassName.substring(syncClassName.lastIndexOf(".") + 1);
   }
 
-  private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+  public void runMetaSync() {
     Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
     // for backward compatibility
     if (cfg.enableHiveSync) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index ad94ada59b..dcbc98f3a8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1355,13 +1355,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Test Hive integration
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips");
-    hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("year", "month", "day");
     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
-    assertTrue(hiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist");
-    assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.tableName).size(),
+    assertTrue(hiveClient.tableExists(hiveSyncConfig.hoodieSyncConfigParams.tableName), "Table " + hiveSyncConfig.hoodieSyncConfigParams.tableName + " should exist");
+    assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.hoodieSyncConfigParams.tableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(lastInstantForUpstreamTable,
-        hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        hiveClient.getLastCommitTimeSynced(hiveSyncConfig.hoodieSyncConfigParams.tableName).get(),
         "The last commit that was synced should be updated in the TBLPROPERTIES");
   }
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index cc93fe4975..6f18ec6a93 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -184,15 +184,15 @@ public class UtilitiesTestBase {
    */
   protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableName) {
     HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
-    hiveSyncConfig.hiveUser = "";
-    hiveSyncConfig.hivePass = "";
-    hiveSyncConfig.databaseName = "testdb1";
-    hiveSyncConfig.tableName = tableName;
-    hiveSyncConfig.basePath = basePath;
-    hiveSyncConfig.assumeDatePartitioning = false;
-    hiveSyncConfig.usePreApacheInputFormat = false;
-    hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("datestr");
+    hiveSyncConfig.hiveSyncConfigParams.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
+    hiveSyncConfig.hiveSyncConfigParams.hiveUser = "";
+    hiveSyncConfig.hiveSyncConfigParams.hivePass = "";
+    hiveSyncConfig.hoodieSyncConfigParams.databaseName = "testdb1";
+    hiveSyncConfig.hoodieSyncConfigParams.tableName = tableName;
+    hiveSyncConfig.hoodieSyncConfigParams.basePath = basePath;
+    hiveSyncConfig.hoodieSyncConfigParams.assumeDatePartitioning = false;
+    hiveSyncConfig.hiveSyncConfigParams.usePreApacheInputFormat = false;
+    hiveSyncConfig.hoodieSyncConfigParams.partitionFields = CollectionUtils.createImmutableList("datestr");
     return hiveSyncConfig;
   }
 
@@ -208,12 +208,12 @@ public class UtilitiesTestBase {
     hiveConf.addResource(hiveServer.getHiveConf());
     HoodieTableMetaClient.withPropertyBuilder()
       .setTableType(HoodieTableType.COPY_ON_WRITE)
-      .setTableName(hiveSyncConfig.tableName)
-      .initTable(dfs.getConf(), hiveSyncConfig.basePath);
+      .setTableName(hiveSyncConfig.hoodieSyncConfigParams.tableName)
+      .initTable(dfs.getConf(), hiveSyncConfig.hoodieSyncConfigParams.basePath);
 
     QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig, dfs);
-    ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName);
-    ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName);
+    ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
+    ddlExecutor.runSQL("create database " + hiveSyncConfig.hoodieSyncConfigParams.databaseName);
     ddlExecutor.close();
   }