You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/10/01 17:25:01 UTC
[jira] [Updated] (BEAM-12633) How to get failed insert record for
file load insertion in BigQuery.
[ https://issues.apache.org/jira/browse/BEAM-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-12633:
---------------------------------
Labels: (was: stale-P2)
> How to get failed insert record for file load insertion in BigQuery.
> --------------------------------------------------------------------
>
> Key: BEAM-12633
> URL: https://issues.apache.org/jira/browse/BEAM-12633
> Project: Beam
> Issue Type: Improvement
> Components: beam-community, io-java-gcp
> Reporter: Ashutosh Dixit
> Priority: P3
>
> I'm using Apache Beam (Java SDK) to insert record in BigQuery using Batch load method (File loads). I want to retrieve those records which failed during insertion.
> Is it possible to have a retry policy on failed records?
> Actually getting error after retying 1000 times.
> Below is my code:
>
> {{}}
> {code:java}
> public static void insertToBigQueryDataLake(
> final PCollectionTuple dataStoresCollectionTuple,
> final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
> final Long loadJobTriggerFrequency,
> final Integer loadJobNumShard) {
> WriteResult writeResult = dataStoresCollectionTuple
> .get(dataLakeValidTag)
> .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
> .apply(
> WRITING_EVENTS_NAME,
> BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
> .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
> .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
> .withNumFileShards(loadJobNumShard)
> .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
> .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
> writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
> @ProcessElement
> public void processElement(final ProcessContext processContext) throws IOException {
> System.out.println("Table Row : " + processContext.element().toPrettyString());
> }
> }));
> }{code}
> {{}}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)