Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 21:45:00 UTC

[jira] [Commented] (BEAM-10406) BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python

    [ https://issues.apache.org/jira/browse/BEAM-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302030#comment-17302030 ] 

Kenneth Knowles commented on BEAM-10406:
----------------------------------------

It is expected that the file load occurs at the same granularity as a bundle. Using something like GroupIntoBatches in the BigQuery connector seems like a useful improvement.
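For illustration only, here is a rough sketch of the kind of batching I mean, outside the connector (write_batch_to_file is a hypothetical stand-in for the actual file-writing step, and the key/batch size are made up):

{code:python}
import apache_beam as beam


def write_batch_to_file(keyed_batch):
  # Hypothetical stand-in for the file-writing step: it receives
  # (destination, [rows]) with up to batch_size rows at a time,
  # regardless of how the runner split the incoming bundles.
  destination, rows = keyed_batch
  return '%s: %d rows' % (destination, len(rows))


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('dest1', {'field': i}) for i in range(2000)])
      # Batch per destination key so downstream work sees sizeable groups
      # instead of tiny streaming bundles.
      | beam.GroupIntoBatches(500)
      | beam.Map(write_batch_to_file)
      | beam.Map(print))
{code}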

I don't have much context, just reading through issues and pinging people. In this case, [~pabloem].

> BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-10406
>                 URL: https://issues.apache.org/jira/browse/BEAM-10406
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp
>    Affects Versions: 2.22.0
>            Reporter: Nikunj Aggarwal
>            Priority: P2
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 reqs/sec and each request is around 6 KB. We perform some transforms on the input and then write to BigQuery.
>  
> {code:python}
> beam.io.WriteToBigQuery(
>     table=table_name,
>     schema=schema,
>     dataset=dataset_name,
>     project=project,
>     method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
>     triggering_frequency=2 * 60,
>     create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
>     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
> )
> {code}
>  
> We noticed that each temporary GCS file created by the load process had a very small number of rows (~1-5). We are able to reproduce it with both the direct runner and the Dataflow runner. After debugging, we believe the following is the issue:
> In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations are created in start_bundle and cleared in finish_bundle. When this runs in streaming mode, a typical bundle size within the ParDo comes out to ~1-5 elements. Windowing is applied before the ParDo, but since there is no GroupByKey, the window does not affect the ParDo. Below is a small snippet that reproduces the issue:
>
> {code:python}
> import argparse
>
> import apache_beam as beam
> from apache_beam.transforms import trigger
>
>
> class WriteRecordsToFile(beam.DoFn):
>   def start_bundle(self):
>     print('start bundle')
>     self.data = []
>
>   def process(self, element):
>     self.data.append(element)
>
>   def finish_bundle(self):
>     # In streaming mode this typically reports only ~1-5 elements per bundle.
>     print('finish bundle', len(self.data))
>     self.data = []
>
>
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   parser.add_argument(
>       '--input_subscription',
>       required=True,
>       help='Input PubSub subscription of the form "projects/<project>/subscriptions/<subscription>".')
>   known_args, pipeline_args = parser.parse_known_args(argv)
>   with beam.Pipeline(argv=pipeline_args) as p:
>     lines = p | beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
>     (lines
>      | beam.WindowInto(
>          beam.window.GlobalWindows(),
>          trigger=trigger.Repeatedly(
>              trigger.AfterAny(
>                  trigger.AfterProcessingTime(60),
>                  trigger.AfterCount(100))),
>          accumulation_mode=trigger.AccumulationMode.DISCARDING)
>      | beam.ParDo(WriteRecordsToFile()))
>
>
> if __name__ == '__main__':
>   run()
> {code}
>  
> In the above example, we see that start_bundle is called very often and does not respect the triggers.
> To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping step after the window triggers and before calling ParDo(WriteRecordsToFile).
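> For illustration, a rough sketch of that change, continuing the repro pipeline above (the fixed 'destination' key is hypothetical, just to give GroupByKey something to group on):
> {code:python}
>     (lines
>      # Key each element; in BigQueryBatchFileLoads the key would be the
>      # table destination.
>      | beam.Map(lambda element: ('destination', element))
>      | beam.WindowInto(
>          beam.window.GlobalWindows(),
>          trigger=trigger.Repeatedly(
>              trigger.AfterAny(
>                  trigger.AfterProcessingTime(60),
>                  trigger.AfterCount(100))),
>          accumulation_mode=trigger.AccumulationMode.DISCARDING)
>      # GroupByKey materializes each triggered pane, so the downstream DoFn
>      # sees the whole pane rather than whatever small bundle the runner produced.
>      | beam.GroupByKey()
>      | beam.ParDo(WriteRecordsToFile()))
> {code}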
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)