You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Yoni Bendayan (Jira)" <ji...@apache.org> on 2022/03/20 11:11:00 UTC

[jira] [Updated] (BEAM-14135) BigQuery Storage API insert with writeResult retry and write to error table

     [ https://issues.apache.org/jira/browse/BEAM-14135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yoni Bendayan updated BEAM-14135:
---------------------------------
    Description: 
I’m currently using the legacy big query insert on a streaming pipeline (not using the streaming engine) like this:
{code:java}
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        .withExtendedErrorInfo();
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
        writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
                HandleInsertError.of();{code}
and in  HandleInsertError we process the BigQueryInsertError add some metadata and write to a desired big query error table:

 
{code:java}
@Override
public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
 return input
         .apply("transform-err-table-row", ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
                 BigQueryInsertError bigQueryInsertError = c.element();
                 TableRow convertedRow = new TableRow();
                 convertedRow.set("error", bigQueryInsertError.getError().toString());
                 convertedRow.set("t",  CommonConverter.convertDate(new Date()));
                 convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                 TableDestination tableDestination = BqUtil.getTableDestination(bigQueryInsertError.getTable().getProjectId(),
                         bigQueryInsertError.getTable().getDatasetId(), errorTable);
                 c.output(KV.of(convertedRow,tableDestination));
             }
         }))
         .apply(new BqInsertError());
}



{code}
 

I’m trying to change the write method to use the new one
{code:java}
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);{code}
 

but I get this error:
{noformat}
When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
 



{noformat}
even though the documentation indicates that the triggering frequency is relevant to FILE_LOAD method:
[https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-]

 

after I’ve added the triggering frequency and NumStorageWriteApiStreams im getting this error:
{noformat}
Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead{noformat}
 

but the difference between these functions is that getFailedInsertsWithErr expands PCollection<BigQueryInsertError>
and there we have 2 features that are not avaliable from the getFailedInserts function because it expands PCollection<TableRow>:

1. we can get the insert error  bigQueryInsertError.getError()
2. we can determine the projectid and dataset id by using 
       bigQueryInsertError.getTable().getProjectId(),
       bigQueryInsertError.getTable().getDatasetId()
       we need them because our pipeline is a multi tenant use case and to get                             those prarameters otherwise would require a lot of overhead.

and also when I’m trying to run it with the getFailedInserts like that:
{code:java}
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(12);
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
        writeResult.getFailedInserts().apply("BQ-insert-error-write",
                HandleStorageWriteApiInsertError.of();
 



{code}
I get the following error:
 
{noformat}
Record-insert retry policies are not supported when using BigQuery load jobs.{noformat}
 

but I’m using the STORAGE_API_WRITES which normally should support retryTransientErrors

So first i think  there is a something missing in the implementation of that write method  that makes the retry feature not supported,

and as a feature request is to support getFailedInsertsWithErr in the writeResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API

if there is an existing workaround for that now it would be great because switching the write method significantly cuts our costs

Thanks!

  was:
I’m currently using the legacy big query insert on a streaming pipeline (not using the streaming engine) like this:
{code:java}
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
        .withExtendedErrorInfo();
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
        writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
                HandleInsertError.of();{code}

and in  HandleInsertError we process the BigQueryInsertError add some metadata and write to a desired big query error table:

 
{code:java}
@Override
public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
 return input
         .apply("transform-err-table-row", ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
                 BigQueryInsertError bigQueryInsertError = c.element();
                 TableRow convertedRow = new TableRow();
                 convertedRow.set("error", bigQueryInsertError.getError().toString());
                 convertedRow.set("t",  CommonConverter.convertDate(new Date()));
                 convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
                 TableDestination tableDestination = BqUtil.getTableDestination(bigQueryInsertError.getTable().getProjectId(),
                         bigQueryInsertError.getTable().getDatasetId(), errorTable);
                 c.output(KV.of(convertedRow,tableDestination));
             }
         }))
         .apply(new BqInsertError());
}



{code}
 

I’m trying to change the write method to use the new one
{code:java}
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);{code}
 

but I get this error:
{noformat}
When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
 



{noformat}
even though the documentation indicates that the triggering frequency is relevant to FILE_LOAD method:
[https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-]

 

after I’ve added the triggering frequency and NumStorageWriteApiStreams im getting this error:
{noformat}
Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead{noformat}
 

but the difference between these functions is that getFailedInsertsWithErr expands PCollection<BigQueryInsertError>
and there we have 2 features that are not avaliable from the getFailedInserts function because it expands PCollection<TableRow>:

