Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/16 17:26:00 UTC
[jira] [Commented] (BEAM-12633) How to get failed insert record for file load insertion in BigQuery.
[ https://issues.apache.org/jira/browse/BEAM-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17507760#comment-17507760 ]
Beam JIRA Bot commented on BEAM-12633:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> How to get failed insert record for file load insertion in BigQuery.
> --------------------------------------------------------------------
>
> Key: BEAM-12633
> URL: https://issues.apache.org/jira/browse/BEAM-12633
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Ashutosh Dixit
> Priority: P2
> Labels: stale-P2
>
> I'm using Apache Beam (Java SDK) to insert records into BigQuery using the batch load method (FILE_LOADS). I want to retrieve the records that failed during insertion.
> Is it possible to apply a retry policy to failed records?
> Currently the job fails with an error after retrying 1000 times.
> Below is my code:
>
> {code:java}
> public static void insertToBigQueryDataLake(
>         final PCollectionTuple dataStoresCollectionTuple,
>         final TupleTag<KV<DataLake, PayloadSpecs>> dataLakeValidTag,
>         final Long loadJobTriggerFrequency,
>         final Integer loadJobNumShard) {
>     WriteResult writeResult = dataStoresCollectionTuple
>             .get(dataLakeValidTag)
>             .apply(TRANSFORMATION_NAME, DataLakeTableProcessor.dataLakeTableProcessorTransform())
>             .apply(
>                     WRITING_EVENTS_NAME,
>                     BigQueryIO.<KV<DataLake, TableRowSpecs>>write()
>                             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>                             .withTriggeringFrequency(Duration.standardMinutes(loadJobTriggerFrequency))
>                             .withNumFileShards(loadJobNumShard)
>                             .to(new DynamicTableRowDestinations<>(IS_DATA_LAKE))
>                             .withFormatFunction(BigQueryServiceImpl::dataLakeTableRow));
>
>     writeResult.getFailedInserts().apply(ParDo.of(new DoFn<TableRow, Void>() {
>         @ProcessElement
>         public void processElement(final ProcessContext processContext) throws IOException {
>             System.out.println("Table Row : " + processContext.element().toPrettyString());
>         }
>     }));
> }
> {code}
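> For comparison, a minimal sketch of what a retry policy plus failed-insert retrieval looks like on the streaming-inserts path: withFailedInsertRetryPolicy and getFailedInsertsWithErr are documented for STREAMING_INSERTS rather than FILE_LOADS. The table name and the {{rows}} PCollection below are placeholders, not part of my pipeline:
>
> {code:java}
> // Sketch only, assuming a PCollection<TableRow> named "rows" and a
> // placeholder destination table. InsertRetryPolicy applies to the
> // STREAMING_INSERTS method, not to FILE_LOADS.
> WriteResult streamingResult =
>     rows.apply(
>         BigQueryIO.writeTableRows()
>             .to("my-project:dataset.table") // placeholder destination
>             .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>             .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>             .withExtendedErrorInfo());
>
> // With withExtendedErrorInfo(), failed rows arrive as BigQueryInsertError,
> // which carries both the offending row and the insert error details.
> streamingResult
>     .getFailedInsertsWithErr()
>     .apply(ParDo.of(new DoFn<BigQueryInsertError, Void>() {
>         @ProcessElement
>         public void processElement(@Element BigQueryInsertError err) {
>             System.out.println(
>                 "Failed row: " + err.getRow() + " error: " + err.getError());
>         }
>     }));
> {code}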
--
This message was sent by Atlassian Jira
(v8.20.1#820001)