Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/14 18:27:08 UTC

[beam] branch master updated: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac37784821e (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)
ac37784821e is described below

commit ac37784821eaedd97c2e6e39441c22ebc6d97cd3
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Sep 14 14:26:59 2022 -0400

    (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)
    
    * remove WaitForBQJobs and perform waits in each step's finish_bundle; copy jobs now provide the trigger for the delete stage (a minimal sketch of this pattern follows the diffstat below)
    
    * clarify why we emit a None value when no schema updates are needed
    
    * added test for copy jobs
    
    * add test for dynamic destination streaming
    
    * properly check results with matchers
    
    * style fixes
    
    * fixing test_one_job... test
    
    * fixing test_wait_for_job_completion
    
    * yield copy job references instead, since returning and yielding to separate outputs in the same finish_bundle does not work
    
    * style fixes
---
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 158 ++++++++++----------
 .../apache_beam/io/gcp/bigquery_file_loads_test.py | 166 +++++++++++++++++----
 2 files changed, 209 insertions(+), 115 deletions(-)
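
The core of this change is a wait-in-finish_bundle pattern: each job-triggering
DoFn records the jobs it starts during process() and only waits for them (and
emits them) in finish_bundle(), so streaming bundles are not blocked per
element. Below is a minimal, self-contained sketch of that pattern; start_job
and wait_for_job are hypothetical stand-ins for BigQueryWrapper.perform_load_job
and wait_for_bq_job, and the TriggerAndWait class is illustrative rather than
part of the commit.

    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindows

    def start_job(destination, payload):
      # Hypothetical stand-in for BigQueryWrapper.perform_load_job: kicks off
      # a job for this element and returns a reference to it.
      return 'job-for-%s' % destination

    def wait_for_job(job_ref):
      # Hypothetical stand-in for BigQueryWrapper.wait_for_bq_job: polls until
      # the referenced job completes, raising if it fails.
      pass

    class TriggerAndWait(beam.DoFn):
      """Starts one job per element and waits for all of them at bundle end."""
      def start_bundle(self):
        # Jobs started during this bundle; emitted only once they finish.
        self.pending_jobs = []

      def process(self, element):
        destination, payload = element
        job_ref = start_job(destination, payload)
        # finish_bundle must emit WindowedValue objects, so wrap eagerly.
        self.pending_jobs.append(
            GlobalWindows.windowed_value((destination, job_ref)))

      def finish_bundle(self):
        for wv in self.pending_jobs:
          wait_for_job(wv.value[1])
        return self.pending_jobs

Compared with the removed WaitForBQJobs DoFn, the waiting now happens inside
the same DoFn that started the jobs, so downstream stages no longer need a
separate impulse-plus-side-input monitoring step.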

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 2ddbacd4ed3..8b899a343d3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -77,6 +77,9 @@ _FILE_TRIGGERING_RECORD_COUNT = 500000
 # triggering file write to avoid generating too many small files.
 _FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
 
+# How many seconds we wait before polling a pending job
+_SLEEP_DURATION_BETWEEN_POLLS = 10
+
 
 def _generate_job_name(job_name, job_type, step_name):
   return bigquery_tools.generate_bq_job_name(
@@ -355,9 +358,10 @@ class UpdateDestinationSchema(beam.DoFn):
     self._step_name = step_name
     self._load_job_project_id = load_job_project_id
 
-  def setup(self):
-    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+  def start_bundle(self):
+    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
     self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    self.pending_jobs = []
 
   def display_data(self):
     return {
@@ -392,7 +396,7 @@ class UpdateDestinationSchema(beam.DoFn):
 
     try:
       # Check if destination table exists
-      destination_table = self._bq_wrapper.get_table(
+      destination_table = self.bq_wrapper.get_table(
           project_id=table_reference.projectId,
           dataset_id=table_reference.datasetId,
           table_id=table_reference.tableId)
@@ -404,7 +408,7 @@ class UpdateDestinationSchema(beam.DoFn):
       else:
         raise
 
-    temp_table_load_job = self._bq_wrapper.get_job(
+    temp_table_load_job = self.bq_wrapper.get_job(
         project=temp_table_load_job_reference.projectId,
         job_id=temp_table_load_job_reference.jobId,
         location=temp_table_load_job_reference.location)
@@ -432,20 +436,36 @@ class UpdateDestinationSchema(beam.DoFn):
         table_reference)
     # Trigger potential schema modification by loading zero rows into the
     # destination table with the temporary table schema.
-    schema_update_job_reference = self._bq_wrapper.perform_load_job(
-        destination=table_reference,
-        source_stream=io.BytesIO(),  # file with zero rows
-        job_id=job_name,
-        schema=temp_table_schema,
-        write_disposition='WRITE_APPEND',
-        create_disposition='CREATE_NEVER',
-        additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
-        # JSON format is hardcoded because zero rows load(unlike AVRO) and
-        # a nested schema(unlike CSV, which a default one) is permitted.
-        source_format="NEWLINE_DELIMITED_JSON",
-        load_job_project_id=self._load_job_project_id)
-    yield (destination, schema_update_job_reference)
+    schema_update_job_reference = self.bq_wrapper.perform_load_job(
+      destination=table_reference,
+      source_stream=io.BytesIO(),  # file with zero rows
+      job_id=job_name,
+      schema=temp_table_schema,
+      write_disposition='WRITE_APPEND',
+      create_disposition='CREATE_NEVER',
+      additional_load_parameters=additional_parameters,
+      job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+      # JSON format is hardcoded because it permits a zero-row load (unlike
+      # AVRO) and a nested schema (unlike CSV, which requires a flat schema).
+      source_format="NEWLINE_DELIMITED_JSON",
+      load_job_project_id=self._load_job_project_id)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value(
+            (destination, schema_update_job_reference)))
+
+  def finish_bundle(self):
+    # Unlike the other steps, schema update is not always necessary.
+    # In that case, return a None value to avoid blocking in a streaming
+    # context; otherwise the streaming pipeline would get stuck waiting for
+    # the TriggerCopyJobs side-input.
+    if not self.pending_jobs:
+      return [GlobalWindows.windowed_value(None)]
+
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]
+      self.bq_wrapper.wait_for_bq_job(
+          job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+    return self.pending_jobs
 
 
 class TriggerCopyJobs(beam.DoFn):
@@ -462,6 +482,9 @@ class TriggerCopyJobs(beam.DoFn):
     copying from temp_tables to destination_table is not atomic.
     See: https://issues.apache.org/jira/browse/BEAM-7822
   """
+
+  TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
+
   def __init__(
       self,
       project=None,
@@ -490,6 +513,7 @@ class TriggerCopyJobs(beam.DoFn):
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    self.pending_jobs = []
 
   def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
     destination = element[0]
@@ -551,8 +575,19 @@ class TriggerCopyJobs(beam.DoFn):
 
     if wait_for_job:
       self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value((destination, job_reference)))
+
+  def finish_bundle(self):
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]
+      self.bq_wrapper.wait_for_bq_job(
+          job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+      yield windowed_value
 
-    yield (destination, job_reference)
+    yield pvalue.TaggedOutput(
+        TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES,
+        GlobalWindows.windowed_value(None))
 
 
 class TriggerLoadJobs(beam.DoFn):
@@ -609,6 +644,7 @@ class TriggerLoadJobs(beam.DoFn):
     self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+    self.pending_jobs = []
 
   def process(self, element, load_job_name_prefix, *schema_side_inputs):
     # Each load job is assumed to have files respecting these constraints:
@@ -682,7 +718,15 @@ class TriggerLoadJobs(beam.DoFn):
         source_format=self.source_format,
         job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
         load_job_project_id=self.load_job_project_id)
-    yield (destination, job_reference)
+    self.pending_jobs.append(
+        GlobalWindows.windowed_value((destination, job_reference)))
+
+  def finish_bundle(self):
+    for windowed_value in self.pending_jobs:
+      job_ref = windowed_value.value[1]
+      self.bq_wrapper.wait_for_bq_job(
+          job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+    return self.pending_jobs
 
 
 class PartitionFiles(beam.DoFn):
@@ -739,29 +783,6 @@ class PartitionFiles(beam.DoFn):
       yield pvalue.TaggedOutput(output_tag, (destination, partition))
 
 
-class WaitForBQJobs(beam.DoFn):
-  """Takes in a series of BQ job names as side input, and waits for all of them.
-
-  If any job fails, it will fail. If all jobs succeed, it will succeed.
-
-  Experimental; no backwards compatibility guarantees.
-  """
-  def __init__(self, test_client=None):
-    self.test_client = test_client
-
-  def start_bundle(self):
-    self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
-
-  def process(self, element, dest_ids_list):
-    job_references = [elm[1] for elm in dest_ids_list]
-    for ref in job_references:
-      # We must poll repeatedly until the job finishes or fails, thus setting
-      # max_retries to 0.
-      self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
-
-    return dest_ids_list  # Pass the list of destination-jobs downstream
-
-
 class DeleteTablesFn(beam.DoFn):
   def __init__(self, test_client=None):
     self.test_client = test_client
@@ -1038,15 +1059,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
     temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
     temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
 
-    finished_temp_tables_load_jobs_pc = (
-        p
-        | "ImpulseMonitorLoadJobs" >> beam.Create([None])
-        | "WaitForTempTableLoadJobs" >> beam.ParDo(
-            WaitForBQJobs(self.test_client),
-            pvalue.AsList(temp_tables_load_job_ids_pc)))
-
     schema_mod_job_ids_pc = (
-        finished_temp_tables_load_jobs_pc
+        temp_tables_load_job_ids_pc
         | beam.ParDo(
             UpdateDestinationSchema(
                 project=self.project,
@@ -1057,15 +1071,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 load_job_project_id=self.load_job_project_id),
             schema_mod_job_name_pcv))
 
-    finished_schema_mod_jobs_pc = (
-        p
-        | "ImpulseMonitorSchemaModJobs" >> beam.Create([None])
-        | "WaitForSchemaModJobs" >> beam.ParDo(
-            WaitForBQJobs(self.test_client),
-            pvalue.AsList(schema_mod_job_ids_pc)))
-
-    destination_copy_job_ids_pc = (
-        finished_temp_tables_load_jobs_pc
+    copy_job_outputs = (
+        temp_tables_load_job_ids_pc
         | beam.ParDo(
             TriggerCopyJobs(
                 project=self.project,
@@ -1075,25 +1082,17 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 step_name=step_name,
                 load_job_project_id=self.load_job_project_id),
             copy_job_name_pcv,
-            pvalue.AsIter(finished_schema_mod_jobs_pc)))
+            pvalue.AsIter(schema_mod_job_ids_pc)).with_outputs(
+                TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, main='main'))
 
-    finished_copy_jobs_pc = (
-        p
-        | "ImpulseMonitorCopyJobs" >> beam.Create([None])
-        | "WaitForCopyJobs" >> beam.ParDo(
-            WaitForBQJobs(self.test_client),
-            pvalue.AsList(destination_copy_job_ids_pc)))
+    destination_copy_job_ids_pc = copy_job_outputs['main']
+    trigger_delete = copy_job_outputs[
+        TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES]
 
     _ = (
-        p
-        | "RemoveTempTables/Impulse" >> beam.Create([None])
-        | "RemoveTempTables/PassTables" >> beam.FlatMap(
-            lambda _,
-            unused_copy_jobs,
-            deleting_tables: deleting_tables,
-            pvalue.AsIter(finished_copy_jobs_pc),
-            pvalue.AsIter(temp_tables_pc))
-        | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+        temp_tables_pc
+        | "RemoveTempTables/AddUselessValue" >> beam.Map(
+            lambda x, unused_trigger: (x, None), pvalue.AsList(trigger_delete))
         | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
         | "RemoveTempTables/GetTableNames" >> beam.Keys()
         | "RemoveTempTables/Delete" >> beam.ParDo(
@@ -1116,13 +1115,6 @@ class BigQueryBatchFileLoads(beam.PTransform):
             load_job_name_pcv,
             *self.schema_side_inputs))
 
-    _ = (
-        p
-        | "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None])
-        | "WaitForDestinationLoadJobs" >> beam.ParDo(
-            WaitForBQJobs(self.test_client),
-            pvalue.AsList(destination_load_job_ids_pc)))
-
     destination_load_job_ids_pc = (
         (temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
         | beam.Flatten())
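
The other half of the rewiring is the trigger for temp-table deletion:
TriggerCopyJobs now emits a single None per bundle on its
TRIGGER_DELETE_TEMP_TABLES tag after all of its copy jobs have been waited on,
and the RemoveTempTables branch consumes that tag as a side input instead of
going through a dedicated WaitForCopyJobs step. A compressed, runnable sketch
of that tagged-output-as-trigger wiring follows; CopyLikeDoFn and the 'Cleanup'
step are illustrative names, not the classes used above.

    import apache_beam as beam
    from apache_beam import pvalue
    from apache_beam.transforms.window import GlobalWindows

    class CopyLikeDoFn(beam.DoFn):
      """Emits finished work on the main output plus one per-bundle trigger."""
      TRIGGER_CLEANUP = 'TriggerCleanup'

      def start_bundle(self):
        self.pending = []

      def process(self, element):
        # The real DoFn starts a copy job here and records its reference.
        self.pending.append(GlobalWindows.windowed_value(element))

      def finish_bundle(self):
        for wv in self.pending:
          # The real DoFn waits for each copy job before emitting it.
          yield wv
        # One trigger element per bundle; the cleanup branch uses it as a
        # side input, so cleanup only runs after the copies are done.
        yield pvalue.TaggedOutput(
            self.TRIGGER_CLEANUP, GlobalWindows.windowed_value(None))

    with beam.Pipeline() as p:
      work = p | beam.Create([('table0', 'job0'), ('table1', 'job1')])
      outputs = work | beam.ParDo(CopyLikeDoFn()).with_outputs(
          CopyLikeDoFn.TRIGGER_CLEANUP, main='main')
      _ = (
          work
          | 'Cleanup' >> beam.Map(
              lambda x, unused_trigger: x,
              pvalue.AsList(outputs[CopyLikeDoFn.TRIGGER_CLEANUP])))

Because the trigger is only produced in finish_bundle, the delete stage cannot
fire before the bundle's copy jobs have completed, which is the guarantee the
removed ImpulseMonitorCopyJobs/WaitForCopyJobs branch used to provide.
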
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 21798fae0fe..77fb554bedc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -44,8 +44,6 @@ from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamin
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
@@ -544,12 +542,15 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
           label='CheckCopyJobProjectIds')
 
   @mock.patch('time.sleep')
-  def test_wait_for_job_completion(self, sleep_mock):
-    job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]
-    job_references[0].projectId = 'project1'
-    job_references[0].jobId = 'jobId1'
-    job_references[1].projectId = 'project1'
-    job_references[1].jobId = 'jobId2'
+  def test_wait_for_load_job_completion(self, sleep_mock):
+    job_1 = bigquery_api.Job()
+    job_1.jobReference = bigquery_api.JobReference()
+    job_1.jobReference.projectId = 'project1'
+    job_1.jobReference.jobId = 'jobId1'
+    job_2 = bigquery_api.Job()
+    job_2.jobReference = bigquery_api.JobReference()
+    job_2.jobReference.projectId = 'project1'
+    job_2.jobReference.jobId = 'jobId2'
 
     job_1_waiting = mock.Mock()
     job_1_waiting.status.state = 'RUNNING'
@@ -565,26 +566,34 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     bq_client.jobs.Get.side_effect = [
         job_1_waiting, job_2_done, job_1_done, job_2_done
     ]
+    partition_1 = ('project:dataset.table0', ['file0'])
+    partition_2 = ('project:dataset.table1', ['file1'])
+    bq_client.jobs.Insert.side_effect = [job_1, job_2]
+    test_job_prefix = "test_job"
 
-    waiting_dofn = bqfl.WaitForBQJobs(bq_client)
-
-    dest_list = [(i, job) for i, job in enumerate(job_references)]
-
+    expected_dest_jobref_list = [(partition_1[0], job_1.jobReference),
+                                 (partition_2[0], job_2.jobReference)]
     with TestPipeline('DirectRunner') as p:
-      references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
-      outputs = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references))
+      partitions = p | beam.Create([partition_1, partition_2])
+      outputs = (
+          partitions
+          | beam.ParDo(
+              bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))
 
-      assert_that(outputs, equal_to(dest_list))
+      assert_that(outputs, equal_to(expected_dest_jobref_list))
 
     sleep_mock.assert_called_once()
 
   @mock.patch('time.sleep')
-  def test_one_job_failed_after_waiting(self, sleep_mock):
-    job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]
-    job_references[0].projectId = 'project1'
-    job_references[0].jobId = 'jobId1'
-    job_references[1].projectId = 'project1'
-    job_references[1].jobId = 'jobId2'
+  def test_one_load_job_failed_after_waiting(self, sleep_mock):
+    job_1 = bigquery_api.Job()
+    job_1.jobReference = bigquery_api.JobReference()
+    job_1.jobReference.projectId = 'project1'
+    job_1.jobReference.jobId = 'jobId1'
+    job_2 = bigquery_api.Job()
+    job_2.jobReference = bigquery_api.JobReference()
+    job_2.jobReference.projectId = 'project1'
+    job_2.jobReference.jobId = 'jobId2'
 
     job_1_waiting = mock.Mock()
     job_1_waiting.status.state = 'RUNNING'
@@ -600,15 +609,18 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     bq_client.jobs.Get.side_effect = [
         job_1_waiting, job_2_done, job_1_error, job_2_done
     ]
-
-    waiting_dofn = bqfl.WaitForBQJobs(bq_client)
-
-    dest_list = [(i, job) for i, job in enumerate(job_references)]
+    partition_1 = ('project:dataset.table0', ['file0'])
+    partition_2 = ('project:dataset.table1', ['file1'])
+    bq_client.jobs.Insert.side_effect = [job_1, job_2]
+    test_job_prefix = "test_job"
 
     with self.assertRaises(Exception):
       with TestPipeline('DirectRunner') as p:
-        references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
-        _ = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references))
+        partitions = p | beam.Create([partition_1, partition_2])
+        _ = (
+            partitions
+            | beam.ParDo(
+                bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))
 
     sleep_mock.assert_called_once()
 
@@ -917,16 +929,14 @@ class BigQueryFileLoadsIT(unittest.TestCase):
     schema = self.BIG_QUERY_STREAMING_SCHEMA
     l = [{'Integr': i} for i in range(_SIZE)]
 
-    state_matcher = PipelineStateMatcher(PipelineState.RUNNING)
     bq_matcher = BigqueryFullResultStreamingMatcher(
         project=self.project,
         query="SELECT Integr FROM %s" % output_table,
-        data=[(i, ) for i in range(100)])
+        data=[(i, ) for i in range(100)],
+        timeout=30)
 
     args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=all_of(state_matcher, bq_matcher),
-        streaming=True,
-        allow_unsafe_triggers=True)
+        streaming=True, allow_unsafe_triggers=True)
     with beam.Pipeline(argv=args) as p:
       stream_source = (
           TestStream().advance_watermark_to(0).advance_processing_time(
@@ -946,6 +956,98 @@ class BigQueryFileLoadsIT(unittest.TestCase):
                                         .Method.FILE_LOADS,
                                       triggering_frequency=100))
 
+    hamcrest_assert(p, bq_matcher)
+
+  @pytest.mark.it_postcommit
+  def test_bqfl_streaming_with_copy_jobs(self):
+    if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+      self.skipTest("TestStream is not supported on TestDataflowRunner")
+    output_table = '%s_%s' % (self.output_table, 'with_copy_jobs')
+    _SIZE = 100
+    schema = self.BIG_QUERY_STREAMING_SCHEMA
+    l = [{'Integr': i} for i in range(_SIZE)]
+
+    bq_matcher = BigqueryFullResultStreamingMatcher(
+        project=self.project,
+        query="SELECT Integr FROM %s" % output_table,
+        data=[(i, ) for i in range(100)])
+
+    args = self.test_pipeline.get_full_options_as_args(
+        streaming=True, allow_unsafe_triggers=True)
+
+    # Override these parameters to induce copy jobs
+    bqfl._DEFAULT_MAX_FILE_SIZE = 100
+    bqfl._MAXIMUM_LOAD_SIZE = 400
+
+    with beam.Pipeline(argv=args) as p:
+      stream_source = (
+          TestStream().advance_watermark_to(0).advance_processing_time(
+              100).add_elements(l[:_SIZE // 4]).
+          advance_processing_time(100).advance_watermark_to(100).add_elements(
+              l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time(
+                  100).advance_watermark_to(200).add_elements(
+                      l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time(
+                          100).advance_watermark_to(300).add_elements(
+                              l[3 * _SIZE // 4:]).advance_processing_time(100).
+          advance_watermark_to_infinity().advance_processing_time(100))
+
+      _ = (p
+           | stream_source
+           | bigquery.WriteToBigQuery(output_table,
+                                      schema=schema,
+                                      method=bigquery.WriteToBigQuery \
+                                      .Method.FILE_LOADS,
+                                      triggering_frequency=100))
+
+    hamcrest_assert(p, bq_matcher)
+
+  @pytest.mark.it_postcommit
+  def test_bqfl_streaming_with_dynamic_destinations(self):
+    if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+      self.skipTest("TestStream is not supported on TestDataflowRunner")
+    even_table = '%s_%s' % (self.output_table, "dynamic_dest_0")
+    odd_table = '%s_%s' % (self.output_table, "dynamic_dest_1")
+    output_table = lambda row: even_table if (
+        row['Integr'] % 2 == 0) else odd_table
+    _SIZE = 100
+    schema = self.BIG_QUERY_STREAMING_SCHEMA
+    l = [{'Integr': i} for i in range(_SIZE)]
+
+    pipeline_verifiers = [
+        BigqueryFullResultStreamingMatcher(
+            project=self.project,
+            query="SELECT Integr FROM %s" % even_table,
+            data=[(i, ) for i in range(0, 100, 2)]),
+        BigqueryFullResultStreamingMatcher(
+            project=self.project,
+            query="SELECT Integr FROM %s" % odd_table,
+            data=[(i, ) for i in range(1, 100, 2)])
+    ]
+
+    args = self.test_pipeline.get_full_options_as_args(
+        streaming=True, allow_unsafe_triggers=True)
+
+    with beam.Pipeline(argv=args) as p:
+      stream_source = (
+          TestStream().advance_watermark_to(0).advance_processing_time(
+              100).add_elements(l[:_SIZE // 4]).
+          advance_processing_time(100).advance_watermark_to(100).add_elements(
+              l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time(
+                  100).advance_watermark_to(200).add_elements(
+                      l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time(
+                          100).advance_watermark_to(300).add_elements(
+                              l[3 * _SIZE // 4:]).advance_processing_time(100).
+          advance_watermark_to_infinity().advance_processing_time(100))
+
+      _ = (p
+           | stream_source
+           | bigquery.WriteToBigQuery(output_table,
+                                      schema=schema,
+                                      method=bigquery.WriteToBigQuery \
+                                      .Method.FILE_LOADS,
+                                      triggering_frequency=100))
+    hamcrest_assert(p, all_of(*pipeline_verifiers))
+
   @pytest.mark.it_postcommit
   def test_one_job_fails_all_jobs_fail(self):