Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/10/01 17:25:01 UTC

[jira] [Updated] (BEAM-12633) How to get failed insert record for file load insertion in BigQuery.

     [ https://issues.apache.org/jira/browse/BEAM-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-12633:
---------------------------------
    Labels:   (was: stale-P2)

> How to get failed insert record for file load insertion in BigQuery.
> --------------------------------------------------------------------
>
>                 Key: BEAM-12633
>                 URL: https://issues.apache.org/jira/browse/BEAM-12633
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-community, io-java-gcp
>            Reporter: Ashutosh Dixit
>            Priority: P3
>
> I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch load method (FILE_LOADS). I want to retrieve the records that failed during insertion.
> Is it possible to have a retry policy on failed records?
> I'm actually getting an error after it retries 1000 times.
> Below is my code:
>  
> {code:java}
> public static void insertToBigQueryDataLake(
>         final PCollectionTuple dataStoresCollectionTuple,
>         final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
>         final Long loadJobTriggerFrequency,
>         final Integer loadJobNumShard) {
>     WriteResult writeResult = dataStoresCollectionTuple
>             .get(dataLakeValidTag)
>             .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
>             .apply(
>                     WRITING_EVENTS_NAME,
>                     BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
>                             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                             .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
>                             .withNumFileShards(loadJobNumShard)
>                             .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
>                             .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
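>                             // The part in question: reading back the rows that failed during insertion.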
>     writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
>         @ProcessElement
>         public void processElement(final ProcessContext processContext) throws IOException {
>             System.out.println("Table Row : " + processContext.element().toPrettyString());
>         }
>     }));
> }{code}
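> For reference, the failed-insert output and retry policy are documented for the streaming-inserts path. Below is a minimal sketch of that path, reusing the destination and format function from the code above; the input {{rows}} PCollection is a placeholder, and {{withExtendedErrorInfo()}}, {{InsertRetryPolicy}}, and {{BigQueryInsertError}} come from {{org.apache.beam.sdk.io.gcp.bigquery}}:
> {code:java}
> // Sketch: streaming inserts expose failed rows via getFailedInsertsWithErr().
> WriteResult writeResult = rows.apply(
>         "WriteToBigQueryStreaming",
>         BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
>                 .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>                 // Retry only transient errors instead of retrying indefinitely.
>                 .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>                 // Attach the BigQuery error details to each failed row.
>                 .withExtendedErrorInfo()
>                 .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
>                 .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
>
> // With withExtendedErrorInfo(), failures arrive as BigQueryInsertError
> // (the failed TableRow plus the insert error).
> writeResult.getFailedInsertsWithErr().apply(ParDo.of(new DoFn<BigQueryInsertError, Void>() {
>     @ProcessElement
>     public void processElement(final ProcessContext context) throws IOException {
>         System.out.println("Failed row : " + context.element().getRow().toPrettyString()
>                 + " error : " + context.element().getError());
>     }
> }));
> {code}
> With FILE_LOADS the rows go through BigQuery load jobs rather than the insertAll API, so the streaming-insert retry policy does not apply there.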



--
This message was sent by Atlassian Jira
(v8.3.4#803005)