Posted to commits@beam.apache.org by jo...@apache.org on 2022/09/14 18:27:08 UTC
[beam] branch master updated: (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)
This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ac37784821e (BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)
ac37784821e is described below
commit ac37784821eaedd97c2e6e39441c22ebc6d97cd3
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Wed Sep 14 14:26:59 2022 -0400
(BQ Python) Perform job waits in finish_bundle to allow BQ streaming writes with large batch loads (#23012)
* remove WaitForBQJobs and perform waits in each step's finish_bundle; copy jobs will provide the trigger for the delete stage
* clarify why we emit a None value when no schema updates are needed
* added test for copy jobs
* add test for dynamic destination streaming
* properly check results with matchers
* style fixes
* fixing test_one_job... test
* fixing test_wait_for_job_completion
* yield copy job references instead; returning and yielding to separate outputs in the same finish_bundle proved troublesome
* style fixes
---
.../apache_beam/io/gcp/bigquery_file_loads.py | 158 ++++++++++----------
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 166 +++++++++++++++++----
2 files changed, 209 insertions(+), 115 deletions(-)
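For context, the commit message above describes removing the WaitForBQJobs DoFn and instead waiting on BigQuery jobs inside each step's finish_bundle. A minimal, self-contained sketch of that pattern follows; it is illustrative only, not the Beam source, and the job client with its submit_job/is_done methods is hypothetical (the real code uses BigQueryWrapper.perform_load_job and wait_for_bq_job, as shown in the diff below).

import time

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindows

_SLEEP_DURATION_BETWEEN_POLLS = 10  # seconds between status polls


class TriggerAndWaitForJobs(beam.DoFn):
    """Issues jobs in process() and only blocks on them in finish_bundle()."""
    def __init__(self, client):
        self.client = client  # hypothetical job-service client

    def start_bundle(self):
        self.pending_jobs = []

    def process(self, element):
        destination, payload = element
        # Hypothetical call that starts a remote job and returns a reference.
        job_ref = self.client.submit_job(destination, payload)
        # Defer the wait: remember the job and keep processing elements.
        self.pending_jobs.append(
            GlobalWindows.windowed_value((destination, job_ref)))

    def finish_bundle(self):
        for windowed_value in self.pending_jobs:
            job_ref = windowed_value.value[1]
            # Hypothetical polling loop; the real code calls
            # BigQueryWrapper.wait_for_bq_job instead.
            while not self.client.is_done(job_ref):
                time.sleep(_SLEEP_DURATION_BETWEEN_POLLS)
        # finish_bundle must emit WindowedValue objects, hence the
        # GlobalWindows.windowed_value wrapping above.
        return self.pending_jobs

Because process() no longer blocks, a streaming pipeline can keep accepting elements while the jobs it has already started are polled only once per bundle, which is the behavior the diff below introduces for TriggerLoadJobs, UpdateDestinationSchema, and TriggerCopyJobs.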
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 2ddbacd4ed3..8b899a343d3 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -77,6 +77,9 @@ _FILE_TRIGGERING_RECORD_COUNT = 500000
# triggering file write to avoid generating too many small files.
_FILE_TRIGGERING_BATCHING_DURATION_SECS = 1
+# How many seconds we wait before polling a pending job
+_SLEEP_DURATION_BETWEEN_POLLS = 10
+
def _generate_job_name(job_name, job_type, step_name):
return bigquery_tools.generate_bq_job_name(
@@ -355,9 +358,10 @@ class UpdateDestinationSchema(beam.DoFn):
self._step_name = step_name
self._load_job_project_id = load_job_project_id
- def setup(self):
- self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+ def start_bundle(self):
+ self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ self.pending_jobs = []
def display_data(self):
return {
@@ -392,7 +396,7 @@ class UpdateDestinationSchema(beam.DoFn):
try:
# Check if destination table exists
- destination_table = self._bq_wrapper.get_table(
+ destination_table = self.bq_wrapper.get_table(
project_id=table_reference.projectId,
dataset_id=table_reference.datasetId,
table_id=table_reference.tableId)
@@ -404,7 +408,7 @@ class UpdateDestinationSchema(beam.DoFn):
else:
raise
- temp_table_load_job = self._bq_wrapper.get_job(
+ temp_table_load_job = self.bq_wrapper.get_job(
project=temp_table_load_job_reference.projectId,
job_id=temp_table_load_job_reference.jobId,
location=temp_table_load_job_reference.location)
@@ -432,20 +436,36 @@ class UpdateDestinationSchema(beam.DoFn):
table_reference)
# Trigger potential schema modification by loading zero rows into the
# destination table with the temporary table schema.
- schema_update_job_reference = self._bq_wrapper.perform_load_job(
- destination=table_reference,
- source_stream=io.BytesIO(), # file with zero rows
- job_id=job_name,
- schema=temp_table_schema,
- write_disposition='WRITE_APPEND',
- create_disposition='CREATE_NEVER',
- additional_load_parameters=additional_parameters,
- job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
- # JSON format is hardcoded because zero rows load(unlike AVRO) and
- # a nested schema(unlike CSV, which a default one) is permitted.
- source_format="NEWLINE_DELIMITED_JSON",
- load_job_project_id=self._load_job_project_id)
- yield (destination, schema_update_job_reference)
+ schema_update_job_reference = self.bq_wrapper.perform_load_job(
+ destination=table_reference,
+ source_stream=io.BytesIO(), # file with zero rows
+ job_id=job_name,
+ schema=temp_table_schema,
+ write_disposition='WRITE_APPEND',
+ create_disposition='CREATE_NEVER',
+ additional_load_parameters=additional_parameters,
+ job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+ # JSON format is hardcoded because zero rows load(unlike AVRO) and
+ # a nested schema(unlike CSV, which a default one) is permitted.
+ source_format="NEWLINE_DELIMITED_JSON",
+ load_job_project_id=self._load_job_project_id)
+ self.pending_jobs.append(
+ GlobalWindows.windowed_value(
+ (destination, schema_update_job_reference)))
+
+ def finish_bundle(self):
+ # Unlike the other steps, schema update is not always necessary.
+ # In that case, return a None value to avoid blocking in streaming context.
+ # Otherwise, the streaming pipeline would get stuck waiting for the
+ # TriggerCopyJobs side-input.
+ if not self.pending_jobs:
+ return [GlobalWindows.windowed_value(None)]
+
+ for windowed_value in self.pending_jobs:
+ job_ref = windowed_value.value[1]
+ self.bq_wrapper.wait_for_bq_job(
+ job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+ return self.pending_jobs
class TriggerCopyJobs(beam.DoFn):
@@ -462,6 +482,9 @@ class TriggerCopyJobs(beam.DoFn):
copying from temp_tables to destination_table is not atomic.
See: https://issues.apache.org/jira/browse/BEAM-7822
"""
+
+ TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
+
def __init__(
self,
project=None,
@@ -490,6 +513,7 @@ class TriggerCopyJobs(beam.DoFn):
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ self.pending_jobs = []
def process(self, element, job_name_prefix=None, unused_schema_mod_jobs=None):
destination = element[0]
@@ -551,8 +575,19 @@ class TriggerCopyJobs(beam.DoFn):
if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
+ self.pending_jobs.append(
+ GlobalWindows.windowed_value((destination, job_reference)))
+
+ def finish_bundle(self):
+ for windowed_value in self.pending_jobs:
+ job_ref = windowed_value.value[1]
+ self.bq_wrapper.wait_for_bq_job(
+ job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+ yield windowed_value
- yield (destination, job_reference)
+ yield pvalue.TaggedOutput(
+ TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES,
+ GlobalWindows.windowed_value(None))
class TriggerLoadJobs(beam.DoFn):
@@ -609,6 +644,7 @@ class TriggerLoadJobs(beam.DoFn):
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
if not self.bq_io_metadata:
self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+ self.pending_jobs = []
def process(self, element, load_job_name_prefix, *schema_side_inputs):
# Each load job is assumed to have files respecting these constraints:
@@ -682,7 +718,15 @@ class TriggerLoadJobs(beam.DoFn):
source_format=self.source_format,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels(),
load_job_project_id=self.load_job_project_id)
- yield (destination, job_reference)
+ self.pending_jobs.append(
+ GlobalWindows.windowed_value((destination, job_reference)))
+
+ def finish_bundle(self):
+ for windowed_value in self.pending_jobs:
+ job_ref = windowed_value.value[1]
+ self.bq_wrapper.wait_for_bq_job(
+ job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+ return self.pending_jobs
class PartitionFiles(beam.DoFn):
@@ -739,29 +783,6 @@ class PartitionFiles(beam.DoFn):
yield pvalue.TaggedOutput(output_tag, (destination, partition))
-class WaitForBQJobs(beam.DoFn):
- """Takes in a series of BQ job names as side input, and waits for all of them.
-
- If any job fails, it will fail. If all jobs succeed, it will succeed.
-
- Experimental; no backwards compatibility guarantees.
- """
- def __init__(self, test_client=None):
- self.test_client = test_client
-
- def start_bundle(self):
- self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)
-
- def process(self, element, dest_ids_list):
- job_references = [elm[1] for elm in dest_ids_list]
- for ref in job_references:
- # We must poll repeatedly until the job finishes or fails, thus setting
- # max_retries to 0.
- self.bq_wrapper.wait_for_bq_job(ref, sleep_duration_sec=10, max_retries=0)
-
- return dest_ids_list # Pass the list of destination-jobs downstream
-
-
class DeleteTablesFn(beam.DoFn):
def __init__(self, test_client=None):
self.test_client = test_client
@@ -1038,15 +1059,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
temp_tables_load_job_ids_pc = trigger_loads_outputs['main']
temp_tables_pc = trigger_loads_outputs[TriggerLoadJobs.TEMP_TABLES]
- finished_temp_tables_load_jobs_pc = (
- p
- | "ImpulseMonitorLoadJobs" >> beam.Create([None])
- | "WaitForTempTableLoadJobs" >> beam.ParDo(
- WaitForBQJobs(self.test_client),
- pvalue.AsList(temp_tables_load_job_ids_pc)))
-
schema_mod_job_ids_pc = (
- finished_temp_tables_load_jobs_pc
+ temp_tables_load_job_ids_pc
| beam.ParDo(
UpdateDestinationSchema(
project=self.project,
@@ -1057,15 +1071,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
load_job_project_id=self.load_job_project_id),
schema_mod_job_name_pcv))
- finished_schema_mod_jobs_pc = (
- p
- | "ImpulseMonitorSchemaModJobs" >> beam.Create([None])
- | "WaitForSchemaModJobs" >> beam.ParDo(
- WaitForBQJobs(self.test_client),
- pvalue.AsList(schema_mod_job_ids_pc)))
-
- destination_copy_job_ids_pc = (
- finished_temp_tables_load_jobs_pc
+ copy_job_outputs = (
+ temp_tables_load_job_ids_pc
| beam.ParDo(
TriggerCopyJobs(
project=self.project,
@@ -1075,25 +1082,17 @@ class BigQueryBatchFileLoads(beam.PTransform):
step_name=step_name,
load_job_project_id=self.load_job_project_id),
copy_job_name_pcv,
- pvalue.AsIter(finished_schema_mod_jobs_pc)))
+ pvalue.AsIter(schema_mod_job_ids_pc)).with_outputs(
+ TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES, main='main'))
- finished_copy_jobs_pc = (
- p
- | "ImpulseMonitorCopyJobs" >> beam.Create([None])
- | "WaitForCopyJobs" >> beam.ParDo(
- WaitForBQJobs(self.test_client),
- pvalue.AsList(destination_copy_job_ids_pc)))
+ destination_copy_job_ids_pc = copy_job_outputs['main']
+ trigger_delete = copy_job_outputs[
+ TriggerCopyJobs.TRIGGER_DELETE_TEMP_TABLES]
_ = (
- p
- | "RemoveTempTables/Impulse" >> beam.Create([None])
- | "RemoveTempTables/PassTables" >> beam.FlatMap(
- lambda _,
- unused_copy_jobs,
- deleting_tables: deleting_tables,
- pvalue.AsIter(finished_copy_jobs_pc),
- pvalue.AsIter(temp_tables_pc))
- | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+ temp_tables_pc
+ | "RemoveTempTables/AddUselessValue" >> beam.Map(
+ lambda x, unused_trigger: (x, None), pvalue.AsList(trigger_delete))
| "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
| "RemoveTempTables/GetTableNames" >> beam.Keys()
| "RemoveTempTables/Delete" >> beam.ParDo(
@@ -1116,13 +1115,6 @@ class BigQueryBatchFileLoads(beam.PTransform):
load_job_name_pcv,
*self.schema_side_inputs))
- _ = (
- p
- | "ImpulseMonitorDestinationLoadJobs" >> beam.Create([None])
- | "WaitForDestinationLoadJobs" >> beam.ParDo(
- WaitForBQJobs(self.test_client),
- pvalue.AsList(destination_load_job_ids_pc)))
-
destination_load_job_ids_pc = (
(temp_tables_load_job_ids_pc, destination_load_job_ids_pc)
| beam.Flatten())
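The pipeline rewiring above also replaces the WaitForBQJobs monitoring branches with a tagged output from TriggerCopyJobs that gates temp-table deletion. A minimal, runnable sketch of that with_outputs / side-input gating pattern is given below; names such as FakeCopyJobs and the 'TriggerDelete' tag are illustrative stand-ins, not the Beam source.

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.transforms.window import GlobalWindows


class FakeCopyJobs(beam.DoFn):
    """Illustrative stand-in for TriggerCopyJobs: main results plus a trigger."""
    TRIGGER_DELETE = 'TriggerDelete'

    def process(self, element):
        # Main output: pretend each element is a finished copy job reference.
        yield ('dest-%d' % element, 'job-%d' % element)

    def finish_bundle(self):
        # Tagged output: a single signal telling cleanup it may start.
        yield pvalue.TaggedOutput(
            self.TRIGGER_DELETE, GlobalWindows.windowed_value(None))


with beam.Pipeline() as p:
    outputs = (
        p
        | 'Jobs' >> beam.Create([1, 2, 3])
        | 'Copy' >> beam.ParDo(FakeCopyJobs()).with_outputs(
            FakeCopyJobs.TRIGGER_DELETE, main='main'))

    copy_results = outputs['main']
    trigger = outputs[FakeCopyJobs.TRIGGER_DELETE]

    temp_tables = p | 'TempTables' >> beam.Create(['tmp_a', 'tmp_b'])

    _ = (
        temp_tables
        # The side input makes this step wait for the trigger element,
        # mirroring how RemoveTempTables is gated on TriggerCopyJobs above.
        | 'Gate' >> beam.Map(
            lambda t, unused_trigger: t, pvalue.AsList(trigger))
        | 'Delete' >> beam.Map(print))

Using the side input rather than a separate impulse-and-wait branch means the delete stage simply cannot fire until the copy stage's bundle has finished, which is what allows the streaming file-loads path to drop WaitForBQJobs entirely.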
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 21798fae0fe..77fb554bedc 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -44,8 +44,6 @@ from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryFullResultStreamin
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
@@ -544,12 +542,15 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
label='CheckCopyJobProjectIds')
@mock.patch('time.sleep')
- def test_wait_for_job_completion(self, sleep_mock):
- job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]
- job_references[0].projectId = 'project1'
- job_references[0].jobId = 'jobId1'
- job_references[1].projectId = 'project1'
- job_references[1].jobId = 'jobId2'
+ def test_wait_for_load_job_completion(self, sleep_mock):
+ job_1 = bigquery_api.Job()
+ job_1.jobReference = bigquery_api.JobReference()
+ job_1.jobReference.projectId = 'project1'
+ job_1.jobReference.jobId = 'jobId1'
+ job_2 = bigquery_api.Job()
+ job_2.jobReference = bigquery_api.JobReference()
+ job_2.jobReference.projectId = 'project1'
+ job_2.jobReference.jobId = 'jobId2'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -565,26 +566,34 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
bq_client.jobs.Get.side_effect = [
job_1_waiting, job_2_done, job_1_done, job_2_done
]
+ partition_1 = ('project:dataset.table0', ['file0'])
+ partition_2 = ('project:dataset.table1', ['file1'])
+ bq_client.jobs.Insert.side_effect = [job_1, job_2]
+ test_job_prefix = "test_job"
- waiting_dofn = bqfl.WaitForBQJobs(bq_client)
-
- dest_list = [(i, job) for i, job in enumerate(job_references)]
-
+ expected_dest_jobref_list = [(partition_1[0], job_1.jobReference),
+ (partition_2[0], job_2.jobReference)]
with TestPipeline('DirectRunner') as p:
- references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
- outputs = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references))
+ partitions = p | beam.Create([partition_1, partition_2])
+ outputs = (
+ partitions
+ | beam.ParDo(
+ bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))
- assert_that(outputs, equal_to(dest_list))
+ assert_that(outputs, equal_to(expected_dest_jobref_list))
sleep_mock.assert_called_once()
@mock.patch('time.sleep')
- def test_one_job_failed_after_waiting(self, sleep_mock):
- job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]
- job_references[0].projectId = 'project1'
- job_references[0].jobId = 'jobId1'
- job_references[1].projectId = 'project1'
- job_references[1].jobId = 'jobId2'
+ def test_one_load_job_failed_after_waiting(self, sleep_mock):
+ job_1 = bigquery_api.Job()
+ job_1.jobReference = bigquery_api.JobReference()
+ job_1.jobReference.projectId = 'project1'
+ job_1.jobReference.jobId = 'jobId1'
+ job_2 = bigquery_api.Job()
+ job_2.jobReference = bigquery_api.JobReference()
+ job_2.jobReference.projectId = 'project1'
+ job_2.jobReference.jobId = 'jobId2'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -600,15 +609,18 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
bq_client.jobs.Get.side_effect = [
job_1_waiting, job_2_done, job_1_error, job_2_done
]
-
- waiting_dofn = bqfl.WaitForBQJobs(bq_client)
-
- dest_list = [(i, job) for i, job in enumerate(job_references)]
+ partition_1 = ('project:dataset.table0', ['file0'])
+ partition_2 = ('project:dataset.table1', ['file1'])
+ bq_client.jobs.Insert.side_effect = [job_1, job_2]
+ test_job_prefix = "test_job"
with self.assertRaises(Exception):
with TestPipeline('DirectRunner') as p:
- references = beam.pvalue.AsList(p | 'job_ref' >> beam.Create(dest_list))
- _ = (p | beam.Create(['']) | beam.ParDo(waiting_dofn, references))
+ partitions = p | beam.Create([partition_1, partition_2])
+ _ = (
+ partitions
+ | beam.ParDo(
+ bqfl.TriggerLoadJobs(test_client=bq_client), test_job_prefix))
sleep_mock.assert_called_once()
@@ -917,16 +929,14 @@ class BigQueryFileLoadsIT(unittest.TestCase):
schema = self.BIG_QUERY_STREAMING_SCHEMA
l = [{'Integr': i} for i in range(_SIZE)]
- state_matcher = PipelineStateMatcher(PipelineState.RUNNING)
bq_matcher = BigqueryFullResultStreamingMatcher(
project=self.project,
query="SELECT Integr FROM %s" % output_table,
- data=[(i, ) for i in range(100)])
+ data=[(i, ) for i in range(100)],
+ timeout=30)
args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=all_of(state_matcher, bq_matcher),
- streaming=True,
- allow_unsafe_triggers=True)
+ streaming=True, allow_unsafe_triggers=True)
with beam.Pipeline(argv=args) as p:
stream_source = (
TestStream().advance_watermark_to(0).advance_processing_time(
@@ -946,6 +956,98 @@ class BigQueryFileLoadsIT(unittest.TestCase):
.Method.FILE_LOADS,
triggering_frequency=100))
+ hamcrest_assert(p, bq_matcher)
+
+ @pytest.mark.it_postcommit
+ def test_bqfl_streaming_with_copy_jobs(self):
+ if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+ self.skipTest("TestStream is not supported on TestDataflowRunner")
+ output_table = '%s_%s' % (self.output_table, 'with_copy_jobs')
+ _SIZE = 100
+ schema = self.BIG_QUERY_STREAMING_SCHEMA
+ l = [{'Integr': i} for i in range(_SIZE)]
+
+ bq_matcher = BigqueryFullResultStreamingMatcher(
+ project=self.project,
+ query="SELECT Integr FROM %s" % output_table,
+ data=[(i, ) for i in range(100)])
+
+ args = self.test_pipeline.get_full_options_as_args(
+ streaming=True, allow_unsafe_triggers=True)
+
+ # Override these parameters to induce copy jobs
+ bqfl._DEFAULT_MAX_FILE_SIZE = 100
+ bqfl._MAXIMUM_LOAD_SIZE = 400
+
+ with beam.Pipeline(argv=args) as p:
+ stream_source = (
+ TestStream().advance_watermark_to(0).advance_processing_time(
+ 100).add_elements(l[:_SIZE // 4]).
+ advance_processing_time(100).advance_watermark_to(100).add_elements(
+ l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time(
+ 100).advance_watermark_to(200).add_elements(
+ l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time(
+ 100).advance_watermark_to(300).add_elements(
+ l[3 * _SIZE // 4:]).advance_processing_time(100).
+ advance_watermark_to_infinity().advance_processing_time(100))
+
+ _ = (p
+ | stream_source
+ | bigquery.WriteToBigQuery(output_table,
+ schema=schema,
+ method=bigquery.WriteToBigQuery \
+ .Method.FILE_LOADS,
+ triggering_frequency=100))
+
+ hamcrest_assert(p, bq_matcher)
+
+ @pytest.mark.it_postcommit
+ def test_bqfl_streaming_with_dynamic_destinations(self):
+ if isinstance(self.test_pipeline.runner, TestDataflowRunner):
+ self.skipTest("TestStream is not supported on TestDataflowRunner")
+ even_table = '%s_%s' % (self.output_table, "dynamic_dest_0")
+ odd_table = '%s_%s' % (self.output_table, "dynamic_dest_1")
+ output_table = lambda row: even_table if (
+ row['Integr'] % 2 == 0) else odd_table
+ _SIZE = 100
+ schema = self.BIG_QUERY_STREAMING_SCHEMA
+ l = [{'Integr': i} for i in range(_SIZE)]
+
+ pipeline_verifiers = [
+ BigqueryFullResultStreamingMatcher(
+ project=self.project,
+ query="SELECT Integr FROM %s" % even_table,
+ data=[(i, ) for i in range(0, 100, 2)]),
+ BigqueryFullResultStreamingMatcher(
+ project=self.project,
+ query="SELECT Integr FROM %s" % odd_table,
+ data=[(i, ) for i in range(1, 100, 2)])
+ ]
+
+ args = self.test_pipeline.get_full_options_as_args(
+ streaming=True, allow_unsafe_triggers=True)
+
+ with beam.Pipeline(argv=args) as p:
+ stream_source = (
+ TestStream().advance_watermark_to(0).advance_processing_time(
+ 100).add_elements(l[:_SIZE // 4]).
+ advance_processing_time(100).advance_watermark_to(100).add_elements(
+ l[_SIZE // 4:2 * _SIZE // 4]).advance_processing_time(
+ 100).advance_watermark_to(200).add_elements(
+ l[2 * _SIZE // 4:3 * _SIZE // 4]).advance_processing_time(
+ 100).advance_watermark_to(300).add_elements(
+ l[3 * _SIZE // 4:]).advance_processing_time(100).
+ advance_watermark_to_infinity().advance_processing_time(100))
+
+ _ = (p
+ | stream_source
+ | bigquery.WriteToBigQuery(output_table,
+ schema=schema,
+ method=bigquery.WriteToBigQuery \
+ .Method.FILE_LOADS,
+ triggering_frequency=100))
+ hamcrest_assert(p, all_of(*pipeline_verifiers))
+
@pytest.mark.it_postcommit
def test_one_job_fails_all_jobs_fail(self):