Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/30 21:21:20 UTC

[GitHub] [hudi] xushiyan commented on a change in pull request #5125: [HUDI-3357] MVP implementation of BigQuerySyncTool

xushiyan commented on a change in pull request #5125:
URL: https://github.com/apache/hudi/pull/5125#discussion_r838955074



##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncConfig.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+
+import com.beust.jcommander.Parameter;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Configs needed to sync data into BigQuery.
+ */
+public class BigQuerySyncConfig implements Serializable {
+
+  @Parameter(names = {"--help", "-h"}, help = true)
+  public final Boolean help = false;

Review comment:
    Why `final`? Would this fail if the user passes `-h`, since JCommander needs to set the field?
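    For illustration, a non-final variant that JCommander can populate via reflection (untested sketch):
    ```java
    // Untested sketch: without `final`, JCommander can flip the flag when -h is given
    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
    ```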

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big query table schema
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+  private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
+  public final BigQuerySyncConfig cfg;
+  public final HoodieBigQueryClient hoodieBigQueryClient;
+  public String projectId;
+  public String datasetName;
+  public String manifestTableName;
+  public String versionsTableName;
+  public String snapshotViewName;
+  public String sourceUri;
+  public String sourceUriPrefix;
+  public List<String> partitionFields;
+
+  private BigQuerySyncTool(Properties properties, Configuration conf, FileSystem fs) {
+    super(new TypedProperties(properties), conf, fs);
+    hoodieBigQueryClient = new HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs);
+    cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieBigQueryClient.getTableType()) {
+      case COPY_ON_WRITE:
+        projectId = cfg.projectId;
+        datasetName = cfg.datasetName;
+        manifestTableName = cfg.tableName + "_manifest";
+        versionsTableName = cfg.tableName + "_versions";
+        snapshotViewName = cfg.tableName;
+        sourceUri = cfg.sourceUri;
+        sourceUriPrefix = cfg.sourceUriPrefix;
+        partitionFields = cfg.partitionFields;
+        break;
+      case MERGE_ON_READ:
+        LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      default:
+        LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+    }
+  }
+
+  public static void main(String[] args) {
+    // parse the params
+    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), fs).syncHoodieTable();

Review comment:
    ```suggestion
        new BigQuerySyncTool(Utils.configToProperties(cfg), fs.getConf(), fs).syncHoodieTable();
    ```
    This reuses the `Configuration` the `FileSystem` was actually created with, rather than constructing a second one that could drift from it.

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big query table schema
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+  private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
+  public final BigQuerySyncConfig cfg;
+  public final HoodieBigQueryClient hoodieBigQueryClient;
+  public String projectId;
+  public String datasetName;
+  public String manifestTableName;
+  public String versionsTableName;
+  public String snapshotViewName;
+  public String sourceUri;
+  public String sourceUriPrefix;
+  public List<String> partitionFields;
+
+  private BigQuerySyncTool(Properties properties, Configuration conf, FileSystem fs) {
+    super(new TypedProperties(properties), conf, fs);
+    hoodieBigQueryClient = new HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs);
+    cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieBigQueryClient.getTableType()) {
+      case COPY_ON_WRITE:
+        projectId = cfg.projectId;
+        datasetName = cfg.datasetName;
+        manifestTableName = cfg.tableName + "_manifest";
+        versionsTableName = cfg.tableName + "_versions";
+        snapshotViewName = cfg.tableName;
+        sourceUri = cfg.sourceUri;
+        sourceUriPrefix = cfg.sourceUriPrefix;
+        partitionFields = cfg.partitionFields;
+        break;
+      case MERGE_ON_READ:
+        LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      default:
+        LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+    }
+  }
+
+  public static void main(String[] args) {
+    // parse the params
+    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), fs).syncHoodieTable();
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieBigQueryClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncCoWTable();
+          break;
+        case MERGE_ON_READ:
+          LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+        default:
+          LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      }
+    } catch (RuntimeException re) {

Review comment:
    Since you're re-throwing anyway, can this just catch `Exception`?
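    For example (untested sketch, keeping the same wrapping behavior):
    ```java
    try {
      // ... switch on table type and sync ...
    } catch (Exception e) {
      throw new HoodieException("Failed to sync table " + cfg.tableName, e);
    } finally {
      hoodieBigQueryClient.close();
    }
    ```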

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big query table schema

Review comment:
    Suggest mentioning `@Experimental` in the javadoc.
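    For example (untested sketch; the exact wording or annotation is up to you):
    ```java
    /**
     * Experimental: tool to sync a hoodie table with a BigQuery table.
     * API and behavior may change in future releases.
     */
    ```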

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String serdeClass,
+                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {
+        // Configuring partitioning options for partitioned table.
+        HivePartitioningOptions hivePartitioningOptions =
+            HivePartitioningOptions.newBuilder()
+                .setMode("AUTO")
+                .setRequirePartitionFilter(false)
+                .setSourceUriPrefix(sourceUriPrefix)
+                .build();
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setHivePartitioningOptions(hivePartitioningOptions)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      } else {
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      }
+
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("External table created using hivepartitioningoptions");
+    } catch (BigQueryException e) {
+      throw new HoodieException("External table was not created ", e);
+    }
+  }
+
+  public void createManifestTable(
+      String projectId, String datasetName, String tableName, String sourceUri) {
+    try {
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+      CsvOptions csvOptions = CsvOptions.newBuilder()
+          .setFieldDelimiter(",")
+          .setAllowJaggedRows(false)
+          .setAllowQuotedNewLines(false)
+          .setSkipLeadingRows(0)
+          .build();
+      Schema schema = Schema.of(
+          Field.of("filename", StandardSQLTypeName.STRING));
+
+      ExternalTableDefinition customTable =
+          ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions)
+              .setAutodetect(false)
+              .setIgnoreUnknownValues(false)
+              .setMaxBadRecords(0)
+              .build();
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("Manifest External table created.");
+    } catch (BigQueryException e) {
+      throw new HoodieException("Manifest External table was not created ", e);

Review comment:
    Ditto — same suggestion as the other `HoodieException` throws: use a dedicated sync exception here too.

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String serdeClass,
+                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {
+        // Configuring partitioning options for partitioned table.
+        HivePartitioningOptions hivePartitioningOptions =
+            HivePartitioningOptions.newBuilder()
+                .setMode("AUTO")
+                .setRequirePartitionFilter(false)
+                .setSourceUriPrefix(sourceUriPrefix)
+                .build();
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setHivePartitioningOptions(hivePartitioningOptions)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      } else {
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      }
+
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("External table created using hivepartitioningoptions");
+    } catch (BigQueryException e) {
+      throw new HoodieException("External table was not created ", e);
+    }
+  }
+
+  public void createManifestTable(
+      String projectId, String datasetName, String tableName, String sourceUri) {
+    try {
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+      CsvOptions csvOptions = CsvOptions.newBuilder()
+          .setFieldDelimiter(",")
+          .setAllowJaggedRows(false)
+          .setAllowQuotedNewLines(false)
+          .setSkipLeadingRows(0)
+          .build();
+      Schema schema = Schema.of(
+          Field.of("filename", StandardSQLTypeName.STRING));
+
+      ExternalTableDefinition customTable =
+          ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions)
+              .setAutodetect(false)
+              .setIgnoreUnknownValues(false)
+              .setMaxBadRecords(0)
+              .build();
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("Manifest External table created.");
+    } catch (BigQueryException e) {
+      throw new HoodieException("Manifest External table was not created ", e);
+    }
+  }
+
+  public void createSnapshotView(
+      String projectId, String datasetName, String viewName, String versionsTableName, String manifestTableName) {
+    try {
+      TableId tableId = TableId.of(projectId, datasetName, viewName);
+      String query =
+          String.format(
+              "SELECT * FROM `%s.%s.%s` WHERE _hoodie_file_name IN "
+                  + "(SELECT filename FROM `%s.%s.%s`)",
+              projectId,
+              datasetName,
+              versionsTableName,
+              projectId,
+              datasetName,
+              manifestTableName);
+
+      ViewDefinition viewDefinition =
+          ViewDefinition.newBuilder(query).setUseLegacySql(false).build();
+
+      bigquery.create(TableInfo.of(tableId, viewDefinition));
+      LOG.info("View created successfully");
+    } catch (BigQueryException e) {
+      throw new HoodieException("View was not created ", e);
+    }
+  }
+
+  @Override
+  public Map<String, String> getTableSchema(String tableName) {
+    // TODO: Implement automatic schema evolution when you add a new column.
+    return null;
+  }
+
+  @Override
+  public void addPartitionsToTable(final String tableName, final List<String> partitionsToAdd) {
+    // bigQuery discovers the new partitions automatically, so do nothing.
+  }
+
+  @Override
+  public void dropPartitionsToTable(final String tableName, final List<String> partitionsToDrop) {
+    // bigQuery discovers the new partitions automatically, so do nothing.
+  }
+
+  @Override
+  public boolean doesTableExist(final String tableName) {
+    // bigQuery table exists needs different set of arguments, so do nothing.
+    throw new UnsupportedOperationException("Not support doesTableExist yet.");
+  }
+
+  public boolean doesTableExist(final String projectId, final String datasetName, final String tableName) {
+    TableId tableId = TableId.of(projectId, datasetName, tableName);
+    return bigquery.getTable(tableId, BigQuery.TableOption.fields()) != null;
+  }
+
+  public boolean doesViewExist(final String projectId, final String datasetName, final String viewName) {
+    TableId tableId = TableId.of(projectId, datasetName, viewName);
+    return bigquery.getTable(tableId) != null;
+  }
+
+  @Override
+  public Option<String> getLastCommitTimeSynced(final String tableName) {
+    // bigQuery doesn't support tblproperties, so do nothing.
+    throw new UnsupportedOperationException("Not support getLastCommitTimeSynced yet.");
+  }
+
+  @Override
+  public void updateLastCommitTimeSynced(final String tableName) {
+    // bigQuery doesn't support tblproperties, so do nothing.

Review comment:
    Can we standardize all unsupported APIs on throwing `UnsupportedOperationException`?
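    For example (untested sketch, mirroring the other unsupported methods in this class):
    ```java
    @Override
    public void updateLastCommitTimeSynced(final String tableName) {
      // bigQuery doesn't support tblproperties.
      throw new UnsupportedOperationException("Not support updateLastCommitTimeSynced yet.");
    }
    ```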

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String serdeClass,
+                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {
+        // Configuring partitioning options for partitioned table.
+        HivePartitioningOptions hivePartitioningOptions =
+            HivePartitioningOptions.newBuilder()
+                .setMode("AUTO")
+                .setRequirePartitionFilter(false)
+                .setSourceUriPrefix(sourceUriPrefix)
+                .build();
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setHivePartitioningOptions(hivePartitioningOptions)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      } else {
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      }
+
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("External table created using hivepartitioningoptions");
+    } catch (BigQueryException e) {
+      throw new HoodieException("External table was not created ", e);
+    }
+  }
+
+  public void createManifestTable(
+      String projectId, String datasetName, String tableName, String sourceUri) {
+    try {
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+      CsvOptions csvOptions = CsvOptions.newBuilder()
+          .setFieldDelimiter(",")
+          .setAllowJaggedRows(false)
+          .setAllowQuotedNewLines(false)
+          .setSkipLeadingRows(0)
+          .build();
+      Schema schema = Schema.of(
+          Field.of("filename", StandardSQLTypeName.STRING));
+
+      ExternalTableDefinition customTable =
+          ExternalTableDefinition.newBuilder(sourceUri, schema, csvOptions)
+              .setAutodetect(false)
+              .setIgnoreUnknownValues(false)
+              .setMaxBadRecords(0)
+              .build();
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("Manifest External table created.");
+    } catch (BigQueryException e) {
+      throw new HoodieException("Manifest External table was not created ", e);
+    }
+  }
+
+  public void createSnapshotView(
+      String projectId, String datasetName, String viewName, String versionsTableName, String manifestTableName) {
+    try {
+      TableId tableId = TableId.of(projectId, datasetName, viewName);
+      String query =
+          String.format(
+              "SELECT * FROM `%s.%s.%s` WHERE _hoodie_file_name IN "
+                  + "(SELECT filename FROM `%s.%s.%s`)",
+              projectId,
+              datasetName,
+              versionsTableName,
+              projectId,
+              datasetName,
+              manifestTableName);
+
+      ViewDefinition viewDefinition =
+          ViewDefinition.newBuilder(query).setUseLegacySql(false).build();
+
+      bigquery.create(TableInfo.of(tableId, viewDefinition));
+      LOG.info("View created successfully");
+    } catch (BigQueryException e) {
+      throw new HoodieException("View was not created ", e);
+    }
+  }
+
+  @Override
+  public Map<String, String> getTableSchema(String tableName) {
+    // TODO: Implement automatic schema evolution when you add a new column.
+    return null;

Review comment:
    Return `Collections.emptyMap()` instead of `null`?
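    For example (untested sketch; needs `import java.util.Collections;`):
    ```java
    @Override
    public Map<String, String> getTableSchema(String tableName) {
      // TODO: Implement automatic schema evolution when you add a new column.
      return Collections.emptyMap();
    }
    ```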

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big query table schema
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+  private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
+  public final BigQuerySyncConfig cfg;
+  public final HoodieBigQueryClient hoodieBigQueryClient;
+  public String projectId;
+  public String datasetName;
+  public String manifestTableName;
+  public String versionsTableName;
+  public String snapshotViewName;
+  public String sourceUri;
+  public String sourceUriPrefix;
+  public List<String> partitionFields;
+
+  private BigQuerySyncTool(Properties properties, Configuration conf, FileSystem fs) {
+    super(new TypedProperties(properties), conf, fs);
+    hoodieBigQueryClient = new HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs);
+    cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieBigQueryClient.getTableType()) {
+      case COPY_ON_WRITE:
+        projectId = cfg.projectId;
+        datasetName = cfg.datasetName;
+        manifestTableName = cfg.tableName + "_manifest";
+        versionsTableName = cfg.tableName + "_versions";
+        snapshotViewName = cfg.tableName;
+        sourceUri = cfg.sourceUri;
+        sourceUriPrefix = cfg.sourceUriPrefix;
+        partitionFields = cfg.partitionFields;
+        break;
+      case MERGE_ON_READ:
+        LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      default:
+        LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+    }
+  }
+
+  public static void main(String[] args) {
+    // parse the params
+    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), fs).syncHoodieTable();
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieBigQueryClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncCoWTable();
+          break;
+        case MERGE_ON_READ:
+          LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+        default:
+          LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      throw new HoodieException("Got runtime exception when big query syncing " + cfg.tableName, re);
+    } finally {
+      hoodieBigQueryClient.close();
+    }
+  }
+
+  private void syncCoWTable() {
+    LOG.info("Trying to sync hoodie table " + snapshotViewName + " with base path " + hoodieBigQueryClient.getBasePath()
+        + " of type " + hoodieBigQueryClient.getTableType());

Review comment:
    Can add a `ValidationUtils.checkState` here to make sure it's COPY_ON_WRITE.
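    For example (untested sketch, assuming `org.apache.hudi.common.util.ValidationUtils` and `HoodieTableType` are imported):
    ```java
    private void syncCoWTable() {
      // Fail fast if a non-CoW table somehow reaches this code path
      ValidationUtils.checkState(hoodieBigQueryClient.getTableType() == HoodieTableType.COPY_ON_WRITE);
      LOG.info("Trying to sync hoodie table " + snapshotViewName + " with base path "
          + hoodieBigQueryClient.getBasePath() + " of type " + hoodieBigQueryClient.getTableType());
      // ... rest of the CoW sync ...
    }
    ```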

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String serdeClass,
+                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {

Review comment:
    Better to use the newly added `org.apache.hudi.common.util.CollectionUtils#nonEmpty`, which also covers the empty-list case.
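    For example (untested sketch):
    ```java
    // Guards against both null and empty partition lists
    if (CollectionUtils.nonEmpty(partitionFields)) {
      // ... configure HivePartitioningOptions for the partitioned table ...
    } else {
      // ... build the unpartitioned external table definition ...
    }
    ```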

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/HoodieBigQueryClient.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.CsvOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.HivePartitioningOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.ViewDefinition;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+
+public class HoodieBigQueryClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieBigQueryClient.class);
+  private transient BigQuery bigquery;
+
+  public HoodieBigQueryClient(final BigQuerySyncConfig syncConfig, final FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata,
+        false, fs);
+    this.createBigQueryConnection();
+  }
+
+  private void createBigQueryConnection() {
+    if (bigquery == null) {
+      try {
+        // Initialize client that will be used to send requests. This client only needs to be created
+        // once, and can be reused for multiple requests.
+        bigquery = BigQueryOptions.getDefaultInstance().getService();
+        LOG.info("Successfully established BigQuery connection.");
+      } catch (BigQueryException e) {
+        throw new HoodieException("Cannot create bigQuery connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(final String tableName, final MessageType storageSchema, final String inputFormatClass,
+                          final String outputFormatClass, final String serdeClass,
+                          final Map<String, String> serdeProperties, final Map<String, String> tableProperties) {
+    // bigQuery create table arguments are different, so do nothing.
+  }
+
+  public void createVersionsTable(
+      String projectId, String datasetName, String tableName, String sourceUri, String sourceUriPrefix, List<String> partitionFields) {
+    try {
+      ExternalTableDefinition customTable;
+      TableId tableId = TableId.of(projectId, datasetName, tableName);
+
+      if (partitionFields != null) {
+        // Configuring partitioning options for partitioned table.
+        HivePartitioningOptions hivePartitioningOptions =
+            HivePartitioningOptions.newBuilder()
+                .setMode("AUTO")
+                .setRequirePartitionFilter(false)
+                .setSourceUriPrefix(sourceUriPrefix)
+                .build();
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setHivePartitioningOptions(hivePartitioningOptions)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      } else {
+        customTable =
+            ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.parquet())
+                .setAutodetect(true)
+                .setIgnoreUnknownValues(true)
+                .setMaxBadRecords(0)
+                .build();
+      }
+
+      bigquery.create(TableInfo.of(tableId, customTable));
+      LOG.info("External table created using hivepartitioningoptions");
+    } catch (BigQueryException e) {
+      throw new HoodieException("External table was not created ", e);

Review comment:
    Suggest a dedicated exception for this, say `HoodieBigQuerySyncException`.
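    For example (untested sketch of the proposed exception type):
    ```java
    public class HoodieBigQuerySyncException extends HoodieException {
      public HoodieBigQuerySyncException(String message, Throwable t) {
        super(message, t);
      }
    }
    ```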

##########
File path: hudi-sync/hudi-bigquery-sync/src/main/java/org/apache/hudi/bigquery/BigQuerySyncTool.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.bigquery;
+
+import org.apache.hudi.bigquery.util.Utils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tool to sync a hoodie table with a big query table. Either use it as an api
+ * BigQuerySyncTool.syncHoodieTable(BigQuerySyncConfig) or as a command line java -cp hoodie-hive.jar BigQuerySyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync big query table schema
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+  private static final Logger LOG = LogManager.getLogger(BigQuerySyncTool.class);
+  public final BigQuerySyncConfig cfg;
+  public final HoodieBigQueryClient hoodieBigQueryClient;
+  public String projectId;
+  public String datasetName;
+  public String manifestTableName;
+  public String versionsTableName;
+  public String snapshotViewName;
+  public String sourceUri;
+  public String sourceUriPrefix;
+  public List<String> partitionFields;
+
+  private BigQuerySyncTool(Properties properties, Configuration conf, FileSystem fs) {
+    super(new TypedProperties(properties), conf, fs);
+    hoodieBigQueryClient = new HoodieBigQueryClient(Utils.propertiesToConfig(properties), fs);
+    cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieBigQueryClient.getTableType()) {
+      case COPY_ON_WRITE:
+        projectId = cfg.projectId;
+        datasetName = cfg.datasetName;
+        manifestTableName = cfg.tableName + "_manifest";
+        versionsTableName = cfg.tableName + "_versions";
+        snapshotViewName = cfg.tableName;
+        sourceUri = cfg.sourceUri;
+        sourceUriPrefix = cfg.sourceUriPrefix;
+        partitionFields = cfg.partitionFields;
+        break;
+      case MERGE_ON_READ:
+        LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      default:
+        LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+        throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+    }
+  }
+
+  public static void main(String[] args) {
+    // parse the params
+    BigQuerySyncConfig cfg = new BigQuerySyncConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    new BigQuerySyncTool(Utils.configToProperties(cfg), new Configuration(), fs).syncHoodieTable();
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieBigQueryClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncCoWTable();
+          break;
+        case MERGE_ON_READ:
+          LOG.error("Not supported table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+        default:
+          LOG.error("Unknown table type " + hoodieBigQueryClient.getTableType());
+          throw new InvalidTableException(hoodieBigQueryClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      throw new HoodieException("Got runtime exception when big query syncing " + cfg.tableName, re);
+    } finally {
+      hoodieBigQueryClient.close();

Review comment:
    Use try-with-resources instead of closing in `finally`?
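    For example (untested sketch, assuming `HoodieBigQueryClient` is made to implement `AutoCloseable`):
    ```java
    try (HoodieBigQueryClient client = new HoodieBigQueryClient(Utils.propertiesToConfig(props), fs)) {
      // ... switch on client.getTableType() and sync; close() runs automatically ...
    }
    ```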




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org