Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 21:45:00 UTC
[jira] [Commented] (BEAM-10406) BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python
[ https://issues.apache.org/jira/browse/BEAM-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302030#comment-17302030 ]
Kenneth Knowles commented on BEAM-10406:
----------------------------------------
It is expected that the file load occurs at the same granularity as a bundle. Having the BigQuery connector batch rows with something like GroupIntoBatches seems like a useful improvement.
I don't have much context, just reading through issues and pinging people. In this case, [~pabloem].
> BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python
> ---------------------------------------------------------------------------------
>
> Key: BEAM-10406
> URL: https://issues.apache.org/jira/browse/BEAM-10406
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Affects Versions: 2.22.0
> Reporter: Nikunj Aggarwal
> Priority: P2
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 reqs/sec, and each request is around 6KB. We perform some transforms on the input and then write to BigQuery.
>
> {code:python}
> beam.io.WriteToBigQuery(
>     table=table_name,
>     schema=schema,
>     dataset=dataset_name,
>     project=project,
>     method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
>     triggering_frequency=2 * 60,  # seconds between load jobs
>     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
> )
> {code}
>
> We noticed that each temporary GCS file created by the load process had a very small number of rows (~1-5). We are able to reproduce it with both the direct runner and the Dataflow runner. After debugging, we believe the following to be the issue:
> In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations are created in start_bundle and cleared in finish_bundle. In streaming mode, a typical bundle reaching the ParDo contains only ~1-5 elements. Windowing is applied before the ParDo, but since there is no GroupByKey, the window and its triggers do not affect the ParDo. Below is a small piece of code that reproduces the issue:
>
>
> {code:python}
> import argparse
>
> import apache_beam as beam
> from apache_beam.transforms import trigger
>
>
> class WriteRecordsToFile(beam.DoFn):
>     def start_bundle(self):
>         print('start bundle')
>         self.data = []
>
>     def process(self, element):
>         self.data.append(element)
>
>     def finish_bundle(self):
>         # Reports how many elements this bundle held.
>         print('finish bundle', len(self.data))
>         self.data = []
>
>
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     parser.add_argument(
>         '--input_subscription',
>         required=True,
>         help='Input PubSub subscription of the form '
>              '"projects/<project>/subscriptions/<subscription>".')
>     known_args, pipeline_args = parser.parse_known_args(argv)
>     # Pass --streaming in pipeline_args, since Pub/Sub is an unbounded source.
>     with beam.Pipeline(argv=pipeline_args) as p:
>         lines = p | beam.io.ReadFromPubSub(
>             subscription=known_args.input_subscription)
>         (lines
>          | beam.WindowInto(
>              beam.window.GlobalWindows(),
>              trigger=trigger.Repeatedly(
>                  trigger.AfterAny(
>                      trigger.AfterProcessingTime(60),
>                      trigger.AfterCount(100))),
>              accumulation_mode=trigger.AccumulationMode.DISCARDING)
>          | beam.ParDo(WriteRecordsToFile()))
> {code}
>
> In the above example, start_bundle is called very often, and the bundles do not respect the triggers.
> To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping step after the window triggers and before calling ParDo(WriteRecordsToFile).
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)