Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/05 01:18:22 UTC

[GitHub] [beam] damccorm opened a new issue, #21694: BigQuery Storage API insert with writeResult retry and write to error table

damccorm opened a new issue, #21694:
URL: https://github.com/apache/beam/issues/21694

   I’m currently using the legacy BigQuery insert method in a streaming pipeline (not using Streaming Engine), like this:
   ```
   bqWriter = bqWriter
       .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
       .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
       .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
       .withExtendedErrorInfo();

   // Route rows that failed to insert (with extended error info) to HandleInsertError.
   bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
       writeResult.getFailedInsertsWithErr()
           .apply("BQ-insert-error-write", HandleInsertError.of());
   ```
   
   In `HandleInsertError`, we process each `BigQueryInsertError`, add some metadata, and write it to the desired BigQuery error table:
   
    
   ```
   @Override
   public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
     return input
         .apply("transform-err-table-row",
             ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
               @ProcessElement
               public void processElement(ProcessContext c) {
                 BigQueryInsertError bigQueryInsertError = c.element();
                 // Build the error row: error text, timestamp, and the failed row's UUID.
                 TableRow convertedRow = new TableRow();
                 convertedRow.set("error", bigQueryInsertError.getError().toString());
                 convertedRow.set("t", CommonConverter.convertDate(new Date()));
                 convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                 // Send it to the error table in the same project and dataset as the failed insert.
                 TableDestination tableDestination = BqUtil.getTableDestination(
                     bigQueryInsertError.getTable().getProjectId(),
                     bigQueryInsertError.getTable().getDatasetId(),
                     errorTable);
                 c.output(KV.of(convertedRow, tableDestination));
               }
             }))
         .apply(new BqInsertError());
   }
   ```
   
    
   
   I’m trying to switch to the new write method:
   ```
   .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
   ```
   
    
   
   but I get this error:
   ```
   When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
   ```
   
   even though the documentation indicates that the triggering frequency is relevant to the FILE_LOADS method:
   [https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-](https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-)
   
    
   
   After adding the triggering frequency and `withNumStorageWriteApiStreams`, I get this error:
   ```
   Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead
   ```
   
    
   
   However, `getFailedInsertsWithErr` returns a `PCollection<BigQueryInsertError>`, whereas `getFailedInserts` returns a `PCollection<TableRow>`, so two features we rely on are not available from `getFailedInserts`:

   1. We can get the insert error via `bigQueryInsertError.getError()`.
   2. We can determine the project ID and dataset ID via `bigQueryInsertError.getTable().getProjectId()` and `bigQueryInsertError.getTable().getDatasetId()`. We need them because our pipeline is multi-tenant, and getting those parameters any other way would add a lot of overhead.
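   The two capabilities listed above are what make per-tenant error routing cheap in `HandleInsertError`. Below is a minimal Beam-free sketch of that per-element logic, assuming a hypothetical `FailedInsert` record as a stand-in for `BigQueryInsertError` (failed row, error text, and the target table's project and dataset). The `uuid` column name, the ISO-8601 value for `t`, and the `project:dataset.table` destination string are illustrative assumptions, not the reporter's actual helpers (`CommonConverter`, `BqUtil`).

   ```java
   import java.time.Instant;
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Beam-free sketch of the per-element logic in HandleInsertError (hypothetical types). */
   final class ErrorRowSketch {
     /** Hypothetical stand-in for BigQueryInsertError: what the extended error info carries. */
     record FailedInsert(String projectId, String datasetId, String error, Map<String, Object> row) {}

     /** Error row plus its destination, mirroring KV<TableRow, TableDestination>. */
     record ErrorRecord(Map<String, Object> errorRow, String tableSpec) {}

     static final String UUID_FIELD = "uuid"; // hypothetical name of the UUID column

     static ErrorRecord toErrorRecord(FailedInsert failed, String errorTable, Instant now) {
       Map<String, Object> errorRow = new LinkedHashMap<>();
       errorRow.put("error", failed.error());
       errorRow.put("t", now.toString()); // assumption: ISO-8601 timestamp string
       errorRow.put(UUID_FIELD, failed.row().get(UUID_FIELD));
       // Route to the error table in the same project and dataset as the failed insert,
       // using the "project:dataset.table" table-spec format.
       String tableSpec = failed.projectId() + ":" + failed.datasetId() + "." + errorTable;
       return new ErrorRecord(errorRow, tableSpec);
     }

     public static void main(String[] args) {
       FailedInsert failed = new FailedInsert(
           "tenant-a-project", "events", "no such field: foo", Map.of("uuid", "123", "foo", "bar"));
       ErrorRecord rec = toErrorRecord(failed, "insert_errors", Instant.EPOCH);
       System.out.println(rec.tableSpec()); // tenant-a-project:events.insert_errors
       System.out.println(rec.errorRow());
     }
   }
   ```

   The project and dataset come straight from the error element, so no extra lookup per tenant is needed; that is exactly what is lost when only a `TableRow` is available.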
   
   And when I try to run it with `getFailedInserts`, like this:
   ```
   bqWriter = bqWriter
       .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
       .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
       .withTriggeringFrequency(Duration.standardSeconds(5))
       .withNumStorageWriteApiStreams(12);

   // getFailedInserts only yields the failed TableRows, without error details or table info.
   bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
       writeResult.getFailedInserts()
           .apply("BQ-insert-error-write", HandleStorageWriteApiInsertError.of());
   ```
   
   I get the following error:
   ```
   Record-insert retry policies are not supported when using BigQuery load jobs.
   ```
   
    
   
   but I’m using `STORAGE_WRITE_API`, which I would expect to support `retryTransientErrors`.

   So, first, I think something is missing in the implementation of that write method that makes the retry feature unsupported.

   Second, as a feature request: please support `getFailedInsertsWithErr` in the `WriteResult` when using `BigQueryIO.Write.Method.STORAGE_WRITE_API`.

   If there is an existing workaround, that would be great, because switching the write method significantly cuts our costs.
   
   Thanks!
   
   Imported from Jira [BEAM-14135](https://issues.apache.org/jira/browse/BEAM-14135). Original Jira may contain additional context.
   Reported by: yoni.be.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] johnjcasey commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

johnjcasey commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1246916305

   I believe that means this issue can be closed




[GitHub] [beam] johnjcasey commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

johnjcasey commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1246915007

   I believe @bvolpato updated the logging on this recently. #22405




[GitHub] [beam] reuvenlax commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

reuvenlax commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1246273262

   Yes, but it's a new method:

   `PCollection<BigQueryStorageApiInsertError> errors = writeResult.getFailedStorageApiInserts();`
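   A minimal Beam-free sketch of how the error handler might adapt to the Storage Write API error type. As a simplifying assumption, each failure is modelled by a hypothetical `StorageApiError` record carrying only the failed row and an error message, so the destination error table has to come from a caller-supplied resolver. The resolver, the `tenant` and `uuid` columns, and the table names are illustrative, not part of Beam.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Function;

   /** Beam-free sketch: turning a Storage Write API failure into an error-table row. */
   final class StorageApiErrorSketch {
     /** Hypothetical stand-in: assumed to expose only the failed row and an error message. */
     record StorageApiError(Map<String, Object> row, String errorMessage) {}

     record ErrorRecord(Map<String, Object> errorRow, String tableSpec) {}

     static ErrorRecord toErrorRecord(
         StorageApiError failed, Function<Map<String, Object>, String> destinationFor) {
       Map<String, Object> errorRow = new LinkedHashMap<>();
       errorRow.put("error", failed.errorMessage());
       errorRow.put("uuid", failed.row().get("uuid")); // hypothetical UUID column name
       // With no table metadata on the error, the destination is derived from the row itself.
       return new ErrorRecord(errorRow, destinationFor.apply(failed.row()));
     }

     public static void main(String[] args) {
       // Hypothetical resolver: assumes the tenant id is stored as a column of the row.
       Function<Map<String, Object>, String> resolver =
           row -> row.get("tenant") + "-project:events.insert_errors";
       ErrorRecord rec = toErrorRecord(
           new StorageApiError(Map.of("uuid", "123", "tenant", "acme"), "Missing required field"),
           resolver);
       System.out.println(rec.tableSpec()); // acme-project:events.insert_errors
     }
   }
   ```

   If the error type in your Beam version also carries table metadata, the resolver could be replaced by a direct lookup, as in the original `HandleInsertError`.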




[GitHub] [beam] kennknowles closed issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

kennknowles closed issue #21694: BigQuery Storage API insert with writeResult retry and write to error table
URL: https://github.com/apache/beam/issues/21694




[GitHub] [beam] algirdas-k commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

algirdas-k commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1147140598

   Having row-level errors is crucial. We would like to migrate to the Storage API as soon as possible (for better performance and higher limits), but without this functionality there would be no way of knowing whether we lost any data.




[GitHub] [beam] kennknowles commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

kennknowles commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1246013159

   @johnjcasey @reuvenlax @chamikaramj  I think the biggest issue here is: can you get a `PCollection<BigQueryInsertError>` when using the storage API? If not, what are users supposed to do?




[GitHub] [beam] kennknowles commented on issue #21694: BigQuery Storage API insert with writeResult retry and write to error table

kennknowles commented on issue #21694:
URL: https://github.com/apache/beam/issues/21694#issuecomment-1247254310

   Agree.

