Posted to commits@beam.apache.org by ch...@apache.org on 2022/04/23 17:26:51 UTC

[beam] branch master updated: [BEAM-13953] added documentation for BQ Storage Write API (#17391)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f2e3c7c9ec [BEAM-13953] added documentation for BQ Storage Write API (#17391)
3f2e3c7c9ec is described below

commit 3f2e3c7c9eccb9d40370cbc70e9a451a4b5573f5
Author: David Huntsperger <56...@users.noreply.github.com>
AuthorDate: Sat Apr 23 10:26:41 2022 -0700

    [BEAM-13953] added documentation for BQ Storage Write API (#17391)
    
    * added documentation for BQ Storage Write API
    
    * adding placeholder Python tabs to ensure code examples are visible
    
    * adding note about the `UseStorageWriteApi` option
    
    * fixed phrasing of note about `UseStorageWriteApi`
---
 .../documentation/io/built-in/google-bigquery.md   | 150 ++++++++++++++++++---
 1 file changed, 131 insertions(+), 19 deletions(-)

diff --git a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
index 1759016f706..42efe571651 100644
--- a/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
+++ b/website/www/site/content/en/documentation/io/built-in/google-bigquery.md
@@ -68,7 +68,6 @@ Additional resources:
 * [Google BigQuery documentation](https://cloud.google.com/bigquery/docs)
 {{< /paragraph >}}
 
-
 ## BigQuery basics
 
 ### Table names
@@ -166,8 +165,9 @@ schema](#creating-a-table-schema) covers schemas in more detail.
 ### Data types
 
 BigQuery supports the following data types: STRING, BYTES, INTEGER, FLOAT,
-NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY.
-All possible values are described at [https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types).
+NUMERIC, BOOLEAN, TIMESTAMP, DATE, TIME, DATETIME and GEOGRAPHY. For an
+overview of Google Standard SQL data types, see
+[Data types](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types).
 BigQueryIO allows you to use all of these data types. The following example
 shows the correct format for data types used when reading from and writing to
 BigQuery:
@@ -210,9 +210,9 @@ BigQueryIO allows you to read from a BigQuery table, or to execute a SQL query
 and read the results. By default, Beam invokes a [BigQuery export
 request](https://cloud.google.com/bigquery/docs/exporting-data) when you apply a
 BigQueryIO read transform. However, the Beam SDK for Java also supports using
-the [BigQuery Storage
+the [BigQuery Storage Read
 API](https://cloud.google.com/bigquery/docs/reference/storage) to read directly
-from BigQuery storage. See [Using the BigQuery Storage API](#storage-api) for
+from BigQuery storage. See [Using the Storage Read API](#storage-api) for
 more information.
 
 > Beam’s use of BigQuery APIs is subject to BigQuery's
@@ -288,7 +288,6 @@ then extracts the `max_temperature` column.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_read_table >}}
 {{< /highlight >}}
 
-
 ### Reading with a query string
 
 {{< paragraph class="language-java" >}}
@@ -324,7 +323,7 @@ in the following example:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_read_query_std_sql >}}
 {{< /highlight >}}
 
-### Using the BigQuery Storage API {#storage-api}
+### Using the Storage Read API {#storage-api}
 
 The [BigQuery Storage API](https://cloud.google.com/bigquery/docs/reference/storage/)
 allows you to directly access tables in BigQuery storage, and supports features
@@ -374,8 +373,12 @@ The following code snippet reads with a query string.
 
 ## Writing to BigQuery
 
-BigQueryIO allows you to write to BigQuery tables. If you are using the Beam SDK
-for Java, you can also write different rows to different tables.
+BigQueryIO lets you write to BigQuery tables. If you are using the Beam SDK
+for Java, you can write different rows to different tables. The Beam SDK for
+Java also supports using the
+[BigQuery Storage Write API](https://cloud.google.com/bigquery/docs/write-api)
+to write directly to BigQuery storage. For more information, see
+[Using the Storage Write API](#storage-write-api).
 
 > BigQueryIO write transforms use APIs that are subject to BigQuery's
 > [Quota](https://cloud.google.com/bigquery/quota-policy) and
@@ -395,7 +398,6 @@ for the destination table(s):
 In addition, if your write operation creates a new BigQuery table, you must also
 supply a table schema for the destination table.
 
-
 ### Create disposition
 
 The create disposition controls whether or not your BigQuery write operation
@@ -439,7 +441,6 @@ If you specify `CREATE_IF_NEEDED` as the create disposition and you don't supply
 a table schema, the transform might fail at runtime if the destination table does
 not exist.
 
-
 ### Write disposition
 
 The write disposition controls how your BigQuery write operation applies to an
@@ -492,7 +493,6 @@ concurrent pipelines that write to the same output table with a write
 disposition of `WRITE_EMPTY` might start successfully, but both pipelines can
 fail later when the write attempts happen.
 
-
 ### Creating a table schema
 
 If your BigQuery write operation creates a new table, you must provide schema
@@ -551,7 +551,6 @@ two fields (source and quote) of type string.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_schema_object >}}
 {{< /highlight >}}
 
-
 #### Using a string
 
 <!-- Java specific - string -->
@@ -596,7 +595,6 @@ as the previous example.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_schema >}}
 {{< /highlight >}}
 
-
 ### Setting the insertion method
 
 BigQueryIO supports two methods of inserting data into BigQuery: load jobs and
@@ -770,6 +768,124 @@ You can either keep retrying, or return the failed records in a separate
 `PCollection` using the `WriteResult.getFailedInserts()` method.
 {{< /paragraph >}}
 
+### Using the Storage Write API {#storage-write-api}
+
+Starting with version 2.36.0 of the Beam SDK for Java, you can use the
+[BigQuery Storage Write API](https://cloud.google.com/bigquery/docs/write-api)
+from the BigQueryIO connector.
+
+#### Exactly-once semantics
+
+To write to BigQuery using the Storage Write API, set `withMethod` to
+[`Method.STORAGE_WRITE_API`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_WRITE_API).
+Here’s an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:
+
+{{< highlight java >}}
+WriteResult writeResult = rows.apply("Save Rows to BigQuery",
+    BigQueryIO.writeTableRows()
+        .to(options.getFullyQualifiedTableName())
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+        .withMethod(Method.STORAGE_WRITE_API)
+);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
+If you want to change the behavior of BigQueryIO so that all the BigQuery sinks
+for your pipeline use the Storage Write API by default, set the
+[`UseStorageWriteApi` option](https://github.com/apache/beam/blob/2c18ce0ccd7705473aa9ecc443dcdbe223dd9449/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java#L82-L86).
+
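+For example, you can set the option programmatically when constructing the
+pipeline. The following is a minimal sketch; it assumes the standard accessors
+that Beam generates for `BigQueryOptions`, and is equivalent to passing
+`--useStorageWriteApi=true` on the command line:
+
+{{< highlight java >}}
+// Assumes the standard PipelineOptions accessors for the UseStorageWriteApi
+// option defined in BigQueryOptions.
+BigQueryOptions bqOptions = PipelineOptionsFactory
+        .fromArgs(args)
+        .as(BigQueryOptions.class);
+bqOptions.setUseStorageWriteApi(true);
+
+Pipeline pipeline = Pipeline.create(bqOptions);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+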
+If your pipeline needs to create the table (in case it doesn’t exist and you
+specified the create disposition as `CREATE_IF_NEEDED`), you must provide a
+table schema. The API uses the schema to validate data and convert it to a
+binary protocol.
+
+{{< highlight java >}}
+TableSchema schema = new TableSchema().setFields(
+        List.of(
+            new TableFieldSchema()
+                .setName("request_ts")
+                .setType("TIMESTAMP")
+                .setMode("REQUIRED"),
+            new TableFieldSchema()
+                .setName("user_name")
+                .setType("STRING")
+                .setMode("REQUIRED")));
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
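+You can then attach the schema to the write transform with `withSchema`. The
+following sketch reuses the table name option from the earlier example and
+sets `CREATE_IF_NEEDED` so that the connector can create the table if
+necessary:
+
+{{< highlight java >}}
+rows.apply("Save Rows to BigQuery",
+    BigQueryIO.writeTableRows()
+        .to(options.getFullyQualifiedTableName())
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withSchema(schema)
+        .withMethod(Method.STORAGE_WRITE_API));
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+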
+For streaming pipelines, you need to set two additional parameters: the number
+of streams and the triggering frequency.
+
+{{< highlight java >}}
+BigQueryIO.writeTableRows()
+        // ...
+        .withTriggeringFrequency(Duration.standardSeconds(5))
+        .withNumStorageWriteApiStreams(3);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+
+The number of streams defines the parallelism of the BigQueryIO Write transform
+and roughly corresponds to the number of Storage Write API streams that the
+pipeline uses. You can set it explicitly on the transform via
+[`withNumStorageWriteApiStreams`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withNumStorageWriteApiStreams-int-)
+or provide the `numStorageWriteApiStreams` option to the pipeline as defined in
+[`BigQueryOptions`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html).
+
+Triggering frequency determines how soon the data is visible for querying in
+BigQuery. You can explicitly set it via
+[`withTriggeringFrequency`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-)
+or specify the number of seconds by setting the
+`storageWriteApiTriggeringFrequencySec` option.
+
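+Alternatively, you can supply both values as pipeline options instead of
+setting them on the transform. The following is a sketch that assumes the
+standard accessors Beam generates for these `BigQueryOptions`; it is
+equivalent to passing `--numStorageWriteApiStreams=3` and
+`--storageWriteApiTriggeringFrequencySec=5` on the command line:
+
+{{< highlight java >}}
+// Assumes the standard PipelineOptions accessors for the
+// numStorageWriteApiStreams and storageWriteApiTriggeringFrequencySec
+// options defined in BigQueryOptions.
+BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+bqOptions.setNumStorageWriteApiStreams(3);
+bqOptions.setStorageWriteApiTriggeringFrequencySec(5);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+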
+The combination of these two parameters affects the size of the batches of rows
+that BigQueryIO creates before calling the Storage Write API. Setting the
+frequency too high can result in smaller batches, which can affect performance.
+As a general rule, a single stream should be able to handle throughput of at
+least 1Mb per second. Creating exclusive streams is an expensive operation for
+the BigQuery service, so you should use only as many streams as needed for your
+use case. Triggering frequency in single-digit seconds is a good choice for most
+pipelines.
+
+Currently, `STORAGE_WRITE_API` doesn’t support
+[`withAutoSharding`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withAutoSharding--).
+Support for auto sharding is planned for a future release.
+
+When using `STORAGE_WRITE_API`, the `PCollection` returned by
+[`WriteResult.getFailedInserts`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedInserts--)
+will not contain the failed rows. If there are data validation errors, the
+transform will throw a `RuntimeException`.
+
+#### At-least-once semantics
+
+If your use case allows for potential duplicate records in the target table, you
+can use the
+[`STORAGE_API_AT_LEAST_ONCE`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_API_AT_LEAST_ONCE)
+method. Because this method doesn’t persist the records to be written to
+BigQuery in shuffle storage (which the `STORAGE_WRITE_API` method requires in
+order to provide exactly-once semantics), it is cheaper and results in lower
+latency for most pipelines. If you use `STORAGE_API_AT_LEAST_ONCE`, you don’t
+need to specify the number of streams, and you can’t specify the triggering
+frequency.
+
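+For example, here is a sketch of the earlier write transform switched to
+at-least-once semantics:
+
+{{< highlight java >}}
+rows.apply("Save Rows to BigQuery",
+    BigQueryIO.writeTableRows()
+        .to(options.getFullyQualifiedTableName())
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+        .withMethod(Method.STORAGE_API_AT_LEAST_ONCE));
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage Write API.
+{{< /highlight >}}
+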
+Auto sharding is not applicable for `STORAGE_API_AT_LEAST_ONCE`.
+
+When using `STORAGE_API_AT_LEAST_ONCE`, the `PCollection` returned by
+[`WriteResult.getFailedInserts`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedInserts--)
+will not contain the failed rows. If there are data validation errors, the
+transform will throw a `RuntimeException`.
+
+#### Quotas
+
+Before using the Storage Write API, be aware of the
+[BigQuery Storage Write API quotas](https://cloud.google.com/bigquery/quotas#write-api-limits).
+
 ### Using dynamic destinations
 
 You can use the dynamic destinations feature to write elements in a
@@ -847,7 +963,6 @@ This example generates one partition per day.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_bigqueryio_time_partitioning>}}
 {{< /highlight >}}
 
-
 ## Limitations
 
 BigQueryIO currently has the following limitations.
@@ -861,7 +976,6 @@ BigQueryIO currently has the following limitations.
    multiple BigQuery tables. The Beam SDK for Java does not have this limitation
    as it partitions your dataset for you.
 
-
 ## Additional examples
 
 You can find additional examples that use BigQuery in Beam's examples
@@ -903,7 +1017,6 @@ directory.
   pipeline looks at the data coming in from a text file and writes the results
   to a BigQuery table.
 
-
 ### Java complete examples
 
 These examples are from the Java [complete examples](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/complete)
@@ -926,7 +1039,6 @@ directory.
   reads traffic sensor data, calculates the average speed for each window and
   looks for slowdowns in routes, and writes the results to a BigQuery table.
 
-
 ### Python cookbook examples
 
 These examples are from the Python [cookbook examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/cookbook)
@@ -937,7 +1049,7 @@ directory.
   nested and repeated fields, and writes the data to a BigQuery table.
 
 * [BigQuery side inputs](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py)
-  uses BigQuery sources as a side inputs. It illustrates how to insert
+  uses BigQuery sources as side inputs. It illustrates how to insert
   side-inputs into transforms in three different forms: as a singleton, as an
   iterator, and as a list.