Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/09 01:53:23 UTC

[GitHub] [beam] dhercher opened a new pull request #13055: [BEAM-11006] BigQuery failsafe function

dhercher opened a new pull request #13055:
URL: https://github.com/apache/beam/pull/13055


   Implements a new function that lets BigQueryIO.Write apply customized failsafe handling to streaming writes.
   
   Adds a new withFailsafeFormatFunction(), which formats the original PCollection element into a raw TableRow. This row does not have to match the TableRow written to BigQuery.
   
   For example, a PCollection<FailsafeElement<String, TableRow>> can write the TableRow to BigQuery but restore the original String payload if the insert fails.
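A dependency-free sketch of the failsafe pattern described above follows. It is hypothetical and models the idea rather than Beam's code: `FailsafeElement` is a stand-in that pairs the original payload with the converted row (named after the type in the PR text, not imported from any library), `Map<String, Object>` stands in for `TableRow`, and `insertAll` simulates a streaming insert that rejects some rows. The key point is that the failure formatter receives the original element, so it can rebuild the raw payload instead of echoing the converted row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in: the original payload plus the row derived from it. */
final class FailsafeElement<OriginalT, RowT> {
  final OriginalT originalPayload;
  final RowT outputRow;

  FailsafeElement(OriginalT originalPayload, RowT outputRow) {
    this.originalPayload = originalPayload;
    this.outputRow = outputRow;
  }
}

final class FailsafeInsertSketch {
  /**
   * Simulates a streaming insert. Rows accepted by {@code accepts} count as written
   * (nothing is stored). For every rejected row, {@code formatOnFailure} is applied to
   * the ORIGINAL element, not to the converted row, and the result is collected.
   */
  static <T> List<Map<String, Object>> insertAll(
      List<T> elements,
      Function<T, Map<String, Object>> format,
      Function<T, Map<String, Object>> formatOnFailure,
      Predicate<Map<String, Object>> accepts) {
    List<Map<String, Object>> failed = new ArrayList<>();
    for (T element : elements) {
      Map<String, Object> row = format.apply(element);
      if (!accepts.test(row)) {
        failed.add(formatOnFailure.apply(element));
      }
    }
    return failed;
  }

  /** Failure formatter that restores the raw payload instead of the converted row. */
  static Map<String, Object> rawPayloadRow(FailsafeElement<String, Map<String, Object>> e) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("payload", e.originalPayload);
    return row;
  }
}
```

A call such as `insertAll(elements, e -> e.outputRow, FailsafeInsertSketch::rawPayloadRow, row -> row.containsKey("id"))` writes the converted rows and returns the raw payloads of the rejected ones. In a real pipeline the rejected rows would surface through the write's failed-insert output rather than a returned list; the `accepts` predicate exists only to simulate BigQuery rejecting a row.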
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r506654118



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2007,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+    public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       Just one change request: 
   - Can we rename this to something that doesn't assume users already know the "fail-safe" term? Perhaps `withFailedInsertFormatFunction`?
   - And can you add some detail to the Javadoc so users will understand what this does?
   
   Thanks







[GitHub] [beam] dhercher commented on pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on pull request #13055:
URL: https://github.com/apache/beam/pull/13055#issuecomment-705920722


   R: @pabloem





[GitHub] [beam] dhercher commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r506745382



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2007,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+    public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       Done







[GitHub] [beam] dhercher commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r502720045



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
##########
@@ -40,23 +40,35 @@ private RowWriterFactory() {}
       String tempFilePrefix, DestinationT destination) throws Exception;
 
   static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT> tableRows(
-      SerializableFunction<ElementT, TableRow> toRow) {
-    return new TableRowWriterFactory<ElementT, DestinationT>(toRow);
+      SerializableFunction<ElementT, TableRow> toRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeRow) {
+    return new TableRowWriterFactory<ElementT, DestinationT>(toRow, toFailsafeRow);
   }
 
   static final class TableRowWriterFactory<ElementT, DestinationT>
       extends RowWriterFactory<ElementT, DestinationT> {
 
     private final SerializableFunction<ElementT, TableRow> toRow;
+    private final SerializableFunction<ElementT, TableRow> toFailsafeRow;
 
-    private TableRowWriterFactory(SerializableFunction<ElementT, TableRow> toRow) {
+    private TableRowWriterFactory(
+        SerializableFunction<ElementT, TableRow> toRow,
+        SerializableFunction<ElementT, TableRow> toFailsafeRow) {
       this.toRow = toRow;
+      this.toFailsafeRow = toFailsafeRow; // TODO yummy

Review comment:
       haha, removed
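The `RowWriterFactory` diff above keeps two conversion functions side by side. The sketch below models that idea without Beam dependencies; `SerFn`, `TableRowConverters`, and the use of `Map<String, Object>` in place of `TableRow` are hypothetical. Falling back to `toRow` when no failure function is configured is an assumption of the sketch, chosen so callers that never set the new function keep reporting the same row they tried to write.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical serializable function type, standing in for Beam's SerializableFunction. */
interface SerFn<InputT, OutputT> extends Function<InputT, OutputT>, Serializable {}

/** Hypothetical holder for the two conversions stored by the factory in the diff. */
final class TableRowConverters<ElementT> implements Serializable {
  private final SerFn<ElementT, Map<String, Object>> toRow;
  // May be null when the caller did not configure a failure-specific format.
  private final SerFn<ElementT, Map<String, Object>> toFailsafeRow;

  TableRowConverters(
      SerFn<ElementT, Map<String, Object>> toRow,
      SerFn<ElementT, Map<String, Object>> toFailsafeRow) {
    this.toRow = Objects.requireNonNull(toRow, "toRow");
    this.toFailsafeRow = toFailsafeRow;
  }

  /** Row that is sent to BigQuery. */
  Map<String, Object> rowToWrite(ElementT element) {
    return toRow.apply(element);
  }

  /** Row reported for a failed insert; falls back to toRow when no failure function is set (assumption). */
  Map<String, Object> rowOnFailure(ElementT element) {
    return (toFailsafeRow != null ? toFailsafeRow : toRow).apply(element);
  }
}
```

Keeping both functions in one serializable object means whichever worker performs the insert can build either row from the same element without a second lookup.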







[GitHub] [beam] dhercher commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r502701806



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2006,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+     public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       Currently the design is for the ErrorContainer to return TableRows (or BigQueryError<TableRow>). My worry is that making this generic wouldn't be backwards compatible, though I agree it would be more valuable.
   
   I think this question is important; it is my only big open question about this design.
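The compatibility trade-off discussed in this comment can be illustrated with a small, hypothetical sketch. If the failure record were generic in its payload type, as `FailedInsert<PayloadT>` below is, a pipeline could get its own element type `T` back directly, and a TableRow-producing failure function becomes just one mapping over it (`mapPayload`). The catch is that an existing output already typed to `TableRow` cannot change its element type without breaking current callers, which matches the backwards-compatibility worry raised above. `FailedInsert` and `GenericFailures` are illustrative names, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical failure record, generic in the payload type it carries. */
final class FailedInsert<PayloadT> {
  final PayloadT payload;
  final String error;

  FailedInsert(PayloadT payload, String error) {
    this.payload = payload;
    this.error = error;
  }

  /** Converts the payload (e.g. user type to a row) while keeping the error message. */
  <R> FailedInsert<R> mapPayload(Function<PayloadT, R> f) {
    return new FailedInsert<>(f.apply(payload), error);
  }
}

final class GenericFailures {
  /** Returns one FailedInsert per element rejected by {@code accepts}, keeping the user's type T. */
  static <T> List<FailedInsert<T>> collectFailures(
      List<T> elements, Predicate<T> accepts, String error) {
    List<FailedInsert<T>> failures = new ArrayList<>();
    for (T element : elements) {
      if (!accepts.test(element)) {
        failures.add(new FailedInsert<>(element, error));
      }
    }
    return failures;
  }
}
```

With this shape, the TableRow-only behavior is the special case of mapping each `FailedInsert<T>` through a `T -> TableRow` function, so the generic form is strictly more expressive; the cost is the type change on the public output.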







[GitHub] [beam] pabloem commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r502683616



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/RowWriterFactory.java
##########
@@ -40,23 +40,35 @@ private RowWriterFactory() {}
       String tempFilePrefix, DestinationT destination) throws Exception;
 
   static <ElementT, DestinationT> RowWriterFactory<ElementT, DestinationT> tableRows(
-      SerializableFunction<ElementT, TableRow> toRow) {
-    return new TableRowWriterFactory<ElementT, DestinationT>(toRow);
+      SerializableFunction<ElementT, TableRow> toRow,
+      SerializableFunction<ElementT, TableRow> toFailsafeRow) {
+    return new TableRowWriterFactory<ElementT, DestinationT>(toRow, toFailsafeRow);
   }
 
   static final class TableRowWriterFactory<ElementT, DestinationT>
       extends RowWriterFactory<ElementT, DestinationT> {
 
     private final SerializableFunction<ElementT, TableRow> toRow;
+    private final SerializableFunction<ElementT, TableRow> toFailsafeRow;
 
-    private TableRowWriterFactory(SerializableFunction<ElementT, TableRow> toRow) {
+    private TableRowWriterFactory(
+        SerializableFunction<ElementT, TableRow> toRow,
+        SerializableFunction<ElementT, TableRow> toFailsafeRow) {
       this.toRow = toRow;
+      this.toFailsafeRow = toFailsafeRow; // TODO yummy

Review comment:
       yum yum? : ) remove?







[GitHub] [beam] pabloem merged pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #13055:
URL: https://github.com/apache/beam/pull/13055


   





[GitHub] [beam] pabloem commented on pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13055:
URL: https://github.com/apache/beam/pull/13055#issuecomment-709444327


   this looks fine to me.
   
   fyi @chamikaramj 





[GitHub] [beam] dhercher commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r506715613



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2007,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+    public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       Sounds good. Renaming it to `withFormatRecordOnFailureFunction` and adding more detail to the Javadoc.
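The surrounding `BigQueryIO.Write` code configures itself through an immutable builder (`toBuilder().setFormatFunction(formatFunction).build()`), so a renamed method would presumably follow the same shape. The sketch below is a hypothetical, dependency-free model of that pattern: `WriteConfig` and its fields are illustrative rather than Beam classes, and `Map<String, Object>` again stands in for `TableRow`. Each `with...` call returns a new configuration and leaves the receiver unchanged.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical immutable write configuration, modelled on the toBuilder() pattern. */
final class WriteConfig<T> {
  final Function<T, Map<String, Object>> formatFunction;
  final Function<T, Map<String, Object>> formatRecordOnFailureFunction;

  private WriteConfig(Builder<T> b) {
    this.formatFunction = b.formatFunction;
    this.formatRecordOnFailureFunction = b.formatRecordOnFailureFunction;
  }

  static <T> WriteConfig<T> create() {
    return new Builder<T>().build();
  }

  /** Copies the current settings into a fresh builder. */
  Builder<T> toBuilder() {
    Builder<T> b = new Builder<>();
    b.formatFunction = formatFunction;
    b.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
    return b;
  }

  /** Sets the function that converts T into the row written to BigQuery. */
  WriteConfig<T> withFormatFunction(Function<T, Map<String, Object>> fn) {
    return toBuilder().setFormatFunction(fn).build();
  }

  /**
   * Sets the function applied to the original element when its insert fails;
   * the resulting row is what the failed-insert output would report.
   */
  WriteConfig<T> withFormatRecordOnFailureFunction(Function<T, Map<String, Object>> fn) {
    return toBuilder().setFormatRecordOnFailureFunction(fn).build();
  }

  static final class Builder<T> {
    private Function<T, Map<String, Object>> formatFunction;
    private Function<T, Map<String, Object>> formatRecordOnFailureFunction;

    Builder<T> setFormatFunction(Function<T, Map<String, Object>> fn) {
      this.formatFunction = fn;
      return this;
    }

    Builder<T> setFormatRecordOnFailureFunction(Function<T, Map<String, Object>> fn) {
      this.formatRecordOnFailureFunction = fn;
      return this;
    }

    WriteConfig<T> build() {
      return new WriteConfig<>(this);
    }
  }
}
```

Returning a fresh object from every `with...` call is what lets one partially configured write be reused as a template without later calls leaking into earlier copies.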







[GitHub] [beam] pabloem commented on pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13055:
URL: https://github.com/apache/beam/pull/13055#issuecomment-710726029


   Run Java PostCommit





[GitHub] [beam] dhercher commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
dhercher commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r503560850



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2006,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+     public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       I've been looking into this feature, and it will require a number of additional changes. IMO it makes more sense to merge this first feature, then open a second issue / PR for the dynamic typing, since it appears to be a relatively complex add-on.







[GitHub] [beam] pabloem commented on a change in pull request #13055: [BEAM-11006] BigQuery failsafe function

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13055:
URL: https://github.com/apache/beam/pull/13055#discussion_r502683454



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
##########
@@ -2002,6 +2006,11 @@ static String getExtractDestinationUri(String extractDestinationDir) {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
+    /** Formats the user's type into a {@link TableRow} to be written to an error collector. */
+     public Write<T> withFailsafeFormatFunction(SerializableFunction<T, TableRow> formatFunction) {

Review comment:
       Does this have to write TableRows? Why not also offer the option to return the user type itself? (Perhaps that's a lot of trouble to implement given the existing types?)
   What are some examples where users may want to recover a different TableRow than the one that was sent to BQ?



