You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/05/19 20:06:38 UTC
[beam] branch master updated: [BEAM-9770] Add BigQueryIO deadletter
pattern Decouple .java and .py snippets commit for purpose of PR. Add back
changes for Snippets.java
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9d22950 [BEAM-9770] Add BigQueryIO deadletter pattern Decouple .java and .py snippets commit for purpose of PR. Add back changes for Snippets.java
new abd7684 Merge pull request #11437 from rezarokni/BEAM-9770
9d22950 is described below
commit 9d2295039fe72a87d2bc5e64d02f8c3363681c35
Author: rarokni <ra...@users.noreply.github.com>
AuthorDate: Fri May 15 09:06:25 2020 +0800
[BEAM-9770] Add BigQueryIO deadletter pattern
Decouple .java and .py snippets commit for purpose of PR.
Add back changes for Snippets.java
---
.../apache/beam/examples/snippets/Snippets.java | 59 ++++++++++++++++++++++
.../apache_beam/examples/snippets/snippets.py | 28 ++++++++++
2 files changed, 87 insertions(+)
diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 3fca68c..29682ea 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -43,9 +43,12 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -784,8 +787,64 @@ public class Snippets {
Window.<TableRow>into(
DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));
// [END CustomSessionWindow6]
+ }
+ }
+
+ /**
+  * Demonstrates the BigQueryIO dead-letter pattern: rows that fail a streaming insert are read
+  * back from the {@link WriteResult} of the write and printed.
+  */
+ public static class DeadLetterBigQuery {
+
+   /**
+    * Builds and runs a two-element pipeline in which one row deliberately violates the table
+    * schema, then prints the table, row and error of every failed insert.
+    *
+    * @param args command-line arguments, parsed into {@link BigQueryOptions}
+    */
+   public static void main(String[] args) {
+
+     // [START BigQueryIODeadLetter]
+     PipelineOptions options =
+         PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
+
+     Pipeline p = Pipeline.create(options);
+
+     // Create a bug by writing the 2nd value as null. The API will correctly
+     // throw an error when trying to insert a null value into a REQUIRED field.
+     WriteResult result =
+         p.apply(Create.of(1, 2))
+             .apply(
+                 BigQueryIO.<Integer>write()
+                     // Single REQUIRED INTEGER column, so a null "num" is rejected.
+                     .withSchema(
+                         new TableSchema()
+                             .setFields(
+                                 ImmutableList.of(
+                                     new TableFieldSchema()
+                                         .setName("num")
+                                         .setType("INTEGER")
+                                         .setMode("REQUIRED"))))
+                     .to("Test.dummyTable")
+                     // The element 2 is mapped to a null "num" to provoke the failure.
+                     .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
+                     .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+                     // Forcing the bounded pipeline to use streaming inserts
+                     .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+                     // set the withExtendedErrorInfo property.
+                     .withExtendedErrorInfo()
+                     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+
+     // Dead-letter branch: print the table, row and error of each failed insert. The empty
+     // string is returned only because MapElements must produce a value.
+     result
+         .getFailedInsertsWithErr()
+         .apply(
+             MapElements.into(TypeDescriptors.strings())
+                 .via(
+                     x -> {
+                       System.out.println(" The table was " + x.getTable());
+                       System.out.println(" The row was " + x.getRow());
+                       System.out.println(" The error was " + x.getError());
+                       return "";
+                     }));
+     p.run();
+
+     /* Sample Output From the pipeline:
+     <p>The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
+     <p>The row was GenericData{classInfo=[f], {num=null}}
+     <p>The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
+     */
+     // [END BigQueryIODeadLetter]
    }
  }
public static class PeriodicallyUpdatingSideInputs {
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 2fb6e59..c2ca401 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,7 @@ string. The tags can contain only letters, digits and _.
from __future__ import absolute_import
from __future__ import division
+from __future__ import print_function
import argparse
import base64
@@ -1492,3 +1493,30 @@ def side_input_slow_update(
# [END SideInputSlowUpdateSnip1]
return p, result
+
+
+def bigqueryio_deadletter():
+  """Builds the BigQueryIO dead-letter example pipeline.
+
+  Two elements are created; the mapping step turns the element ``1`` into a
+  row whose REQUIRED field ``a`` is ``None``. The ``'FailedRows'`` output of
+  the BigQuery write is then passed to a step that prints each failed row.
+
+  The pipeline is only constructed here; this function does not run it.
+
+  Returns:
+    The output of the 'PrintErrors' step.
+  """
+  # [START BigQueryIODeadLetter]
+
+  # Table schema with a single REQUIRED column, so a None value is invalid.
+  schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})
+
+  # Create pipeline.
+  p = beam.Pipeline()
+
+  errors = (
+      p | 'Data' >> beam.Create([1, 2])
+      # Only the element 2 keeps its value; every other element becomes a
+      # row with a None in the REQUIRED field.
+      | 'CreateBrokenData' >>
+      beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
+      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+          # Replace the <Your Project> placeholder with a real project id.
+          "<Your Project>:Test.dummy_a_table",
+          schema=schema,
+          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
+          create_disposition='CREATE_IF_NEEDED',
+          write_disposition='WRITE_APPEND'))
+  # print() returns None, so this step is used for its side effect only.
+  result = (
+      errors['FailedRows']
+      | 'PrintErrors' >>
+      beam.FlatMap(lambda err: print("Error Found {}".format(err))))
+  # [END BigQueryIODeadLetter]
+
+  return result