Posted to commits@beam.apache.org by pa...@apache.org on 2019/03/27 16:02:37 UTC
[beam] branch master updated: Merge pull request #8093 from pabloem/sch-dest-bq
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d08e95 Merge pull request #8093 from pabloem/sch-dest-bq
7d08e95 is described below
commit 7d08e950b7a5aa10ba3eb50de7a3d95225d84501
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Wed Mar 27 09:02:27 2019 -0700
Merge pull request #8093 from pabloem/sch-dest-bq
[BEAM-6892] Schemas and destinations are provided to WriteToBigQuery separately (#8093)
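This removes the earlier convention of smuggling schemas through the
destination callable as (destination, schema) tuples: the table callable now
returns only a destination, and the schema travels separately as its own
argument (a str, dict, TableSchema, ValueProvider, or a callable receiving
the destination). A minimal sketch of the new calling convention, with
illustrative project/dataset/table names:

    import apache_beam as beam

    def compute_table(row):
      # Route each row to a table based on its contents.
      return ('myproject:mydataset.languages' if 'language' in row
              else 'myproject:mydataset.names')

    def compute_schema(destination):
      # Receives the destination chosen above; returns that table's schema.
      if destination == 'myproject:mydataset.languages':
        return 'name:STRING,language:STRING'
      return 'name:STRING,foundation:STRING'

    with beam.Pipeline() as p:
      _ = (p
           | beam.Create([{'name': 'beam', 'language': 'py'}])
           | beam.io.WriteToBigQuery(
               table=compute_table,
               schema=compute_schema,
               method='STREAMING_INSERTS'))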
---
.../examples/cookbook/bigquery_tornadoes.py | 8 +--
.../cookbook/bigquery_tornadoes_it_test.py | 4 +-
.../io/gcp/big_query_query_to_table_it_test.py | 7 +-
.../io/gcp/big_query_query_to_table_pipeline.py | 12 +---
sdks/python/apache_beam/io/gcp/bigquery.py | 79 ++++++++++++++--------
.../apache_beam/io/gcp/bigquery_file_loads.py | 78 ++++++++++++++++-----
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 10 +--
sdks/python/apache_beam/io/gcp/bigquery_test.py | 23 +++++--
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 10 +--
9 files changed, 138 insertions(+), 93 deletions(-)
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index 9be3f89..c7c837b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -87,19 +87,13 @@ def run(argv=None):
rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
counts = count_tornadoes(rows)
- if 'temp_location' in p.options.get_all_options():
- location = p.options.get_all_options()['temp_location']
- else:
- location = known_args.gcs_location
-
# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
counts | 'Write' >> beam.io.WriteToBigQuery(
known_args.output,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
- gs_location=location)
+ write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
# Run the pipeline (all operations are deferred until run() is called).
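With the hand-rolled gs_location plumbing gone, the example relies on the
pipeline's --temp_location for BigQuery file loads. A pipeline whose
temp_location is unsuitable for loads can still override it through the
renamed parameter; a sketch against the call above (bucket name
illustrative):

    counts | 'Write' >> beam.io.WriteToBigQuery(
        known_args.output,
        schema='month:INTEGER, tornado_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        custom_gcs_temp_location='gs://my-bucket/bq_tmp')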
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index bf3581c..f7eb93b 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -60,10 +60,8 @@ class BigqueryTornadoesIT(unittest.TestCase):
project=project,
query=query,
checksum=self.DEFAULT_CHECKSUM)]
- gs_location = 'gs://temp-storage-for-upload-tests/%s' % table
extra_opts = {'output': output_table,
- 'on_success_matcher': all_of(*pipeline_verifiers),
- 'gcs_location': gs_location}
+ 'on_success_matcher': all_of(*pipeline_verifiers)}
# Register cleanup before pipeline execution.
# Note that actual execution happens in reverse order.
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 2570fc7..5bdf8a8 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -131,10 +131,8 @@ class BigQueryQueryToTableIT(unittest.TestCase):
query=verify_query,
checksum=expected_checksum)]
- gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
extra_opts = {'query': LEGACY_QUERY,
'output': self.output_table,
- 'bq_temp_location': gs_location,
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': False,
'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -149,10 +147,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
project=self.project,
query=verify_query,
checksum=expected_checksum)]
- gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
+
extra_opts = {'query': STANDARD_QUERY,
'output': self.output_table,
- 'bq_temp_location': gs_location,
'output_schema': DIALECT_OUTPUT_SCHEMA,
'use_standard_sql': True,
'on_success_matcher': all_of(*pipeline_verifiers)}
@@ -197,11 +194,9 @@ class BigQueryQueryToTableIT(unittest.TestCase):
query=verify_query,
checksum=expected_checksum)]
self._setup_new_types_env()
- gs_location = 'gs://temp-storage-for-upload-tests/%s' % self.output_table
extra_opts = {
'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE),
'output': self.output_table,
- 'bq_temp_location': gs_location,
'output_schema': NEW_TYPES_OUTPUT_SCHEMA,
'use_standard_sql': False,
'on_success_matcher': all_of(*pipeline_verifiers)}
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
index 07b194e..45aa10e 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_pipeline.py
@@ -50,10 +50,7 @@ def run_bq_pipeline(argv=None):
help='Output BQ table to write results to.')
parser.add_argument('--kms_key', default=None,
help='Use this Cloud KMS key with BigQuery.')
- parser.add_argument('--bq_temp_location',
- default=None,
- help=('GCS bucket to use to store files for '
- 'loading data into BigQuery.'))
+
known_args, pipeline_args = parser.parse_known_args(argv)
table_schema = parse_table_schema_from_json(known_args.output_schema)
@@ -61,10 +58,6 @@ def run_bq_pipeline(argv=None):
p = TestPipeline(options=PipelineOptions(pipeline_args))
- if 'temp_location' in p.options.get_all_options():
- location = p.options.get_all_options()['temp_location']
- else:
- location = known_args.bq_temp_location
# pylint: disable=expression-not-assigned
# pylint: disable=bad-continuation
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(
@@ -74,8 +67,7 @@ def run_bq_pipeline(argv=None):
known_args.output,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
- gs_location=location))
+ write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
result = p.run()
result.wait_until_finish()
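The --bq_temp_location flag could be dropped because the transform now falls
back to the pipeline's own options. A sketch of the lookup that
BigQueryBatchFileLoads.expand performs (mirroring the bigquery_file_loads.py
hunk further below):

    from apache_beam.options.pipeline_options import GoogleCloudOptions

    gcs_location = (custom_gcs_temp_location
                    or p.options.view_as(GoogleCloudOptions).temp_location)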
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 27ee67a..f94f31b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -552,6 +552,7 @@ class BigQueryWriteFn(DoFn):
def __init__(
self,
batch_size,
+ schema=None,
create_disposition=None,
write_disposition=None,
kms_key=None,
@@ -590,11 +591,13 @@ class BigQueryWriteFn(DoFn):
retry_strategy: The strategy to use when retrying streaming inserts
into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
"""
+ self.schema = schema
self.test_client = test_client
self.create_disposition = create_disposition
self.write_disposition = write_disposition
self._rows_buffer = []
self._reset_rows_buffer()
+ self._observed_tables = set()
self._total_buffered_rows = 0
self.kms_key = kms_key
@@ -654,6 +657,11 @@ class BigQueryWriteFn(DoFn):
if str_table_reference in self._observed_tables:
return
+ if self.create_disposition == BigQueryDisposition.CREATE_NEVER:
+ # If we never want to create the table, we assume it already exists,
+ # and avoid the get-or-create step.
+ return
+
logging.debug('Creating or getting table %s with schema %s.',
table_reference, schema)
@@ -671,12 +679,17 @@ class BigQueryWriteFn(DoFn):
def process(self, element, unused_create_fn_output=None):
destination = element[0]
- if isinstance(destination, tuple):
- schema = destination[1]
- destination = destination[0]
- self._create_table_if_needed(
- bigquery_tools.parse_table_reference(destination),
- schema)
+
+ if callable(self.schema):
+ schema = self.schema(destination)
+ elif isinstance(self.schema, vp.ValueProvider):
+ schema = self.schema.get()
+ else:
+ schema = self.schema
+
+ self._create_table_if_needed(
+ bigquery_tools.parse_table_reference(destination),
+ schema)
row = element[1]
self._rows_buffer[destination].append(row)
@@ -762,9 +775,10 @@ class WriteToBigQuery(PTransform):
max_file_size=None,
max_files_per_bundle=None,
test_client=None,
- gs_location=None,
+ custom_gcs_temp_location=None,
method=None,
- insert_retry_strategy=None):
+ insert_retry_strategy=None,
+ validate=True):
"""Initialize a WriteToBigQuery transform.
Args:
@@ -784,17 +798,19 @@ class WriteToBigQuery(PTransform):
project (str): The ID of the project containing this table or
:data:`None` if the table reference is specified entirely by the table
argument.
- schema (str,dict,ValueProvider): The schema to be used if the
+ schema (str,dict,ValueProvider,callable): The schema to be used if the
BigQuery table to write has to be created. This can be either specified
as a :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema`, a `ValueProvider` that has a JSON string,
- or a python dictionary, or the string or dictionary itself.
+ or a python dictionary, or the string or dictionary itself,
or a single string of the form
``'field1:type1,field2:type2,field3:type3'`` that defines a comma
separated list of fields. Here ``'type'`` should specify the BigQuery
type of the field. Single string based schemas do not support nested
fields, repeated fields, or specifying a BigQuery mode for fields
(mode will always be set to ``'NULLABLE'``).
+ If a callable, then it should receive a destination (in the form of
+ a TableReference or a string) and return a str, dict, or TableSchema.
create_disposition (BigQueryDisposition): A string describing what
happens if the table does not exist. Possible values are:
@@ -825,8 +841,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
written by a worker. The default here is 20. Larger values will allow
writing to multiple destinations without having to reshard - but they
increase the memory burden on the workers.
- gs_location (str): A GCS location to store files to be used for file
- loads into BigQuery. By default, this will use the pipeline's
+ custom_gcs_temp_location (str): A GCS location to store files to be used
+ for file loads into BigQuery. By default, this will use the pipeline's
temp_location, but for pipelines whose temp_location is not appropriate
for BQ File Loads, users should pass a specific one.
method: The method to use to write to BigQuery. It may be
@@ -836,6 +852,8 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
FILE_LOADS on Batch pipelines.
insert_retry_strategy: The strategy to use when retrying streaming inserts
into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
+ validate: Indicates whether to perform validation checks on
+ inputs. This parameter is primarily used for testing.
"""
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
@@ -847,11 +865,14 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
self.batch_size = batch_size
self.kms_key = kms_key
self.test_client = test_client
- self.gs_location = gs_location
+
+ # TODO(pabloem): Consider handling ValueProvider for this location.
+ self.custom_gcs_temp_location = custom_gcs_temp_location
self.max_file_size = max_file_size
self.max_files_per_bundle = max_files_per_bundle
self.method = method or WriteToBigQuery.Method.DEFAULT
self.insert_retry_strategy = insert_retry_strategy
+ self._validate = validate
@staticmethod
def get_table_schema_from_string(schema):
@@ -919,17 +940,15 @@ bigquery_v2_messages.TableSchema):
Dict[str, Any]: The schema to be used if the BigQuery table to write has
to be created but in the dictionary format.
"""
- if isinstance(schema, dict):
- return schema
- elif schema is None:
+ if (isinstance(schema, (dict, vp.ValueProvider)) or
+ callable(schema) or
+ schema is None):
return schema
elif isinstance(schema, (str, unicode)):
table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
return WriteToBigQuery.table_schema_to_dict(table_schema)
elif isinstance(schema, bigquery.TableSchema):
return WriteToBigQuery.table_schema_to_dict(schema)
- elif isinstance(schema, vp.ValueProvider):
- return schema
else:
raise TypeError('Unexpected schema argument: %s.' % schema)
@@ -948,6 +967,7 @@ bigquery_v2_messages.TableSchema):
self.method == WriteToBigQuery.Method.STREAMING_INSERTS):
# TODO: Support load jobs for streaming pipelines.
bigquery_write_fn = BigQueryWriteFn(
+ schema=self.schema,
batch_size=self.batch_size,
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
@@ -957,8 +977,7 @@ bigquery_v2_messages.TableSchema):
outputs = (pcoll
| 'AppendDestination' >> beam.ParDo(
- bigquery_tools.AppendDestinationsFn(
- destination=self.table_reference, schema=self.schema))
+ bigquery_tools.AppendDestinationsFn(self.table_reference))
| 'StreamInsertRows' >> ParDo(bigquery_write_fn).with_outputs(
BigQueryWriteFn.FAILED_ROWS, main='main'))
@@ -969,15 +988,17 @@ bigquery_v2_messages.TableSchema):
'File Loads to BigQuery are only supported on Batch pipelines.')
from apache_beam.io.gcp import bigquery_file_loads
- return pcoll | bigquery_file_loads.BigQueryBatchFileLoads(
- destination=self.table_reference,
- schema=self.schema,
- create_disposition=self.create_disposition,
- write_disposition=self.write_disposition,
- max_file_size=self.max_file_size,
- max_files_per_bundle=self.max_files_per_bundle,
- gs_location=self.gs_location,
- test_client=self.test_client)
+ return (pcoll
+ | bigquery_file_loads.BigQueryBatchFileLoads(
+ destination=self.table_reference,
+ schema=self.schema,
+ create_disposition=self.create_disposition,
+ write_disposition=self.write_disposition,
+ max_file_size=self.max_file_size,
+ max_files_per_bundle=self.max_files_per_bundle,
+ custom_gcs_temp_location=self.custom_gcs_temp_location,
+ test_client=self.test_client,
+ validate=self._validate))
def display_data(self):
res = {}
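Because BigQueryWriteFn now owns the schema, template-style pipelines can
keep supplying it as a ValueProvider and it is resolved at execution time
inside process(). A hedged sketch (table and schema contents illustrative):

    import apache_beam as beam
    from apache_beam.options import value_provider

    schema_vp = value_provider.StaticValueProvider(
        dict, {'fields': [
            {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]})

    write = beam.io.WriteToBigQuery(
        'myproject:mydataset.mytable',
        schema=schema_vp,
        method='STREAMING_INSERTS')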
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 7395370..f6d5c83 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -44,7 +44,7 @@ from apache_beam.io import filesystems as fs
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.internal.clients import bigquery as bigquery_api
from apache_beam.options import value_provider as vp
-from apache_beam.transforms.combiners import Count
+from apache_beam.options.pipeline_options import GoogleCloudOptions
ONE_TERABYTE = (1 << 40)
@@ -66,13 +66,27 @@ def _generate_load_job_name():
return 'beam_load_%s_%s' % (datetime_component, random.randint(0, 100))
-def _generate_file_prefix(pipeline_gcs_location):
- # If a gcs location is provided to the pipeline, then we shall use that.
- # Otherwise, we shall use the temp_location from pipeline options.
- gcs_base = str(pipeline_gcs_location or
- vp.RuntimeValueProvider.get_value('temp_location', str, ''))
- prefix_uuid = _bq_uuid()
- return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid)
+def file_prefix_generator(with_validation=True):
+ def _generate_file_prefix(pipeline_gcs_location):
+ # If a gcs location is provided to the pipeline, then we shall use that.
+ # Otherwise, we shall use the temp_location from pipeline options.
+ gcs_base = str(pipeline_gcs_location or
+ vp.RuntimeValueProvider.get_value('temp_location', str, ''))
+
+ # This will fail at pipeline execution time, but will fail early, as this
+ # step doesn't have any dependencies (and thus will be one of the first
+ # stages to be run).
+ if with_validation and (not gcs_base or not gcs_base.startswith('gs://')):
+ raise ValueError('Invalid GCS location.\n'
+ 'Writing to BigQuery with FILE_LOADS method requires a '
+ 'GCS location to be provided to write files to be '
+ 'loaded into BigQuery. Please provide a GCS bucket, or '
+ 'pass method="STREAMING_INSERTS" to WriteToBigQuery.')
+
+ prefix_uuid = _bq_uuid()
+ return fs.FileSystems.join(gcs_base, 'bq_load', prefix_uuid)
+
+ return _generate_file_prefix
def _make_new_file_writer(file_prefix, destination):
@@ -276,8 +290,8 @@ class TriggerCopyJobs(beam.DoFn):
copy_to_reference = bigquery_tools.parse_table_reference(destination)
if copy_to_reference.projectId is None:
- copy_to_reference.projectId = vp.RuntimeValueProvider.get_value(
- 'project', str, '')
+ copy_to_reference.projectId = vp.RuntimeValueProvider.get_value('project',
+ str, '')
copy_from_reference = bigquery_tools.parse_table_reference(destination)
copy_from_reference.tableId = job_reference.jobId
@@ -350,6 +364,13 @@ class TriggerLoadJobs(beam.DoFn):
destination = element[0]
files = iter(element[1])
+ if callable(self.schema):
+ schema = self.schema(destination)
+ elif isinstance(self.schema, vp.ValueProvider):
+ schema = self.schema.get()
+ else:
+ schema = self.schema
+
job_count = 0
batch_of_files = list(itertools.islice(files, _MAXIMUM_SOURCE_URIS))
while batch_of_files:
@@ -380,7 +401,7 @@ class TriggerLoadJobs(beam.DoFn):
job_name, table_reference)
job_reference = self.bq_wrapper.perform_load_job(
table_reference, batch_of_files, job_name,
- schema=self.schema,
+ schema=schema,
write_disposition=self.write_disposition,
create_disposition=self.create_disposition)
yield (destination, job_reference)
@@ -466,20 +487,21 @@ class BigQueryBatchFileLoads(beam.PTransform):
self,
destination,
schema=None,
- gs_location=None,
+ custom_gcs_temp_location=None,
create_disposition=None,
write_disposition=None,
coder=None,
max_file_size=None,
max_files_per_bundle=None,
- test_client=None):
+ test_client=None,
+ validate=True):
self.destination = destination
self.create_disposition = create_disposition
self.write_disposition = write_disposition
self.max_file_size = max_file_size or _DEFAULT_MAX_FILE_SIZE
self.max_files_per_bundle = (max_files_per_bundle or
_DEFAULT_MAX_WRITERS_PER_BUNDLE)
- self._input_gs_location = gs_location
+ self._custom_gcs_temp_location = custom_gcs_temp_location
self.test_client = test_client
self.schema = schema
self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
@@ -490,9 +512,28 @@ class BigQueryBatchFileLoads(beam.PTransform):
# job to run - and thus we avoid using temporary tables
self.temp_tables = True if callable(destination) else False
+ self._validate = validate
+ if self._validate:
+ self.verify()
+
+ def verify(self):
+ if (isinstance(self._custom_gcs_temp_location, str) and
+ not self._custom_gcs_temp_location.startswith('gs://')):
+ # Only fail if the custom location is provided, and it is not a GCS
+ # location.
+ raise ValueError('Invalid GCS location.\n'
+ 'Writing to BigQuery with FILE_LOADS method requires a '
+ 'GCS location to be provided to write files to be '
+ 'loaded into BigQuery. Please provide a GCS bucket, or '
+ 'pass method="STREAMING_INSERTS" to WriteToBigQuery.')
+
def expand(self, pcoll):
p = pcoll.pipeline
+ self._custom_gcs_temp_location = (
+ self._custom_gcs_temp_location
+ or p.options.view_as(GoogleCloudOptions).temp_location)
+
load_job_name_pcv = pvalue.AsSingleton(
p
| "ImpulseJobName" >> beam.Create([None])
@@ -500,8 +541,10 @@ class BigQueryBatchFileLoads(beam.PTransform):
file_prefix_pcv = pvalue.AsSingleton(
p
- | "CreateFilePrefixView" >> beam.Create([self._input_gs_location])
- | "GenerateFilePrefix" >> beam.Map(_generate_file_prefix))
+ | "CreateFilePrefixView" >> beam.Create(
+ [self._custom_gcs_temp_location])
+ | "GenerateFilePrefix" >> beam.Map(
+ file_prefix_generator(self._validate)))
outputs = (
pcoll
@@ -583,7 +626,8 @@ class BigQueryBatchFileLoads(beam.PTransform):
| "RemoveTempTables/PassTables" >> beam.FlatMap(
lambda x, deleting_tables: deleting_tables,
pvalue.AsIter(temp_tables_pc))
- | "RemoveTempTables/DeduplicateTables" >> Count.PerElement()
+ | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+ | "RemoveTempTables/DeduplicateTables" >> beam.GroupByKey()
| "RemoveTempTables/GetTableNames" >> beam.Map(lambda elm: elm[0])
| "RemoveTempTables/Delete" >> beam.ParDo(DeleteTablesFn()))
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 499f0c3..9391b66 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -23,7 +23,6 @@ import json
import logging
import os
import random
-import sys
import time
import unittest
@@ -295,8 +294,9 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
transform = bigquery.WriteToBigQuery(
destination,
- gs_location=self._new_tempdir(),
- test_client=bq_client)
+ custom_gcs_temp_location=self._new_tempdir(),
+ test_client=bq_client,
+ validate=False)
# Need to test this with the DirectRunner to avoid serializing mocks
with TestPipeline('DirectRunner') as p:
@@ -361,10 +361,6 @@ class BigQueryFileLoadsIT(unittest.TestCase):
logging.info("Created dataset %s in project %s",
self.dataset_id, self.project)
- @unittest.skipIf(sys.version_info[0] == 3 and
- os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
- 'This test still needs to be fixed on Python 3'
- 'TODO: BEAM-6711')
@attr('IT')
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 67b2bef..5beb983 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -425,7 +425,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = \
bigquery.TableDataInsertAllResponse(insertErrors=[])
- create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
@@ -439,7 +439,7 @@ class BigQueryStreamingInsertTransformTests(unittest.TestCase):
# Destination is a tuple of (destination, schema) to ensure the table is
# created.
- fn.process((('project_id:dataset_id.table_id', None), {'month': 1}))
+ fn.process(('project_id:dataset_id.table_id', {'month': 1}))
self.assertTrue(client.tables.Get.called)
# InsertRows not called as batch size is not hit
@@ -522,18 +522,24 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
_ = (input
| "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
- table=value_provider.StaticValueProvider(str, output_table_1),
+ table=value_provider.StaticValueProvider(
+ str, '%s:%s' % (self.project, output_table_1)),
schema=value_provider.StaticValueProvider(dict, schema),
method='STREAMING_INSERTS'))
_ = (input
| "WriteWithMultipleDests2" >> beam.io.gcp.bigquery.WriteToBigQuery(
- table=value_provider.StaticValueProvider(str, output_table_2),
+ table=value_provider.StaticValueProvider(
+ str, '%s:%s' % (self.project, output_table_2)),
method='FILE_LOADS'))
@attr('IT')
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
+
+ full_output_table_1 = '%s:%s' % (self.project, output_table_1)
+ full_output_table_2 = '%s:%s' % (self.project, output_table_2)
+
schema1 = {'fields': [
{'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'language', 'type': 'STRING', 'mode': 'NULLABLE'}]}
@@ -569,13 +575,16 @@ class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
r = (input
| "WriteWithMultipleDests" >> beam.io.gcp.bigquery.WriteToBigQuery(
- table=lambda x: ((output_table_1, schema1)
+ table=lambda x: (full_output_table_1
if 'language' in x
- else (output_table_2, schema2)),
+ else full_output_table_2),
+ schema=lambda dest: (schema1
+ if dest == full_output_table_1
+ else schema2),
method='STREAMING_INSERTS'))
assert_that(r[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS],
- equal_to([(output_table_1, bad_record)]))
+ equal_to([(full_output_table_1, bad_record)]))
def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index 82ea8ea..ca7216a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -997,8 +997,8 @@ class AppendDestinationsFn(DoFn):
Experimental; no backwards compatibility guarantees.
"""
- def __init__(self, destination, schema=None):
- self.destination = AppendDestinationsFn._get_table_fn(destination, schema)
+ def __init__(self, destination):
+ self.destination = AppendDestinationsFn._get_table_fn(destination)
@staticmethod
def _value_provider_or_static_val(elm):
@@ -1010,13 +1010,9 @@ class AppendDestinationsFn(DoFn):
return value_provider.StaticValueProvider(lambda x: x, value=elm)
@staticmethod
- def _get_table_fn(destination, schema=None):
+ def _get_table_fn(destination):
if callable(destination):
return destination
- elif not callable(destination) and schema is not None:
- return lambda x: (
- AppendDestinationsFn._value_provider_or_static_val(destination).get(),
- AppendDestinationsFn._value_provider_or_static_val(schema).get())
else:
return lambda x: AppendDestinationsFn._value_provider_or_static_val(
destination).get()
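After this change AppendDestinationsFn tags rows with a destination only;
schemas never travel with the elements. A hedged sketch of the resulting
element shape (table name illustrative):

    fn = AppendDestinationsFn('myproject:mydataset.mytable')
    fn.destination({'month': 1})  # -> 'myproject:mydataset.mytable'
    # The DoFn then emits the pair:
    # ('myproject:mydataset.mytable', {'month': 1})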