Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:11 UTC
[22/50] beam git commit: [BEAM-2405] Write to BQ using the streaming API
[BEAM-2405] Write to BQ using the streaming API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1498684d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1498684d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1498684d
Branch: refs/heads/DSL_SQL
Commit: 1498684dfea31594a236edd7fde5d299e4b0aa1e
Parents: fa3922b
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Jun 2 20:32:48 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 14:04:33 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 177 +++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigquery_test.py | 172 ++++++++++++++++++
2 files changed, 349 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 201c798..9069f73 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -115,6 +115,9 @@ from apache_beam.internal.gcp import auth
from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
from apache_beam.transforms.display import DisplayDataItem
from apache_beam.utils import retry
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -134,6 +137,7 @@ __all__ = [
'BigQueryDisposition',
'BigQuerySource',
'BigQuerySink',
+ 'WriteToBigQuery',
]
JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
@@ -813,6 +817,7 @@ class BigQueryWrapper(object):
request = bigquery.BigqueryTablesInsertRequest(
projectId=project_id, datasetId=dataset_id, table=table)
response = self.client.tables.Insert(request)
+ logging.debug("Created the table with id %s", table_id)
# The response is a bigquery.Table instance.
return response
@@ -1134,3 +1139,175 @@ class BigQueryWrapper(object):
else:
result[field.name] = self._convert_cell_value_to_dict(value, field)
return result
+
+
+class BigQueryWriteFn(DoFn):
+ """A ``DoFn`` that streams writes to BigQuery once the table is created.
+ """
+
+ def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
+ create_disposition, write_disposition, client):
+ """Initialize a WriteToBigQuery transform.
+
+ Args:
+ table_id: The ID of the table. The ID must contain only letters
+ (a-z, A-Z), numbers (0-9), or underscores (_). If dataset_id is None
+ then table_id must contain the entire table reference specified as:
+ 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
+ dataset_id: The ID of the dataset containing this table, or None if the
+ table reference is specified entirely by table_id.
+ project_id: The ID of the project containing this table, or None if the
+ table reference is specified entirely by table_id.
+ batch_size: Number of rows to be written to BQ per streaming API insert.
+ schema: The schema to be used if the BigQuery table to write has to be
+ created. This can be specified either as a 'bigquery.TableSchema' object
+ or as a single string of the form 'field1:type1,field2:type2,field3:type3'
+ that defines a comma separated list of fields. Here 'type' should
+ specify the BigQuery type of the field. Single string based schemas do
+ not support nested fields, repeated fields, or specifying a BigQuery
+ mode for fields (mode will always be set to 'NULLABLE').
+ create_disposition: A string describing what happens if the table does not
+ exist. Possible values are:
+ - BigQueryDisposition.CREATE_IF_NEEDED: create the table if it does not exist.
+ - BigQueryDisposition.CREATE_NEVER: fail the write if the table does not exist.
+ write_disposition: A string describing what happens if the table already
+ contains data. Possible values are:
+ - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+ - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+ - BigQueryDisposition.WRITE_EMPTY: fail the write if the table is not empty.
+ For streaming pipelines, WRITE_TRUNCATE cannot be used.
+ client: Override of the default BigQuery client, used for testing.
+ """
+ self.table_id = table_id
+ self.dataset_id = dataset_id
+ self.project_id = project_id
+ self.schema = schema
+ self.client = client
+ self.create_disposition = create_disposition
+ self.write_disposition = write_disposition
+ self._rows_buffer = []
+ # The default batch size is 500
+ self._max_batch_size = batch_size or 500
+
+ @staticmethod
+ def get_table_schema(schema):
+ # Transform the table schema into a bigquery.TableSchema instance.
+ if isinstance(schema, basestring):
+ table_schema = bigquery.TableSchema()
+ schema_list = [s.strip() for s in schema.split(',')]
+ for field_and_type in schema_list:
+ field_name, field_type = field_and_type.split(':')
+ field_schema = bigquery.TableFieldSchema()
+ field_schema.name = field_name
+ field_schema.type = field_type
+ field_schema.mode = 'NULLABLE'
+ table_schema.fields.append(field_schema)
+ return table_schema
+ elif schema is None:
+ return schema
+ elif isinstance(schema, bigquery.TableSchema):
+ return schema
+ else:
+ raise TypeError('Unexpected schema argument: %s.' % schema)
+
+ def start_bundle(self):
+ self._rows_buffer = []
+ self.table_schema = self.get_table_schema(self.schema)
+
+ self.bigquery_wrapper = BigQueryWrapper(client=self.client)
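+ # Ensure the destination table exists, honoring the create and write
+ # dispositions, before any rows are streamed to it.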
+ self.bigquery_wrapper.get_or_create_table(
+ self.project_id, self.dataset_id, self.table_id, self.table_schema,
+ self.create_disposition, self.write_disposition)
+
+ def process(self, element, unused_create_fn_output=None):
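+ # Buffer the row; _flush_batch() sends the buffer once _max_batch_size
+ # rows have accumulated.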
+ self._rows_buffer.append(element)
+ if len(self._rows_buffer) >= self._max_batch_size:
+ self._flush_batch()
+
+ def finish_bundle(self):
+ if self._rows_buffer:
+ self._flush_batch()
+ self._rows_buffer = []
+
+ def _flush_batch(self):
+ # Flush the current batch of rows to BigQuery.
+ passed, errors = self.bigquery_wrapper.insert_rows(
+ project_id=self.project_id, dataset_id=self.dataset_id,
+ table_id=self.table_id, rows=self._rows_buffer)
+ if not passed:
+ raise RuntimeError('Could not successfully insert rows to BigQuery'
+ ' table [%s:%s.%s]. Errors: %s' %
+ (self.project_id, self.dataset_id,
+ self.table_id, errors))
+ logging.debug("Successfully wrote %d rows.", len(self._rows_buffer))
+ self._rows_buffer = []
+
+
+class WriteToBigQuery(PTransform):
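+ """Write a ``PCollection`` of rows to a BigQuery table via the streaming API."""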
+
+ def __init__(self, table, dataset=None, project=None, schema=None,
+ create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+ write_disposition=BigQueryDisposition.WRITE_APPEND,
+ batch_size=None, test_client=None):
+ """Initialize a WriteToBigQuery transform.
+
+ Args:
+ table: The ID of the table. The ID must contain only letters
+ (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
+ None then the table argument must contain the entire table reference
+ specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
+ dataset: The ID of the dataset containing this table, or None if the table
+ reference is specified entirely by the table argument.
+ project: The ID of the project containing this table, or None if the table
+ reference is specified entirely by the table argument.
+ schema: The schema to be used if the BigQuery table to write has to be
+ created. This can be specified either as a 'bigquery.TableSchema' object
+ or as a single string of the form 'field1:type1,field2:type2,field3:type3'
+ that defines a comma separated list of fields. Here 'type' should
+ specify the BigQuery type of the field. Single string based schemas do
+ not support nested fields, repeated fields, or specifying a BigQuery
+ mode for fields (mode will always be set to 'NULLABLE').
+ create_disposition: A string describing what happens if the table does not
+ exist. Possible values are:
+ - BigQueryDisposition.CREATE_IF_NEEDED: create the table if it does not exist.
+ - BigQueryDisposition.CREATE_NEVER: fail the write if the table does not exist.
+ write_disposition: A string describing what happens if the table already
+ contains data. Possible values are:
+ - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+ - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+ - BigQueryDisposition.WRITE_EMPTY: fail the write if the table is not empty.
+ For streaming pipelines, WRITE_TRUNCATE cannot be used.
+ batch_size: Number of rows to be written to BQ per streaming API insert.
+ test_client: Override of the default BigQuery client, used for testing.
+ """
+ self.table_reference = _parse_table_reference(table, dataset, project)
+ self.create_disposition = BigQueryDisposition.validate_create(
+ create_disposition)
+ self.write_disposition = BigQueryDisposition.validate_write(
+ write_disposition)
+ self.schema = schema
+ self.batch_size = batch_size
+ self.test_client = test_client
+
+ def expand(self, pcoll):
+ bigquery_write_fn = BigQueryWriteFn(
+ table_id=self.table_reference.tableId,
+ dataset_id=self.table_reference.datasetId,
+ project_id=self.table_reference.projectId,
+ batch_size=self.batch_size,
+ schema=self.schema,
+ create_disposition=self.create_disposition,
+ write_disposition=self.write_disposition,
+ client=self.test_client)
+ return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+
+ def display_data(self):
+ res = {}
+ if self.table_reference is not None:
+ tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+ self.table_reference.tableId)
+ if self.table_reference.projectId is not None:
+ tableSpec = '{}:{}'.format(self.table_reference.projectId,
+ tableSpec)
+ res['table'] = DisplayDataItem(tableSpec, label='Table')
+ return res
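
For reference, a minimal usage sketch of the new transform (not part of this
commit; the project, dataset, and table names and the input rows are
placeholders):

    import apache_beam as beam
    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    p = beam.Pipeline()
    (p
     | 'CreateRows' >> beam.Create([{'month': 1}, {'month': 2}])
     | 'WriteRows' >> WriteToBigQuery(
         table='table_id', dataset='dataset_id', project='project_id',
         schema='month:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         batch_size=500))
    p.run()

The schema string uses the 'field1:type1,field2:type2' form parsed by
BigQueryWriteFn.get_table_schema; every field in a string schema gets mode
'NULLABLE'.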
http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a26050c..b7f766b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -824,6 +824,178 @@ class TestBigQueryWrapper(unittest.TestCase):
self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class TestWriteToBigQuery(unittest.TestCase):
+
+ def test_dofn_client_start_bundle_called(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ self.assertTrue(client.tables.Get.called)
+
+ def test_dofn_client_start_bundle_create_called(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = None
+ client.tables.Insert.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ self.assertTrue(client.tables.Get.called)
+ self.assertTrue(client.tables.Insert.called)
+
+ def test_dofn_client_process_performs_batching(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ client.tabledata.InsertAll.return_value = (
+ bigquery.TableDataInsertAllResponse(insertErrors=[]))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ fn.process({'month': 1})
+
+ self.assertTrue(client.tables.Get.called)
+ # InsertAll not called as the batch size has not been hit yet
+ self.assertFalse(client.tabledata.InsertAll.called)
+
+ def test_dofn_client_process_flush_called(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ client.tabledata.InsertAll.return_value = (
+ bigquery.TableDataInsertAllResponse(insertErrors=[]))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ fn.process({'month': 1})
+ fn.process({'month': 2})
+ self.assertTrue(client.tables.Get.called)
+ # InsertAll called as the batch size is hit
+ self.assertTrue(client.tabledata.InsertAll.called)
+
+ def test_dofn_client_finish_bundle_flush_called(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ client.tabledata.InsertAll.return_value = (
+ bigquery.TableDataInsertAllResponse(insertErrors=[]))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ fn.process({'month': 1})
+
+ self.assertTrue(client.tables.Get.called)
+ # InsertAll not called as the batch size has not been hit
+ self.assertFalse(client.tabledata.InsertAll.called)
+
+ fn.finish_bundle()
+ # InsertAll called in finish_bundle
+ self.assertTrue(client.tabledata.InsertAll.called)
+
+ def test_dofn_client_no_records(self):
+ client = mock.Mock()
+ client.tables.Get.return_value = bigquery.Table(
+ tableReference=bigquery.TableReference(
+ projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+ client.tabledata.InsertAll.return_value = (
+ bigquery.TableDataInsertAllResponse(insertErrors=[]))
+ create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+ write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+ fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+ table_id='table_id',
+ dataset_id='dataset_id',
+ project_id='project_id',
+ batch_size=2,
+ schema='month:INTEGER',
+ create_disposition=create_disposition,
+ write_disposition=write_disposition,
+ client=client)
+
+ fn.start_bundle()
+ self.assertTrue(client.tables.Get.called)
+ # InsertAll not called as the batch size has not been hit
+ self.assertFalse(client.tabledata.InsertAll.called)
+
+ fn.finish_bundle()
+ # InsertAll not called in finish_bundle as there are no buffered records
+ self.assertFalse(client.tabledata.InsertAll.called)
+
+ def test_simple_schema_parsing(self):
+ table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+ schema='s:STRING, n:INTEGER')
+ string_field = bigquery.TableFieldSchema(
+ name='s', type='STRING', mode='NULLABLE')
+ number_field = bigquery.TableFieldSchema(
+ name='n', type='INTEGER', mode='NULLABLE')
+ expected_table_schema = bigquery.TableSchema(
+ fields=[string_field, number_field])
+ self.assertEqual(expected_table_schema, table_schema)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
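
A further sketch (also not part of this commit; names are placeholders) of the
single-argument table forms described in the WriteToBigQuery docstring, which
are resolved by the existing _parse_table_reference helper:

    from apache_beam.io.gcp.bigquery import WriteToBigQuery

    # Full 'PROJECT:DATASET.TABLE' reference carried by the table argument.
    write_a = WriteToBigQuery(table='project_id:dataset_id.table_id',
                              schema='month:INTEGER')
    # The same destination via explicit dataset and project arguments.
    write_b = WriteToBigQuery(table='table_id', dataset='dataset_id',
                              project='project_id', schema='month:INTEGER')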