You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/23 13:14:19 UTC

[GitHub] [beam] pcoet commented on a diff in pull request #17391: [BEAM-13953] added documentation for BQ Storage Write API

pcoet commented on code in PR #17391:
URL: https://github.com/apache/beam/pull/17391#discussion_r856891520


##########
website/www/site/content/en/documentation/io/built-in/google-bigquery.md:
##########
@@ -770,6 +768,120 @@ You can either keep retrying, or return the failed records in a separate
 `PCollection` using the `WriteResult.getFailedInserts()` method.
 {{< /paragraph >}}
 
+### Using the Storage Write API {#storage-write-api}
+
+Starting with version 2.36.0 of the Beam SDK for Java, you can use the
+[BigQuery Storage Write API](https://cloud.google.com/bigquery/docs/write-api)
+from the BigQueryIO connector.
+
+#### Exactly-once semantics
+
+To write to BigQuery using the Storage Write API, set `withMethod` to
+[`Method.STORAGE_WRITE_API`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#STORAGE_WRITE_API).
+Here’s an example transform that writes to BigQuery using the Storage Write API and exactly-once semantics:
+
+{{< highlight java >}}
+WriteResult writeResult = rows.apply("Save Rows to BigQuery",
+BigQueryIO.writeTableRows()
+        .to(options.getFullyQualifiedTableName())
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+        .withMethod(Method.STORAGE_WRITE_API)
+);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage API.
+{{< /highlight >}}
+
+If your pipeline needs to create the table (in case it doesn’t exist and you
+specified the create disposition as `CREATE_IF_NEEDED`), you must provide a
+table schema. The API uses the schema to validate data and convert it to a
+binary protocol.
+
+{{< highlight java >}}
+TableSchema schema = new TableSchema().setFields(
+        List.of(
+            new TableFieldSchema()
+                .setName("request_ts")
+                .setType("TIMESTAMP")
+                .setMode("REQUIRED"),
+            new TableFieldSchema()
+                .setName("user_name")
+                .setType("STRING")
+                .setMode("REQUIRED")));
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage API.
+{{< /highlight >}}
+
+For streaming pipelines, you need to set two additional parameters: the number
+of streams and the triggering frequency.
+
+{{< highlight java >}}
+BigQueryIO.writeTableRows()
+        // ...
+        .withTriggeringFrequency(Duration.standardSeconds(5))
+        .withNumStorageWriteApiStreams(3)
+);
+{{< /highlight >}}
+{{< highlight py >}}
+# The SDK for Python does not support the BigQuery Storage API.
+{{< /highlight >}}
+
+The number of streams defines the parallelism of the BigQueryIO Write transform
+and roughly corresponds to the number of Storage Write API streams that the

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org