Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 17:20:58 UTC

[GitHub] [beam] damccorm opened a new issue, #20433: BigQueryBatchFileLoads does not bundle rows correctly in streaming mode in python

damccorm opened a new issue, #20433:
URL: https://github.com/apache/beam/issues/20433

   We are using FILE_LOADS to write to BigQuery in streaming mode using Python. Input comes from a Pub/Sub topic at ~5000 requests/sec, and each request is around 6 KB. We perform some transforms on the input and then write to BigQuery.
   
    
   ```
   beam.io.WriteToBigQuery(
       table=table_name,
       schema=schema,
       dataset=dataset_name,
       project=project,
       method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
       triggering_frequency=2 * 60,
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
   ```
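
   For context, a rough sketch of how this sink is wired into our streaming pipeline (parse_message and the surrounding variable names are simplified placeholders, not our exact code):

   ```
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions


   def run(argv=None):
       # Placeholder wiring: input_subscription, table_name, schema, dataset_name,
       # project and parse_message stand in for our real configuration/transforms.
       options = PipelineOptions(argv, streaming=True)
       with beam.Pipeline(options=options) as p:
           (p
            | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
                subscription=input_subscription)
            | 'Transform' >> beam.Map(parse_message)
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                table=table_name,
                schema=schema,
                dataset=dataset_name,
                project=project,
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                triggering_frequency=2 * 60,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
   ```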
   
    
   
   We noticed that each temporary GCS file created by the load process had a very small number of rows (~1-5). We are able to reproduce this with both the direct runner and the Dataflow runner. After debugging, we believe the issue is the following:
   
   In WriteRecordsToFile (apache_beam/io/gcp/bigquery_file_loads.py), destinations are created in start_bundle and cleared in finish_bundle. When this runs in streaming mode, the typical bundle size seen by the ParDo comes out to ~1-5 elements. There is a WindowInto applied before the ParDo, but since there is no GroupByKey, the windowing and triggering do not affect bundle sizes in the ParDo. Below is a small snippet that reproduces the issue:
   
    
   
    
   ```
   import argparse

   import apache_beam as beam
   from apache_beam.transforms import trigger


   class WriteRecordsToFile(beam.DoFn):
       def start_bundle(self):
           print('start bundle')
           self.data = []

       def process(self, element):
           self.data.append(element)

       def finish_bundle(self):
           print('finish bundle', len(self.data))
           self.data = []


   def run(argv=None):
       parser = argparse.ArgumentParser()
       parser.add_argument(
           '--input_subscription',
           required=True,
           help='Input PubSub subscription of the form '
                '"projects/<project>/subscriptions/<subscription>".')
       known_args, pipeline_args = parser.parse_known_args(argv)

       with beam.Pipeline(argv=pipeline_args) as p:
           lines = p | beam.io.ReadFromPubSub(
               subscription=known_args.input_subscription)

           (lines
            | beam.WindowInto(
                beam.window.GlobalWindows(),
                trigger=trigger.Repeatedly(
                    trigger.AfterAny(
                        trigger.AfterProcessingTime(60),
                        trigger.AfterCount(100))),
                accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | beam.ParDo(WriteRecordsToFile()))
   ```
   
    
   
   In the above example, we see that start_bundle is called very frequently, and the bundle boundaries do not respect the triggers.
   
   To fix the behavior of BigQueryBatchFileLoads, we suggest adding a grouping step after the window triggers and before the ParDo(WriteRecordsToFile), as sketched below.
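
   A minimal sketch of the kind of grouping we have in mind, applied to the repro pipeline above (NUM_SHARDS, the random shard key, and WriteGroupedRecordsToFile are illustrative placeholders rather than a concrete implementation proposal; the real DoFn would iterate over the grouped values instead of single elements):

   ```
   import random

   import apache_beam as beam
   from apache_beam.transforms import trigger

   NUM_SHARDS = 10  # illustrative shard count


   class WriteGroupedRecordsToFile(beam.DoFn):
       """Illustrative variant that receives one (key, rows) element per
       triggered pane instead of individual rows."""
       def process(self, element):
           key, rows = element
           rows = list(rows)
           print('grouped batch', key, len(rows))


   # `lines` is the PCollection read from Pub/Sub in the repro above.
   (lines
    | beam.WindowInto(
        beam.window.GlobalWindows(),
        trigger=trigger.Repeatedly(
            trigger.AfterAny(
                trigger.AfterProcessingTime(60),
                trigger.AfterCount(100))),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | 'AddShardKey' >> beam.Map(lambda row: (random.randint(0, NUM_SHARDS - 1), row))
    | 'GroupByShard' >> beam.GroupByKey()
    | 'WriteBatches' >> beam.ParDo(WriteGroupedRecordsToFile()))
   ```

   With the GroupByKey in place, each triggered pane is materialized as a grouped element, so the amount of data written per file follows the trigger (60 s / 100 elements) rather than whatever bundle size the runner happens to choose.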
   
    
   
   Imported from Jira [BEAM-10406](https://issues.apache.org/jira/browse/BEAM-10406). Original Jira may contain additional context.
   Reported by: nikunj-jira.