1. we can get the insert error  bigQueryInsertError.getError()
2. we can determine the projectid and dataset id by using 
       bigQueryInsertError.getTable().getProjectId(),
       bigQueryInsertError.getTable().getDatasetId()
       we need them because our pipeline is a multi tenant use case and to get                             those prarameters otherwise would require a lot of overhead.



and also when I’m trying to run it with the getFailedInserts like that:
{code:java}
bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(12);
bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
        writeResult.getFailedInserts().apply("BQ-insert-error-write",
                HandleStorageWriteApiInsertError.of();
 



{code}

I get the following error:
 
{noformat}
Record-insert retry policies are not supported when using BigQuery load jobs.{noformat}
 

but I’m using the STORAGE_API_WRITES which normally should support retryTransientErrors

So first i think  there is a something missing in the implementation of that write method  that makes the retry feature not supported,

and as a feature request is to support getFailedInsertsWithErr in the writeResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API

if there is an existing workaround for that now it would be great because switching the write method significantly cuts our costs


> BigQuery Storage API insert with writeResult retry and write to error table
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-14135
>                 URL: https://issues.apache.org/jira/browse/BEAM-14135
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Yoni Bendayan
>            Priority: P2
>
> I’m currently using the legacy big query insert on a streaming pipeline (not using the streaming engine) like this:
> {code:java}
> bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
>         .withExtendedErrorInfo();
> bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
>         writeResult.getFailedInsertsWithErr().apply("BQ-insert-error-write",
>                 HandleInsertError.of();{code}
> and in  HandleInsertError we process the BigQueryInsertError add some metadata and write to a desired big query error table:
>  
> {code:java}
> @Override
> public PCollection<Void> expand(PCollection<BigQueryInsertError> input) {
>  return input
>          .apply("transform-err-table-row", ParDo.of(new DoFn<BigQueryInsertError, KV<TableRow, TableDestination>>() {
>              @ProcessElement
>              public void processElement(ProcessContext c) {
>                  BigQueryInsertError bigQueryInsertError = c.element();
>                  TableRow convertedRow = new TableRow();
>                  convertedRow.set("error", bigQueryInsertError.getError().toString());
>                  convertedRow.set("t",  CommonConverter.convertDate(new Date()));
>                  convertedRow.set(UUID, bigQueryInsertError.getRow().get(UUID));
>                  TableDestination tableDestination = BqUtil.getTableDestination(bigQueryInsertError.getTable().getProjectId(),
>                          bigQueryInsertError.getTable().getDatasetId(), errorTable);
>                  c.output(KV.of(convertedRow,tableDestination));
>              }
>          }))
>          .apply(new BqInsertError());
> }
> {code}
>  
> I’m trying to change the write method to use the new one
> {code:java}
> .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);{code}
>  
> but I get this error:
> {noformat}
> When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, triggering frequency must be specified
>  
> {noformat}
> even though the documentation indicates that the triggering frequency is relevant to FILE_LOAD method:
> [https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#withTriggeringFrequency-org.joda.time.Duration-]
>  
> after I’ve added the triggering frequency and NumStorageWriteApiStreams im getting this error:
> {noformat}
> Cannot use getFailedInsertsWithErr as this WriteResult does not use extended errors. Use getFailedInserts instead{noformat}
>  
> but the difference between these functions is that getFailedInsertsWithErr expands PCollection<BigQueryInsertError>
> and there we have 2 features that are not avaliable from the getFailedInserts function because it expands PCollection<TableRow>:
> 1. we can get the insert error  bigQueryInsertError.getError()
> 2. we can determine the projectid and dataset id by using 
>        bigQueryInsertError.getTable().getProjectId(),
>        bigQueryInsertError.getTable().getDatasetId()
>        we need them because our pipeline is a multi tenant use case and to get                             those prarameters otherwise would require a lot of overhead.
> and also when I’m trying to run it with the getFailedInserts like that:
> {code:java}
> bqWriter = bqWriter.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>         .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
>         .withTriggeringFrequency(Duration.standardSeconds(5))
>         .withNumStorageWriteApiStreams(12);
> bqErrorHandler = (writeResult, eventsProcessingOptions1) ->
>         writeResult.getFailedInserts().apply("BQ-insert-error-write",
>                 HandleStorageWriteApiInsertError.of();
>  
> {code}
> I get the following error:
>  
> {noformat}
> Record-insert retry policies are not supported when using BigQuery load jobs.{noformat}
>  
> but I’m using the STORAGE_API_WRITES which normally should support retryTransientErrors
> So first i think  there is a something missing in the implementation of that write method  that makes the retry feature not supported,
> and as a feature request is to support getFailedInsertsWithErr in the writeResult when using BigQueryIO.Write.Method.STORAGE_WRITE_API
> if there is an existing workaround for that now it would be great because switching the write method significantly cuts our costs
> Thanks!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)