Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2021/05/15 17:20:02 UTC
[jira] [Commented] (BEAM-10406) BigQueryBatchFileLoads does not
bundle rows correctly in streaming mode in python
[ https://issues.apache.org/jira/browse/BEAM-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17345092#comment-17345092 ]
Beam JIRA Bot commented on BEAM-10406:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.
> BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python
> ---------------------------------------------------------------------------------
>
> Key: BEAM-10406
> URL: https://issues.apache.org/jira/browse/BEAM-10406
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Affects Versions: 2.22.0
> Reporter: Nikunj Aggarwal
> Priority: P2
> Labels: stale-P2
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 requests/sec, and each request is around 6 KB. We perform some transforms on the input and then write to BigQuery.
>
> {code:python}
> beam.io.WriteToBigQuery(
>     table=table_name,
>     schema=schema,
>     dataset=dataset_name,
>     project=project,
>     method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
>     triggering_frequency=2 * 60,  # seconds between load jobs
>     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
> )
> {code}
>
> We noticed that each temporary GCS file created by the load process had a very small number of rows (~1-5). We can reproduce this with both the direct runner and the Dataflow runner. After debugging, we believe the following is the cause:
> In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations are created in start_bundle and cleared in finish_bundle. In streaming mode, a typical bundle in the ParDo contains only ~1-5 elements, so each file gets only that many rows. Windowing is applied before the ParDo, but since there is no GroupByKey, the window and its triggers do not affect the ParDo. The small snippet below reproduces the issue:
>
>
> {code:python}
> import argparse
>
> import apache_beam as beam
> from apache_beam.transforms import trigger
>
>
> class WriteRecordsToFile(beam.DoFn):
>     # Buffers elements per bundle, mirroring the real WriteRecordsToFile.
>     def start_bundle(self):
>         print('start bundle')
>         self.data = []
>
>     def process(self, element):
>         self.data.append(element)
>
>     def finish_bundle(self):
>         # The printed length is the number of elements seen in this bundle.
>         print('finish bundle', len(self.data))
>         self.data = []
>
>
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input_subscription',
>         required=True,
>         help='Input PubSub subscription of the form '
>              '"projects/<project>/subscriptions/<subscription>".')
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     with beam.Pipeline(argv=pipeline_args) as p:
>         lines = p | beam.io.ReadFromPubSub(
>             subscription=known_args.input_subscription)
>         (lines
>          | beam.WindowInto(
>              beam.window.GlobalWindows(),
>              trigger=trigger.Repeatedly(
>                  trigger.AfterAny(
>                      trigger.AfterProcessingTime(60),
>                      trigger.AfterCount(100))),
>              accumulation_mode=trigger.AccumulationMode.DISCARDING)
>          | beam.ParDo(WriteRecordsToFile()))
> {code}
>
> In the above example, start_bundle is called very often and bundles do not respect the triggers.
> To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping step after the window triggers, before the ParDo(WriteRecordsToFile).
>
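The effect of the suggested grouping can be illustrated with a rough, Beam-free simulation. This is only a sketch under stated assumptions: the function names, the bundle sizes, and the batch size of 100 are hypothetical and are not taken from bigquery_file_loads.py. It compares flushing once per bundle with flushing once per grouped batch.

```python
from typing import List, Sequence


def flush_per_bundle(bundles: Sequence[Sequence[int]]) -> List[int]:
    """Mimic a DoFn that buffers in start_bundle and flushes in finish_bundle.

    Returns the number of rows written per flush (one flush per bundle).
    """
    return [len(bundle) for bundle in bundles if bundle]


def flush_after_grouping(bundles: Sequence[Sequence[int]],
                         batch_size: int) -> List[int]:
    """Mimic grouping elements across bundles before writing.

    Returns the number of rows written per flush (one flush per full batch,
    plus one final flush for any remainder).
    """
    sizes = []
    buffered = 0
    for bundle in bundles:
        for _ in bundle:
            buffered += 1
            if buffered == batch_size:
                sizes.append(buffered)
                buffered = 0
    if buffered:
        sizes.append(buffered)
    return sizes


# Hypothetical input: 100 bundles of 1-5 elements each (300 elements total).
bundle_sizes = [1, 2, 3, 4, 5] * 20
bundles = [list(range(n)) for n in bundle_sizes]

per_bundle = flush_per_bundle(bundles)
grouped = flush_after_grouping(bundles, batch_size=100)

print(len(per_bundle), 'flushes without grouping')
print(len(grouped), 'flushes with grouping:', grouped)
```

In a real pipeline the grouping would be done by a Beam transform placed before the ParDo rather than by a manual counter; the simulation only shows why per-bundle flushing yields many tiny files while grouped flushing yields a few large ones.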