You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2020/05/19 20:06:38 UTC

[beam] branch master updated: [BEAM-9770] Add BigQueryIO deadletter pattern Decouple .java and .py snippits commit for purpose of PR. Add back changes for Snippets.java

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d22950  [BEAM-9770] Add BigQueryIO deadletter pattern Decouple .java and .py snippits commit for purpose of PR. Add back changes for Snippets.java
     new abd7684  Merge pull request #11437 from rezarokni/BEAM-9770
9d22950 is described below

commit 9d2295039fe72a87d2bc5e64d02f8c3363681c35
Author: rarokni <ra...@users.noreply.github.com>
AuthorDate: Fri May 15 09:06:25 2020 +0800

    [BEAM-9770] Add BigQueryIO deadletter pattern
    Decouple .java and .py snippits commit for purpose of PR.
    Add back changes for Snippets.java
---
 .../apache/beam/examples/snippets/Snippets.java    | 59 ++++++++++++++++++++++
 .../apache_beam/examples/snippets/snippets.py      | 28 ++++++++++
 2 files changed, 87 insertions(+)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
index 3fca68c..29682ea 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
@@ -43,9 +43,12 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
 import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -784,8 +787,64 @@ public class Snippets {
           Window.<TableRow>into(
               DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));
       // [END CustomSessionWindow6]
+    }
+  }
+
+  public static class DeadLetterBigQuery {
+    /** Writes two rows, one of them invalid, and prints every insert that BigQuery rejects. */
+    public static void main(String[] args) {
+
+      // [START BigQueryIODeadLetter]
 
+      PipelineOptions options =
+          PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
+
+      Pipeline p = Pipeline.create(options);
+
+      // Deliberately create a bad row: the element 2 is written as null. The API will
+      // correctly throw an error when trying to insert a null value into a REQUIRED field.
+      WriteResult result =
+          p.apply(Create.of(1, 2))
+              .apply(
+                  BigQueryIO.<Integer>write()
+                      .withSchema(
+                          new TableSchema()
+                              .setFields(
+                                  ImmutableList.of(
+                                      new TableFieldSchema()
+                                          .setName("num")
+                                          .setType("INTEGER")
+                                          .setMode("REQUIRED"))))
+                      .to("Test.dummyTable")
+                      .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
+                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
+                      // Force the bounded pipeline to use streaming inserts.
+                      .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
+                      // Request extended error info; getFailedInsertsWithErr() is read below.
+                      .withExtendedErrorInfo()
+                      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                      .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+
+      result
+          .getFailedInsertsWithErr()
+          .apply(
+              MapElements.into(TypeDescriptors.strings())
+                  .via(
+                      x -> {
+                        System.out.println(" The table was " + x.getTable());
+                        System.out.println(" The row was " + x.getRow());
+                        System.out.println(" The error was " + x.getError());
+                        return "";
+                      }));
+      p.run();
+
+      /*  Sample Output From the pipeline:
+       <p>The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
+       <p>The row was GenericData{classInfo=[f], {num=null}}
+       <p>The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
+      */
+      // [END BigQueryIODeadLetter]
     }
   }
 
   public static class PeriodicallyUpdatingSideInputs {
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 2fb6e59..c2ca401 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -33,6 +33,7 @@ string. The tags can contain only letters, digits and _.
 
 from __future__ import absolute_import
 from __future__ import division
+from __future__ import print_function
 
 import argparse
 import base64
@@ -1492,3 +1493,30 @@ def side_input_slow_update(
   # [END SideInputSlowUpdateSnip1]
 
   return p, result
+
+
+def bigqueryio_deadletter():
+  # [START BigQueryIODeadLetter]
+
+  # Single REQUIRED column; the {'a': None} row built below is the broken one.
+  schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})
+  # Create pipeline.
+  p = beam.Pipeline()
+
+  errors = (
+      p | 'Data' >> beam.Create([1, 2])
+      | 'CreateBrokenData' >>
+      beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
+      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+          "<Your Project>:Test.dummy_a_table",
+          schema=schema,
+          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
+          create_disposition='CREATE_IF_NEEDED',
+          write_disposition='WRITE_APPEND'))
+  result = (
+      errors['FailedRows']
+      | 'PrintErrors' >>
+      beam.FlatMap(lambda err: print("Error Found {}".format(err))))
+  # [END BigQueryIODeadLetter]
+
+  return result