Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/27 01:21:02 UTC

[GitHub] [hudi] vingov commented on a change in pull request #4503: [HUDI-2438] [RFC-34] Added the implementation details for the BigQuery integration

vingov commented on a change in pull request #4503:
URL: https://github.com/apache/hudi/pull/4503#discussion_r793180989



##########
File path: rfc/rfc-34/rfc-34.md
##########
@@ -0,0 +1,165 @@
+# Hudi BigQuery Integration
+
+## Abstract
+
+BigQuery is Google Cloud's fully managed, petabyte-scale, and cost-effective analytics data warehouse that lets you run
+analytics over vast amounts of data in near real time. BigQuery
+currently [doesn’t support](https://cloud.google.com/bigquery/external-data-cloud-storage) the Apache Hudi file format,
+but it does support the Parquet file format. The proposal is to implement a BigQuerySync, similar to HiveSync, that
+syncs a Hudi table as a BigQuery external Parquet table, so that users can query Hudi tables using BigQuery. Uber is
+already syncing some of its Hudi tables to a BigQuery data mart; this integration will help them write, sync and query.
+
+## Background
+
+Hudi table types define how data is indexed & laid out on the DFS and how write primitives and timeline activities
+are implemented on top of such organization (i.e., how data is written). In turn, query types define how the underlying
+data is exposed to the queries (i.e., how data is read).
+
+Hudi supports the following table types:
+
+* [Copy On Write](https://hudi.apache.org/docs/table_types#copy-on-write-table): Stores data using exclusively columnar
+  file formats (e.g., Parquet). Updates simply version & rewrite the files by performing a synchronous merge during write.
+* [Merge On Read](https://hudi.apache.org/docs/table_types#merge-on-read-table): Stores data using a combination of
+  columnar (e.g., Parquet) + row-based (e.g., Avro) file formats. Updates are logged to delta files & later compacted to
+  produce new versions of columnar files synchronously or asynchronously.
+
+For Copy-on-Write (CoW) tables, Hudi maintains multiple versions of the Parquet files and tracks the latest version
+using Hudi metadata. Since BigQuery doesn’t support Hudi yet, if you sync Hudi’s Parquet files to BigQuery and query
+them without Hudi’s metadata layer, the query will scan all versions of the Parquet files, which can produce duplicate
+rows.
+
+To avoid this, the proposal is to implement a BigQuery sync tool that uses the Hudi metadata to identify the latest
+files and exposes only the latest version of the Parquet files through a BigQuery external table, so that users can
+query the Hudi tables without any duplicate records.
+
+## Implementation
+
+This new feature will extend
+the [AbstractSyncTool](https://github.com/apache/hudi/blob/master/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncTool.java),
+similar to
+the [HiveSyncTool](https://github.com/apache/hudi/blob/master/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java),
+in a new class named BigQuerySyncTool with sync methods for CoW tables. The sync implementation will identify the latest
+parquet files for each .commit file and keep this manifest synced with the BigQuery manifest table. The Spark datasource
+& DeltaStreamer can already take a list of such sync classes to keep these manifests synced.
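+
+A rough sketch of what such a class could look like (assuming the `syncHoodieTable()` hook that `HiveSyncTool`
+implements); the constructor signature and the method body below are illustrative, not the final design:
+
+```
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+
+/**
+ * Illustrative skeleton only: syncs the latest CoW snapshot of a Hudi table to BigQuery.
+ */
+public class BigQuerySyncTool extends AbstractSyncTool {
+
+  private final HoodieTableMetaClient metaClient;
+
+  public BigQuerySyncTool(Properties props, FileSystem fs, String basePath) {
+    super(props, fs);
+    this.metaClient = HoodieTableMetaClient.builder()
+        .setConf(fs.getConf())
+        .setBasePath(basePath)
+        .build();
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    // 1. Read the completed commits from the Hudi timeline via metaClient.
+    // 2. Write/refresh the latest-snapshot manifest CSV under .hoodie/manifest/.
+    // 3. Create or update the BigQuery manifest table, history table and view.
+  }
+}
+```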
+
+### Architecture
+
+![alt_text](big-query-arch.png "Big Query integration architecture.")
+
+To avoid duplicate records when querying the Hudi CoW table, we need to generate the list of latest snapshot files and
+create a BQ table over it, then use that table to filter out the older file versions from the history table.
+
+### Steps to create Hudi table on BigQuery
+
+1. Let's say you have Hudi table data on Google Cloud Storage (GCS), created for example with Spark SQL:
+
+```
+CREATE TABLE dwh.bq_demo_partitioned_cow (
+  id bigint, 
+  name string,
+  price double,
+  ts bigint,
+  dt string
+) 
+using hudi 
+partitioned by (dt)
+options (
+  type = 'cow',
+  primaryKey = 'id',
+  preCombineField = 'ts',
+  hoodie.datasource.write.drop.partition.columns = 'true'
+)
+location 'gs://hudi_datasets/bq_demo_partitioned_cow/';
+```
+
+BigQuery doesn't accept partition columns in the Parquet schema, hence we need to drop the partition columns from the
+schema by enabling this flag:
+
+```
+hoodie.datasource.write.drop.partition.columns = 'true'
+```
+
+2. As part of the BigQuery sync, the sync tool will generate/update the manifest file inside the .hoodie metadata folder.
+   For tables which already exist, you can generate a manifest file for the Hudi table that lists the latest
+   snapshot parquet file names in CSV format, with a single column holding the file name. The manifest file will live
+   in the .hoodie metadata folder (`gs://bucket_name/table_name/.hoodie/manifest/latest_snapshot_files.csv`); a rough
+   sketch of how it could be derived from the Hudi metadata follows the command below.
+
+```
+// this command is coming soon.
+GENERATE symlink_format_manifest FOR TABLE dwh.bq_demo_partitioned_cow;
+```
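+
+The sketch below shows one way the latest snapshot file names could be collected from the Hudi metadata; the class name,
+helper method and the partition-path input are assumptions, not the proposed API. The resulting names would then be
+written out as the single-column CSV manifest:
+
+```
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+
+public class LatestSnapshotManifest {
+
+  // Returns the latest base (parquet) file name per file group for the given partitions.
+  public static List<String> latestSnapshotFileNames(Configuration conf, String basePath,
+                                                     List<String> partitionPaths) {
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(conf).setBasePath(basePath).build();
+    // Only completed commits participate in the latest snapshot.
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+        metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants());
+    return partitionPaths.stream()
+        .flatMap(fsView::getLatestBaseFiles)
+        .map(HoodieBaseFile::getFileName)
+        .collect(Collectors.toList());
+  }
+}
+```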
+
+3. Create a BQ external table named `hudi_table_name_manifest` with only one column, `filename`, pointing at
+   `gs://bucket_name/table_name/.hoodie/manifest/latest_snapshot_files.csv`:
+
+```
+CREATE EXTERNAL TABLE `my-first-project.dwh.bq_demo_partitioned_cow_manifest`
+(
+  filename STRING
+)
+OPTIONS(
+  format="CSV",
+  uris=["gs://hudi_datasets/bq_demo_partitioned_cow/.hoodie/manifest/latest_snapshot_files.csv"]
+);
+```
+
+4. Create another BQ external table named `hudi_table_name_history` over the location `gs://bucket_name/table_name`.
+   Don't use this table to query the data: it will return duplicate records, since it scans all the versions of parquet
+   files in the table/partition folders.
+
+```
+CREATE EXTERNAL TABLE `my-first-project.dwh.bq_demo_partitioned_cow_history`
+WITH 
+  PARTITION COLUMNS 
+  OPTIONS(
+    ignore_unknown_values=true, 
+    format="PARQUET", 
+    hive_partition_uri_prefix="gs://hudi_datasets/bq_demo_partitioned_cow/",
+    uris=["gs://hudi_snowflake/bq_demo_partitioned_cow/dt=*"]
+  );
+```
+
+5. Create a BQ view with the same name as the Hudi table using the query below. This view exposes the data from the Hudi
+   table without any duplicates, and it is the table users should query.
+
+```
+CREATE VIEW `my-first-project.dwh.bq_demo_partitioned_cow` AS 
+  SELECT
+  *
+  FROM
+  `my-first-project.dwh.bq_demo_partitioned_cow_history`
+  WHERE
+  _hoodie_file_name IN (
+    SELECT 
+      filename 
+    FROM
+      `my-first-project.dwh.bq_demo_partitioned_cow_manifest`
+  );
+```
+
+The BigQuerySync tool will
+use [HoodieTableMetaClient](https://github.com/apache/hudi/blob/master/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java)
+methods to get the list of the latest parquet data files and generate the manifest CSV file, then it will invoke
+the [BigQuery Java Client](https://github.com/googleapis/java-bigquery/blob/main/samples/snippets/src/main/java/com/example/bigquery/CreateTableExternalHivePartitioned.java)
+to create the manifest table, the history table and the Hudi table view.
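+
+As an illustration of the BigQuery client calls involved, the sketch below creates the manifest external table over the
+CSV manifest; the project, dataset and table names are the placeholder values from the examples above, and the history
+table/view creation would follow the same pattern (see the linked hive-partitioned sample):
+
+```
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+
+public class CreateManifestTable {
+
+  public static void main(String[] args) {
+    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+    // Single-column schema matching the latest_snapshot_files.csv manifest.
+    Schema schema = Schema.of(Field.of("filename", StandardSQLTypeName.STRING));
+    ExternalTableDefinition definition = ExternalTableDefinition.newBuilder(
+        "gs://hudi_datasets/bq_demo_partitioned_cow/.hoodie/manifest/latest_snapshot_files.csv",
+        schema, FormatOptions.csv()).build();
+    TableId tableId = TableId.of("dwh", "bq_demo_partitioned_cow_manifest");
+    bigquery.create(TableInfo.of(tableId, definition));
+  }
+}
+```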
+
+**All the steps described here will be automated; all you have to do is supply a few configs to enable the
+BigQuery sync.**
+
+## Rollout/Adoption Plan
+
+There is no impact on existing users since this is an entirely new feature supporting a new use case; hence no
+migrations or behavior changes are required.
+
+After the BigQuery sync tool has been implemented, I will reach out to Uber's Hudi/BigQuery team to roll out this
+feature for their BigQuery ingestion service.
+
+## Test Plan
+
+This RFC aims to implement a new SyncTool to sync Hudi tables to BigQuery. To test this feature, some test tables will
+be created and updated on BigQuery, along with unit tests for the code. Since this is an entirely new feature, I am
+confident that it will not cause any regressions during and after rollout.
+
+## Future Plans
+
+After this feature has been rolled out, the same model can be applied to sync the Hudi tables to other external data

Review comment:
       Yes, you are correct, the manifest would be helpful for integrating both BigQuery and Snowflake, but if it is just for BigQuery, we can update a native BigQuery table and use that table for filtering out the latest snapshot parquet files.

##########
File path: rfc/rfc-34/rfc-34.md
##########
@@ -0,0 +1,165 @@
+BigQuery doesn't accept the partition column in the parquet schema, hence we need to drop the partition columns from the

Review comment:
       Yes, we already brought this up with the BigQuery team, but they were not open to changing their design.

##########
File path: rfc/rfc-34/rfc-34.md
##########
@@ -0,0 +1,165 @@
+```
+// this command is coming soon.

Review comment:
       Yes, the alternative to this command could be a Java API to generate the manifest file; this is mainly useful for already existing Hudi datasets.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org