Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/01 11:21:22 UTC

[GitHub] [hudi] codope commented on a diff in pull request #5854: [HUDI-3730] Improve meta sync class design and hierarchies

codope commented on code in PR #5854:
URL: https://github.com/apache/hudi/pull/5854#discussion_r910986567


##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java:
##########
@@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> configProperty) {
     return rawValue.map(Object::toString).orElse(null);
   }
 
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
+    return getSplitStrings(configProperty, ",");
+  }
+
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
+    return StringUtils.split(getString(configProperty), delimiter);

Review Comment:
   Should we filter out empty strings?
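   For illustration, a minimal sketch of how empty entries could be dropped, assuming `StringUtils.split` returns a `List<String>` as used above and that `java.util.stream.Collectors` is imported:

   ```java
   public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
     // note: getString(...) may return null for unset configs; the split would need to handle that
     return StringUtils.split(getString(configProperty), delimiter).stream()
         .map(String::trim)
         .filter(s -> !s.isEmpty())
         .collect(Collectors.toList());
   }
   ```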



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -106,14 +119,19 @@ private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) {
     LOG.info("Sync table complete for " + snapshotViewName);
   }
 
+  @Override
+  public void close() {

Review Comment:
   Since it's a subclass of an auto-closeable class, why do we even need to override it here?



##########
hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncTool.java:
##########
@@ -18,53 +18,44 @@
 
 package org.apache.hudi.aws.sync;
 
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
 
 import com.beust.jcommander.JCommander;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Properties;
 
 /**
  * Currently Experimental. Utility class that implements syncing a Hudi Table with the
  * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html)
  * to enable querying via Glue ETLs, Athena etc.
- *
+ * <p>
  * Extends HiveSyncTool since most logic is similar to Hive syncing,
  * expect using a different client {@link AWSGlueCatalogSyncClient} that implements
  * the necessary functionality using Glue APIs.
  *
  * @Experimental
  */
-public class AwsGlueCatalogSyncTool extends HiveSyncTool {
-
-  public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) {
-    super(props, new HiveConf(conf, HiveConf.class), fs);
-  }
+public class AWSGlueCatalogSyncTool extends HiveSyncTool {
 
-  public AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) {
-    super(hiveSyncConfig, hiveConf, fs);
+  public AWSGlueCatalogSyncTool(Properties props, Configuration hadoopConf) {
+    super(props, hadoopConf);

Review Comment:
   +1 on getting rid of HiveConf from subclasses.



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sync.common.model.Partition;
+import org.apache.hudi.sync.common.model.PartitionEvent;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
+
+public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSyncClient.class);
+
+  protected final HoodieSyncConfig config;
+  protected final PartitionValueExtractor partitionValueExtractor;
+  protected final HoodieTableMetaClient metaClient;
+
+  public HoodieSyncClient(HoodieSyncConfig config) {
+    this.config = config;
+    this.partitionValueExtractor = ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(config.getHadoopConf())
+        .setBasePath(config.getString(META_SYNC_BASE_PATH))
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+  }
+
+  public HoodieTimeline getActiveTimeline() {
+    return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public HoodieTableType getTableType() {
+    return metaClient.getTableType();
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public boolean isBootstrap() {
+    return metaClient.getTableConfig().getBootstrapBasePath().isPresent();
+  }
+
+  public boolean isDropPartition() {
+    try {
+      Option<HoodieCommitMetadata> hoodieCommitMetadata = HoodieTableMetadataUtil.getLatestCommitMetadata(metaClient);
+
+      if (hoodieCommitMetadata.isPresent()
+          && WriteOperationType.DELETE_PARTITION.equals(hoodieCommitMetadata.get().getOperationType())) {
+        return true;
+      }
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to get commit metadata", e);
+    }
+    return false;
+  }
+
+  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      LOG.info("Last commit time synced is not known, listing all partitions in "
+          + config.getString(META_SYNC_BASE_PATH)
+          + ",FS :" + config.getHadoopFileSystem());
+      HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+      return FSUtils.getAllPartitionPaths(engineContext,
+          config.getString(META_SYNC_BASE_PATH),
+          config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+          config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+    } else {
+      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+      return TimelineUtils.getPartitionsWritten(metaClient.getActiveTimeline().getCommitsTimeline()
+          .findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE));
+    }
+  }
+
+  /**
+   * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated.
+   * Generate a list of PartitionEvent based on the changes required.
+   */
+  public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
+    Map<String, String> paths = new HashMap<>();
+    for (Partition tablePartition : tablePartitions) {
+      List<String> hivePartitionValues = tablePartition.getValues();
+      String fullTablePartitionPath =
+          Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation())).toUri().getPath();
+      paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
+    }
+
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : partitionStoragePartitions) {
+      Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition);
+      String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (isDropPartition) {
+        events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+      } else {
+        if (!storagePartitionValues.isEmpty()) {
+          String storageValue = String.join(", ", storagePartitionValues);
+          if (!paths.containsKey(storageValue)) {
+            events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+          } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+            events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+          }
+        }

Review Comment:
   Duplicate code in `HoodieAdbJdbcClient`. Let's extract this block into a method and reuse it in the subclass as well.
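   For example, both clients could share a helper roughly like the sketch below (method name and visibility are just illustrative):

   ```java
   // Hypothetical helper in HoodieSyncClient, reusable from HoodieAdbJdbcClient.
   protected void addAddOrUpdateEvent(List<PartitionEvent> events,
                                      Map<String, String> paths,
                                      String storagePartition,
                                      List<String> storagePartitionValues,
                                      String fullStoragePartitionPath) {
     if (storagePartitionValues.isEmpty()) {
       return;
     }
     String storageValue = String.join(", ", storagePartitionValues);
     if (!paths.containsKey(storageValue)) {
       // present on storage but not in the metastore
       events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
     } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
       // present in both, but the storage location changed
       events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
     }
   }
   ```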



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java:
##########
@@ -90,11 +98,11 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
   @Override
   public void updateTableDefinition(String tableName, MessageType newSchema) {
     try {
-      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.partitionFields, config.supportTimestamp);
+      String newSchemaStr = HiveSchemaUtil.generateSchemaString(newSchema, config.getSplitStrings(META_SYNC_PARTITION_FIELDS), config.getBoolean(HIVE_SUPPORT_TIMESTAMP_TYPE));
       // Cascade clause should not be present for non-partitioned tables
-      String cascadeClause = config.partitionFields.size() > 0 ? " cascade" : "";
+      String cascadeClause = config.getSplitStrings(HIVE_SUPPORT_TIMESTAMP_TYPE).size() > 0 ? " cascade" : "";

Review Comment:
   Let's make sure that `getSplitStrings` does not return empty strings.



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java:
##########
@@ -48,19 +50,19 @@ public void syncHoodieTable() {
   @Override
   protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) {
     super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
-    if (((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp != null) {
-      hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
-          ((GlobalHiveSyncConfig) hiveSyncConfig).globallyReplicatedTimeStamp);
+    Option<String> timestamp = Option.ofNullable(config.getString(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP));
+    if (timestamp.isPresent()) {

Review Comment:
   Should we log a warning if the replicate_timestamp is not set?
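   For example (logger name assumed, existing update call elided):

   ```java
   if (timestamp.isPresent()) {
     // ... existing call to update the last replicated timestamp ...
   } else {
     LOG.warn("Global replicate timestamp is not set for table " + tableName
         + "; skipping last-replicated-time update.");
   }
   ```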



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/FieldSchema.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common.model;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.Objects;
+
+public class FieldSchema {

Review Comment:
   I see this is used in meta sync operations like `getStorageFieldSchemas`, which is passed around in many places. Should this implement `Serializable`?
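   If so, the change would be mechanical, e.g.:

   ```java
   import java.io.Serializable;

   public class FieldSchema implements Serializable {
     private static final long serialVersionUID = 1L;
     // existing fields and methods unchanged
   }
   ```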



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java:
##########
@@ -180,11 +181,11 @@ protected static void writeCommonPropsToFile(FileSystem dfs, String dfsBasePath)
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
     // Hive Configs
-    props.setProperty(HiveSyncConfig.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
-    props.setProperty(HiveSyncConfig.META_SYNC_DATABASE_NAME.key(), "testdb1");
-    props.setProperty(HiveSyncConfig.META_SYNC_TABLE_NAME.key(), "hive_trips");
-    props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_FIELDS.key(), "datestr");
-    props.setProperty(HiveSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
+    props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");

Review Comment:
   nit: use static imports
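   e.g.:

   ```java
   import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;

   props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
   ```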



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java:
##########
@@ -133,6 +135,14 @@ public <T> String getString(ConfigProperty<T> configProperty) {
     return rawValue.map(Object::toString).orElse(null);
   }
 
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty) {
+    return getSplitStrings(configProperty, ",");
+  }
+
+  public <T> List<String> getSplitStrings(ConfigProperty<T> configProperty, String delimiter) {
+    return StringUtils.split(getString(configProperty), delimiter);

Review Comment:
   Also, why can't we reuse `StringUtils#split`?



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java:
##########
@@ -165,24 +140,85 @@ public class HoodieSyncConfig extends HoodieConfig {
       .defaultValue("")
       .withDocumentation("The spark version used when syncing with a metastore.");
 
-  public HoodieSyncConfig(TypedProperties props) {
+  private Configuration hadoopConf;
+
+  public HoodieSyncConfig(Properties props) {
+    this(props, ConfigUtils.createHadoopConf(props));
+  }
+
+  public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
     super(props);
-    setDefaults();
-
-    this.basePath = getStringOrDefault(META_SYNC_BASE_PATH);
-    this.databaseName = getStringOrDefault(META_SYNC_DATABASE_NAME);
-    this.tableName = getStringOrDefault(META_SYNC_TABLE_NAME);
-    this.baseFileFormat = getStringOrDefault(META_SYNC_BASE_FILE_FORMAT);
-    this.partitionFields = props.getStringList(META_SYNC_PARTITION_FIELDS.key(), ",", Collections.emptyList());
-    this.partitionValueExtractorClass = getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS);
-    this.assumeDatePartitioning = getBooleanOrDefault(META_SYNC_ASSUME_DATE_PARTITION);
-    this.decodePartition = getBooleanOrDefault(KeyGeneratorOptions.URL_ENCODE_PARTITIONING);
-    this.useFileListingFromMetadata = getBooleanOrDefault(META_SYNC_USE_FILE_LISTING_FROM_METADATA);
-    this.isConditionalSync = getBooleanOrDefault(META_SYNC_CONDITIONAL_SYNC);
-    this.sparkVersion = getStringOrDefault(META_SYNC_SPARK_VERSION);
+    this.hadoopConf = hadoopConf;
+  }
+
+  public void setHadoopConf(Configuration hadoopConf) {
+    this.hadoopConf = hadoopConf;
+  }
+
+  public Configuration getHadoopConf() {
+    return hadoopConf;
+  }
+
+  public FileSystem getHadoopFileSystem() {
+    return FSUtils.getFs(getString(META_SYNC_BASE_PATH), getHadoopConf());
   }
 
-  protected void setDefaults() {
-    this.setDefaultValue(META_SYNC_TABLE_NAME);
+  public String getAbsoluteBasePath() {
+    return getString(META_SYNC_BASE_PATH);
+  }
+
+  @Override
+  public String toString() {
+    return props.toString();
+  }
+
+  public static class HoodieSyncConfigParams {
+    @Parameter(names = {"--database"}, description = "name of the target database in meta store", required = true)
+    public String databaseName;
+    @Parameter(names = {"--table"}, description = "name of the target table in meta store", required = true)
+    public String tableName;
+    @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
+    public String basePath;
+    @Parameter(names = {"--base-file-format"}, description = "Format of the base files (PARQUET (or) HFILE)")
+    public String baseFileFormat;
+    @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
+    public List<String> partitionFields;
+    @Parameter(names = "--partition-value-extractor", description = "Class which implements PartitionValueExtractor "
+        + "to extract the partition values from HDFS path")
+    public String partitionValueExtractorClass;
+    @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
+        + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
+    public Boolean assumeDatePartitioning;
+    @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition has encoded during writing")
+    public Boolean decodePartition;
+    @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
+    public Boolean useFileListingFromMetadata;
+    @Parameter(names = {"--conditional-sync"}, description = "If true, only sync on conditions like schema change or partition change.")
+    public Boolean isConditionalSync;
+    @Parameter(names = {"--spark-version"}, description = "The spark version")
+    public String sparkVersion;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public boolean help = false;
+
+    public boolean isHelp() {
+      return help;
+    }
+
+    public TypedProperties toProps() {
+      final TypedProperties props = new TypedProperties();
+      props.setPropertyIfNonNull(META_SYNC_BASE_PATH.key(), basePath);
+      props.setPropertyIfNonNull(META_SYNC_DATABASE_NAME.key(), databaseName);
+      props.setPropertyIfNonNull(META_SYNC_TABLE_NAME.key(), tableName);
+      props.setPropertyIfNonNull(META_SYNC_BASE_FILE_FORMAT.key(), baseFileFormat);
+      props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), StringUtils.join(",", partitionFields));

Review Comment:
   Previously, this was set to an empty list when the config was not provided by the user. Now it will be null, which could have side effects downstream. Why change the default here?



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -40,26 +47,28 @@
  *
  * @Experimental
  */
-public class BigQuerySyncTool extends AbstractSyncTool {
+public class BigQuerySyncTool extends HoodieSyncTool {
 
   private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
 
-  public final BigQuerySyncConfig cfg;
+  public final BigQuerySyncConfig config;
+  public final String tableName;
   public final String manifestTableName;
   public final String versionsTableName;
   public final String snapshotViewName;
 
-  public BigQuerySyncTool(TypedProperties properties, Configuration conf, FileSystem fs) {
-    super(properties, conf, fs);
-    cfg = BigQuerySyncConfig.fromProps(properties);
-    manifestTableName = cfg.tableName + "_manifest";
-    versionsTableName = cfg.tableName + "_versions";
-    snapshotViewName = cfg.tableName;
+  public BigQuerySyncTool(Properties props) {
+    super(props);
+    this.config = new BigQuerySyncConfig(props);
+    this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);

Review Comment:
   Is there a validation somewhere that enforces this config as mandatory?
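   If not, a plain-Java fail-fast check in the constructor might be enough (sketch only; the exception type is illustrative):

   ```java
   this.tableName = config.getString(BIGQUERY_SYNC_TABLE_NAME);
   if (tableName == null || tableName.isEmpty()) {
     throw new IllegalArgumentException(BIGQUERY_SYNC_TABLE_NAME.key() + " is required for BigQuery sync");
   }
   ```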



##########
hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/AdbSyncConfig.java:
##########
@@ -152,89 +109,101 @@ public class AdbSyncConfig extends HoodieSyncConfig {
       .defaultValue(false)
       .withDocumentation("Whether drop table before creation");
 
-  public AdbSyncConfig() {
-    this(new TypedProperties());
+  public AdbSyncConfig(Properties props) {
+    super(props);
+  }
+
+  @Override
+  public String getAbsoluteBasePath() {
+    return generateAbsolutePathStr(new Path(getString(META_SYNC_BASE_PATH)));

Review Comment:
   Do we have a prior validation that META_SYNC_BASE_PATH is non-empty?



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncTool.java:
##########
@@ -69,7 +78,7 @@ public void syncHoodieTable() {
           throw new UnsupportedOperationException(bqSyncClient.getTableType() + " table type is not supported yet.");
       }
     } catch (Exception e) {
-      throw new HoodieBigQuerySyncException("Got runtime exception when big query syncing " + cfg.tableName, e);
+      throw new HoodieBigQuerySyncException("Got runtime exception when big query syncing " + tableName, e);

Review Comment:
   nit: **bigquery** is a single word



##########
hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java:
##########
@@ -49,6 +49,24 @@ public TypedProperties(Properties defaults) {
     }
   }
 
+  public void setPropertyIfNonNull(String key, String value) {

Review Comment:
   We can define the method as `setPropertyIfNonNull(String key, Object value)` and convert to String if not null. This will eliminate the need for the other two methods.
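   Roughly:

   ```java
   public void setPropertyIfNonNull(String key, Object value) {
     if (value != null) {
       setProperty(key, value.toString());
     }
   }
   ```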



##########
hudi-common/src/main/java/org/apache/hudi/common/config/TypedProperties.java:
##########
@@ -49,6 +49,24 @@ public TypedProperties(Properties defaults) {
     }
   }
 
+  public void setPropertyIfNonNull(String key, String value) {

Review Comment:
   Perhaps we can define a single method as `public void setPropertyIfNonNull(String key, Object value)` and avoid some duplicate code? In the end, all values are written as Strings.



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java:
##########
@@ -166,76 +164,23 @@ public void createSnapshotView(String viewName, String versionsTableName, String
   }
 
   @Override
-  public Map<String, String> getTableSchema(String tableName) {
+  public Map<String, String> getMetastoreSchema(String tableName) {
     // TODO: Implement automatic schema evolution when you add a new column.
     return Collections.emptyMap();
   }
 
-  @Override
-  public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for addPartitionsToTable yet.");
-  }
-
   public boolean datasetExists() {
-    Dataset dataset = bigquery.getDataset(DatasetId.of(syncConfig.projectId, syncConfig.datasetName));
+    Dataset dataset = bigquery.getDataset(DatasetId.of(projectId, datasetName));
     return dataset != null;
   }
 
-  @Override
-  public boolean doesTableExist(final String tableName) {
-    return tableExists(tableName);
-  }
-
   @Override
   public boolean tableExists(String tableName) {
-    TableId tableId = TableId.of(syncConfig.projectId, syncConfig.datasetName, tableName);
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
     Table table = bigquery.getTable(tableId, BigQuery.TableOption.fields());
     return table != null && table.exists();
   }
 
-  @Override
-  public Option<String> getLastCommitTimeSynced(final String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet.");
-  }
-
-  @Override
-  public void updateLastCommitTimeSynced(final String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for updateLastCommitTimeSynced yet.");
-  }
-
-  @Override
-  public Option<String> getLastReplicatedTime(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("Not support getLastReplicatedTime yet.");
-  }
-
-  @Override
-  public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for updateLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void deleteLastReplicatedTimeStamp(String tableName) {
-    // bigQuery doesn't support tblproperties, so do nothing.
-    throw new UnsupportedOperationException("No support for deleteLastReplicatedTimeStamp yet.");
-  }
-
-  @Override
-  public void updatePartitionsToTable(final String tableName, final List<String> changedPartitions) {
-    // bigQuery updates the partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for updatePartitionsToTable yet.");
-  }
-
-  @Override
-  public void dropPartitions(String tableName, List<String> partitionsToDrop) {
-    // bigQuery discovers the new partitions automatically, so do nothing.
-    throw new UnsupportedOperationException("No support for dropPartitions yet.");
-  }
-
   @Override
   public void close() {
     // bigQuery has no connection close method, so do nothing.

Review Comment:
   I see. But perhaps we can null out the instance and free up the heap?
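   Something like the sketch below, assuming the `bigquery` field is not final:

   ```java
   @Override
   public void close() {
     // The BigQuery client exposes no explicit close; drop the reference so it can be GC'd.
     this.bigquery = null;
   }
   ```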



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -270,63 +264,6 @@ public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRD
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
   }
 
-  /**
-   * @deprecated Use {@link HiveSyncConfig} constructor directly and provide the props,
-   * and set {@link HoodieSyncConfig#META_SYNC_BASE_PATH} and {@link HoodieSyncConfig#META_SYNC_BASE_FILE_FORMAT} instead.
-   */
-  @Deprecated
-  public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath, String baseFileFormat) {

Review Comment:
   +1 on cleaning it up. It was a maintenance overhead.



##########
hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java:
##########
@@ -49,45 +52,52 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .noDefaultValue()
       .withDocumentation("Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.");
 
-  @Parameter(names = {"--identifier-class"}, description = "Pluggable class to help provide info to identify a DataHub Dataset.")
-  public String identifierClass;
+  public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+
+  public DataHubSyncConfig(Properties props) {
+    super(props);
+    String identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
+    datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils.loadClass(identifierClass, new Class<?>[] {Properties.class}, props);
+  }
 
-  @Parameter(names = {"--emitter-server"}, description = "Server URL of the DataHub instance.")
-  public String emitterServer;
+  public RestEmitter getRestEmitter() {
+    if (contains(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS)) {
+      return ((DataHubEmitterSupplier) ReflectionUtils.loadClass(getString(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS))).get();
+    } else if (contains(META_SYNC_DATAHUB_EMITTER_SERVER)) {
+      return RestEmitter.create(b -> b.server(getString(META_SYNC_DATAHUB_EMITTER_SERVER)).token(getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN, null)));
+    } else {
+      return RestEmitter.createWithDefaults();
+    }
+  }
 
-  @Parameter(names = {"--emitter-token"}, description = "Auth token to connect to the DataHub instance.")
-  public String emitterToken;
+  public static class DataHubSyncConfigParams {
 
-  @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.")
-  public String emitterSupplierClass;
+    @ParametersDelegate()
+    public final HoodieSyncConfigParams hoodieSyncConfigParams = new HoodieSyncConfigParams();
 
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
+    @Parameter(names = {"--identifier-class"}, description = "Pluggable class to help provide info to identify a DataHub Dataset.")
+    public String identifierClass;
 
-  public final HoodieDataHubDatasetIdentifier datasetIdentifier;
+    @Parameter(names = {"--emitter-server"}, description = "Server URL of the DataHub instance.")
+    public String emitterServer;
 
-  public DataHubSyncConfig() {
-    this(new TypedProperties());
-  }
+    @Parameter(names = {"--emitter-token"}, description = "Auth token to connect to the DataHub instance.")
+    public String emitterToken;
 
-  public DataHubSyncConfig(TypedProperties props) {
-    super(props);
-    identifierClass = getStringOrDefault(META_SYNC_DATAHUB_DATASET_IDENTIFIER_CLASS);
-    emitterServer = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SERVER, null);
-    emitterToken = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_TOKEN, null);
-    emitterSupplierClass = getStringOrDefault(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS, null);
+    @Parameter(names = {"--emitter-supplier-class"}, description = "Pluggable class to supply a DataHub REST emitter to connect to the DataHub instance. This overwrites other emitter configs.")
+    public String emitterSupplierClass;
 
-    datasetIdentifier = (HoodieDataHubDatasetIdentifier) ReflectionUtils
-        .loadClass(identifierClass, new Class<?>[] {TypedProperties.class}, props);
-  }
+    public boolean isHelp() {

Review Comment:
   Perhaps we can move this to the config superclass? It's needed everywhere.



##########
hudi-sync/hudi-adb-sync/src/main/java/org/apache/hudi/sync/adb/HoodieAdbJdbcClient.java:
##########
@@ -405,26 +423,69 @@ private String constructShowCreateDatabaseSql(String databaseName) {
 
   private String constructUpdateTblPropertiesSql(String tableName, String lastCommitSynced) {
     return String.format("alter table `%s`.`%s` set tblproperties('%s' = '%s')",
-        adbSyncConfig.databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
+        databaseName, tableName, HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced);
   }
 
   private String constructAddColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` add columns(`%s` %s)",
-        adbSyncConfig.databaseName, tableName, columnName, columnType);
+        databaseName, tableName, columnName, columnType);
   }
 
   private String constructChangeColumnSql(String tableName, String columnName, String columnType) {
     return String.format("alter table `%s`.`%s` change `%s` `%s` %s",
-        adbSyncConfig.databaseName, tableName, columnName, columnName, columnType);
+        databaseName, tableName, columnName, columnName, columnType);
+  }
+
+  /**
+   * TODO align with {@link HoodieSyncClient#getPartitionEvents}
+   */
+  public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) {
+    Map<String, String> paths = new HashMap<>();
+
+    for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) {
+      List<String> partitionValues = entry.getKey();
+      String fullTablePartitionPath = entry.getValue();
+      paths.put(String.join(", ", partitionValues), fullTablePartitionPath);
+    }
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : partitionStoragePartitions) {
+      Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition);
+      String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+      if (config.getBoolean(ADB_SYNC_USE_HIVE_STYLE_PARTITIONING)) {
+        String partition = String.join("/", storagePartitionValues);
+        storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), partition);
+        fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      }
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        if (!paths.containsKey(storageValue)) {
+          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+        }
+      }
+    }
+    return events;
   }
 
-  private HiveSyncConfig getHiveSyncConfig() {
-    HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
-    hiveSyncConfig.partitionFields = adbSyncConfig.partitionFields;
-    hiveSyncConfig.databaseName = adbSyncConfig.databaseName;
-    Path basePath = new Path(adbSyncConfig.basePath);
-    hiveSyncConfig.basePath = generateAbsolutePathStr(basePath);
-    return hiveSyncConfig;
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {

Review Comment:
   We probably don't need this separate close method if we use try-with-resources wherever needed. Both `ResultSet` and `Statement` implement `AutoCloseable`.
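   e.g. the try-with-resources shape could look like this (connection field, query text, and exception handling are illustrative):

   ```java
   try (Statement stmt = connection.createStatement();
        ResultSet resultSet = stmt.executeQuery(sql)) {
     while (resultSet.next()) {
       // ... consume rows ...
     }
   } catch (SQLException e) {
     // wrap in whichever sync exception this client uses
     throw new RuntimeException("Failed to execute query: " + sql, e);
   }
   ```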



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java:
##########
@@ -19,113 +19,120 @@
 
 package org.apache.hudi.gcp.bigquery;
 
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParametersDelegate;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Configs needed to sync data into BigQuery.
  */
-public class BigQuerySyncConfig implements Serializable {
-
-  public static String BIGQUERY_SYNC_PROJECT_ID = "hoodie.gcp.bigquery.sync.project_id";
-  public static String BIGQUERY_SYNC_DATASET_NAME = "hoodie.gcp.bigquery.sync.dataset_name";
-  public static String BIGQUERY_SYNC_DATASET_LOCATION = "hoodie.gcp.bigquery.sync.dataset_location";
-  public static String BIGQUERY_SYNC_TABLE_NAME = "hoodie.gcp.bigquery.sync.table_name";
-  public static String BIGQUERY_SYNC_SOURCE_URI = "hoodie.gcp.bigquery.sync.source_uri";
-  public static String BIGQUERY_SYNC_SOURCE_URI_PREFIX = "hoodie.gcp.bigquery.sync.source_uri_prefix";
-  public static String BIGQUERY_SYNC_SYNC_BASE_PATH = "hoodie.gcp.bigquery.sync.base_path";
-  public static String BIGQUERY_SYNC_PARTITION_FIELDS = "hoodie.gcp.bigquery.sync.partition_fields";
-  public static String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = "hoodie.gcp.bigquery.sync.use_file_listing_from_metadata";
-  public static String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = "hoodie.gcp.bigquery.sync.assume_date_partitioning";
-
-  @Parameter(names = {"--project-id"}, description = "name of the target project in BigQuery", required = true)
-  public String projectId;
-  @Parameter(names = {"--dataset-name"}, description = "name of the target dataset in BigQuery", required = true)
-  public String datasetName;
-  @Parameter(names = {"--dataset-location"}, description = "location of the target dataset in BigQuery", required = true)
-  public String datasetLocation;
-  @Parameter(names = {"--table-name"}, description = "name of the target table in BigQuery", required = true)
-  public String tableName;
-  @Parameter(names = {"--source-uri"}, description = "name of the source uri gcs path of the table", required = true)
-  public String sourceUri;
-  @Parameter(names = {"--source-uri-prefix"}, description = "name of the source uri gcs path prefix of the table", required = true)
-  public String sourceUriPrefix;
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
-  public String basePath;
-  @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited partition fields. Default to non-partitioned.")
-  public List<String> partitionFields = new ArrayList<>();
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata = false;
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
-  public Boolean assumeDatePartitioning = false;
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
-    BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
-    newConfig.projectId = cfg.projectId;
-    newConfig.datasetName = cfg.datasetName;
-    newConfig.datasetLocation = cfg.datasetLocation;
-    newConfig.tableName = cfg.tableName;
-    newConfig.sourceUri = cfg.sourceUri;
-    newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
-    newConfig.basePath = cfg.basePath;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.help = cfg.help;
-    return newConfig;
-  }
+public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable {
 
-  public TypedProperties toProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
-    properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
-    properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
-    properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
-    properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
-    properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", partitionFields));
-    properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, useFileListingFromMetadata);
-    properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, assumeDatePartitioning);
-    return properties;
-  }
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PROJECT_ID = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.project_id")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.dataset_name")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_LOCATION = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.dataset_location")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_TABLE_NAME = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.table_name")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SOURCE_URI = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.source_uri")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SOURCE_URI_PREFIX = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.source_uri_prefix")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<String> BIGQUERY_SYNC_SYNC_BASE_PATH = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.base_path")
+      .noDefaultValue()
+      .withDocumentation("");
 
-  public static BigQuerySyncConfig fromProps(TypedProperties props) {
-    BigQuerySyncConfig config = new BigQuerySyncConfig();
-    config.projectId = props.getString(BIGQUERY_SYNC_PROJECT_ID);
-    config.datasetName = props.getString(BIGQUERY_SYNC_DATASET_NAME);
-    config.datasetLocation = props.getString(BIGQUERY_SYNC_DATASET_LOCATION);
-    config.tableName = props.getString(BIGQUERY_SYNC_TABLE_NAME);
-    config.sourceUri = props.getString(BIGQUERY_SYNC_SOURCE_URI);
-    config.sourceUriPrefix = props.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX);
-    config.basePath = props.getString(BIGQUERY_SYNC_SYNC_BASE_PATH);
-    config.partitionFields = props.getStringList(BIGQUERY_SYNC_PARTITION_FIELDS, ",", Collections.emptyList());
-    config.useFileListingFromMetadata = props.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, false);
-    config.assumeDatePartitioning = props.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, false);
-    return config;
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PARTITION_FIELDS = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.partition_fields")
+      .noDefaultValue()
+      .withDocumentation("");
+
+  public static final ConfigProperty<Boolean> BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.use_file_listing_from_metadata")
+      .defaultValue(true)

Review Comment:
   Wasn't the default value false earlier?



##########
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java:
##########
@@ -241,20 +248,20 @@ public void dropPartitionsToTable(String tableName, List<String> partitionsToDro
         if (HivePartitionUtil.partitionExists(client, tableName, dropPartition, partitionValueExtractor, syncConfig)) {
           String partitionClause =
               HivePartitionUtil.getPartitionClauseForDrop(dropPartition, partitionValueExtractor, syncConfig);
-          client.dropPartition(syncConfig.databaseName, tableName, partitionClause, false);
+          client.dropPartition(databaseName, tableName, partitionClause, false);
         }
         LOG.info("Drop partition " + dropPartition + " on " + tableName);
       }
     } catch (TException e) {
-      LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
-      throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
+      LOG.error(databaseName + "." + tableName + " drop partition failed", e);
+      throw new HoodieHiveSyncException(databaseName + "." + tableName + " drop partition failed", e);

Review Comment:
   Should we also log the partitions in case of failure, both in this method and in `dropPartitions`?
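   e.g. (assuming the method's list parameter is named `partitionsToDrop`):

   ```java
   } catch (TException e) {
     String errorMsg = databaseName + "." + tableName + " drop partition failed for partitions " + partitionsToDrop;
     LOG.error(errorMsg, e);
     throw new HoodieHiveSyncException(errorMsg, e);
   }
   ```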



##########
hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java:
##########
@@ -19,113 +19,120 @@
 
 package org.apache.hudi.gcp.bigquery;
 
-import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.sync.common.HoodieSyncConfig;
 
 import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParametersDelegate;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 /**
  * Configs needed to sync data into BigQuery.
  */
-public class BigQuerySyncConfig implements Serializable {
-
-  public static String BIGQUERY_SYNC_PROJECT_ID = "hoodie.gcp.bigquery.sync.project_id";
-  public static String BIGQUERY_SYNC_DATASET_NAME = "hoodie.gcp.bigquery.sync.dataset_name";
-  public static String BIGQUERY_SYNC_DATASET_LOCATION = "hoodie.gcp.bigquery.sync.dataset_location";
-  public static String BIGQUERY_SYNC_TABLE_NAME = "hoodie.gcp.bigquery.sync.table_name";
-  public static String BIGQUERY_SYNC_SOURCE_URI = "hoodie.gcp.bigquery.sync.source_uri";
-  public static String BIGQUERY_SYNC_SOURCE_URI_PREFIX = "hoodie.gcp.bigquery.sync.source_uri_prefix";
-  public static String BIGQUERY_SYNC_SYNC_BASE_PATH = "hoodie.gcp.bigquery.sync.base_path";
-  public static String BIGQUERY_SYNC_PARTITION_FIELDS = "hoodie.gcp.bigquery.sync.partition_fields";
-  public static String BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA = "hoodie.gcp.bigquery.sync.use_file_listing_from_metadata";
-  public static String BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING = "hoodie.gcp.bigquery.sync.assume_date_partitioning";
-
-  @Parameter(names = {"--project-id"}, description = "name of the target project in BigQuery", required = true)
-  public String projectId;
-  @Parameter(names = {"--dataset-name"}, description = "name of the target dataset in BigQuery", required = true)
-  public String datasetName;
-  @Parameter(names = {"--dataset-location"}, description = "location of the target dataset in BigQuery", required = true)
-  public String datasetLocation;
-  @Parameter(names = {"--table-name"}, description = "name of the target table in BigQuery", required = true)
-  public String tableName;
-  @Parameter(names = {"--source-uri"}, description = "name of the source uri gcs path of the table", required = true)
-  public String sourceUri;
-  @Parameter(names = {"--source-uri-prefix"}, description = "name of the source uri gcs path prefix of the table", required = true)
-  public String sourceUriPrefix;
-  @Parameter(names = {"--base-path"}, description = "Base path of the hoodie table to sync", required = true)
-  public String basePath;
-  @Parameter(names = {"--partitioned-by"}, description = "Comma-delimited partition fields. Default to non-partitioned.")
-  public List<String> partitionFields = new ArrayList<>();
-  @Parameter(names = {"--use-file-listing-from-metadata"}, description = "Fetch file listing from Hudi's metadata")
-  public Boolean useFileListingFromMetadata = false;
-  @Parameter(names = {"--assume-date-partitioning"}, description = "Assume standard yyyy/mm/dd partitioning, this"
-      + " exists to support backward compatibility. If you use hoodie 0.3.x, do not set this parameter")
-  public Boolean assumeDatePartitioning = false;
-  @Parameter(names = {"--help", "-h"}, help = true)
-  public Boolean help = false;
-
-  public static BigQuerySyncConfig copy(BigQuerySyncConfig cfg) {
-    BigQuerySyncConfig newConfig = new BigQuerySyncConfig();
-    newConfig.projectId = cfg.projectId;
-    newConfig.datasetName = cfg.datasetName;
-    newConfig.datasetLocation = cfg.datasetLocation;
-    newConfig.tableName = cfg.tableName;
-    newConfig.sourceUri = cfg.sourceUri;
-    newConfig.sourceUriPrefix = cfg.sourceUriPrefix;
-    newConfig.basePath = cfg.basePath;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.help = cfg.help;
-    return newConfig;
-  }
+public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable {
 
-  public TypedProperties toProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(BIGQUERY_SYNC_PROJECT_ID, projectId);
-    properties.put(BIGQUERY_SYNC_DATASET_NAME, datasetName);
-    properties.put(BIGQUERY_SYNC_DATASET_LOCATION, datasetLocation);
-    properties.put(BIGQUERY_SYNC_TABLE_NAME, tableName);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI, sourceUri);
-    properties.put(BIGQUERY_SYNC_SOURCE_URI_PREFIX, sourceUriPrefix);
-    properties.put(BIGQUERY_SYNC_SYNC_BASE_PATH, basePath);
-    properties.put(BIGQUERY_SYNC_PARTITION_FIELDS, String.join(",", partitionFields));
-    properties.put(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA, useFileListingFromMetadata);
-    properties.put(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING, assumeDatePartitioning);
-    return properties;
-  }
+  public static final ConfigProperty<String> BIGQUERY_SYNC_PROJECT_ID = ConfigProperty

Review Comment:
   +1 for using ConfigProperty.
   Should we add some doc as well so that it gets automatically updated in our Hudi website configuration page?



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.hive.testutils.TestCluster;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
+import static org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  @RegisterExtension

Review Comment:
   Sorry, I'm not very familiar with this annotation. Am I correct in assuming that simply using it means the BeforeAll and AfterAll steps of `TestCluster` will always be executed? Asking because you are not stopping the local/remote clusters anywhere in this test.
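   For context (general JUnit 5 behavior; I have not checked how `TestCluster` is actually implemented), a static `@RegisterExtension` field only drives class-level setup/teardown if the extension implements the corresponding callbacks, roughly:

   ```java
   import org.junit.jupiter.api.extension.AfterAllCallback;
   import org.junit.jupiter.api.extension.BeforeAllCallback;
   import org.junit.jupiter.api.extension.ExtensionContext;

   // Hypothetical shape of TestCluster for it to act as a class-level extension.
   public class TestCluster implements BeforeAllCallback, AfterAllCallback {
     @Override
     public void beforeAll(ExtensionContext context) throws Exception {
       // start the mini HDFS/Hive services once per test class
     }

     @Override
     public void afterAll(ExtensionContext context) throws Exception {
       // stop the services and clean up
     }
   }
   ```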



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.sync.common.model.Partition;
+import org.apache.hudi.sync.common.model.PartitionEvent;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA;
+
+public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, AutoCloseable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSyncClient.class);
+
+  protected final HoodieSyncConfig config;
+  protected final PartitionValueExtractor partitionValueExtractor;
+  protected final HoodieTableMetaClient metaClient;
+
+  public HoodieSyncClient(HoodieSyncConfig config) {
+    this.config = config;
+    this.partitionValueExtractor = ReflectionUtils.loadClass(config.getStringOrDefault(META_SYNC_PARTITION_EXTRACTOR_CLASS));
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(config.getHadoopConf())
+        .setBasePath(config.getString(META_SYNC_BASE_PATH))
+        .setLoadActiveTimelineOnLoad(true)
+        .build();
+  }
+
+  public HoodieTimeline getActiveTimeline() {
+    return metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public HoodieTableType getTableType() {
+    return metaClient.getTableType();
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();

Review Comment:
   Let's use the `getBasePathV2` API.



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java:
##########
@@ -244,10 +244,10 @@ public void testSyncCOWTableWithProperties(boolean useSchemaFromCommitMetadata,
         put("tp_1", "p1");
       }
     };
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_SYNC_MODE.key(), syncMode);
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_SYNC_AS_DATA_SOURCE_TABLE.key(), String.valueOf(syncAsDataSourceTable));
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_TABLE_SERDE_PROPERTIES.key(), ConfigUtils.configToString(serdeProperties));
-    hiveSyncProps.setProperty(HiveSyncConfig.HIVE_TABLE_PROPERTIES.key(), ConfigUtils.configToString(tableProperties));
+    hiveSyncProps.setProperty(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), syncMode);

Review Comment:
   Optional: use static imports wherever possible?
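
   For instance, with a static import the assignment could read (sketch only, same behavior):

   ```java
   import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;

   // ...
   hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
   ```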



##########
hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/replication/TestHiveSyncGlobalCommitTool.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.hive.testutils.TestCluster;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
+import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
+import static org.apache.hudi.hive.replication.GlobalHiveSyncConfig.META_SYNC_GLOBAL_REPLICATE_TIMESTAMP;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.LOCAL_HIVE_SITE_URI;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_BASE_PATH;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitParams.REMOTE_HIVE_SITE_URI;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  @RegisterExtension
+  public static TestCluster localCluster = new TestCluster();
+  @RegisterExtension
+  public static TestCluster remoteCluster = new TestCluster();
+
+  private static final String DB_NAME = "foo";
+  private static final String TBL_NAME = "bar";
+
+  private HiveSyncGlobalCommitParams getGlobalCommitConfig(String commitTime) throws Exception {
+    HiveSyncGlobalCommitParams params = new HiveSyncGlobalCommitParams();
+    params.loadedProps.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
+    params.loadedProps.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
+    params.loadedProps.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
+    params.loadedProps.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
+    params.loadedProps.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(META_SYNC_GLOBAL_REPLICATE_TIMESTAMP.key(), commitTime);
+    params.loadedProps.setProperty(HIVE_USER.key(), System.getProperty("user.name"));
+    params.loadedProps.setProperty(HIVE_PASS.key(), "");
+    params.loadedProps.setProperty(META_SYNC_DATABASE_NAME.key(), DB_NAME);
+    params.loadedProps.setProperty(META_SYNC_TABLE_NAME.key(), TBL_NAME);
+    params.loadedProps.setProperty(META_SYNC_BASE_PATH.key(), localCluster.tablePath(DB_NAME, TBL_NAME));
+    params.loadedProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
+    params.loadedProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
+    params.loadedProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    return params;
+  }
+
+  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitParams config) throws Exception {
+    assertEquals(localCluster.getHMSClient()
+        .getTable(DB_NAME, TBL_NAME).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
+        .getTable(DB_NAME, TBL_NAME).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated timestamps");
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    localCluster.forceCreateDb(DB_NAME);

Review Comment:
   Since we're creating the db before each test here, why not drop the db (with cascade) after each test in the `clear` method?
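
   A rough sketch of such a teardown, assuming `getHMSClient()` hands back a standard `IMetaStoreClient` (the `dropDatabase` overload with a cascade flag):

   ```java
   @AfterEach
   public void clear() throws Exception {
     // dropDatabase(name, deleteData, ignoreUnknownDb, cascade)
     localCluster.getHMSClient().dropDatabase(DB_NAME, true, true, true);
     remoteCluster.getHMSClient().dropDatabase(DB_NAME, true, true, true);
   }
   ```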



##########
hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieMetaSyncOperations.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sync.common.model.FieldSchema;
+import org.apache.hudi.sync.common.model.Partition;
+
+import org.apache.parquet.schema.MessageType;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public interface HoodieMetaSyncOperations {
+
+  String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
+
+  /**
+   * Create the table.
+   *
+   * @param tableName         The table name.
+   * @param storageSchema     The table schema.
+   * @param inputFormatClass  The input format class of this table.
+   * @param outputFormatClass The output format class of this table.
+   * @param serdeClass        The serde class of this table.
+   * @param serdeProperties   The serde properties of this table.
+   * @param tableProperties   The table properties for this table.
+   */
+  default void createTable(String tableName,
+                           MessageType storageSchema,
+                           String inputFormatClass,
+                           String outputFormatClass,
+                           String serdeClass,
+                           Map<String, String> serdeProperties,
+                           Map<String, String> tableProperties) {
+
+  }
+
+  default boolean tableExists(String tableName) {
+    return false;
+  }
+
+  default void dropTable(String tableName) {
+
+  }
+
+  default void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {

Review Comment:
   Let's add javadoc for all these methods? It's very helpful, especially when there are multiple arguments.
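
   For example, something along these lines on `addPartitionsToTable` (wording is only a suggestion):

   ```java
   /**
    * Add partitions to the table in the metastore.
    *
    * @param tableName       The table name.
    * @param partitionsToAdd The relative partition paths to add.
    */
   default void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {

   }
   ```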


