You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:11 UTC

[22/50] beam git commit: [BEAM-2405] Write to BQ using the streaming API

[BEAM-2405] Write to BQ using the streaming API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1498684d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1498684d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1498684d

Branch: refs/heads/DSL_SQL
Commit: 1498684dfea31594a236edd7fde5d299e4b0aa1e
Parents: fa3922b
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Jun 2 20:32:48 2017 -0700
Committer: chamikara@google.com <ch...@google.com>
Committed: Tue Jun 6 14:04:33 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 177 +++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 172 ++++++++++++++++++
 2 files changed, 349 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 201c798..9069f73 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -115,6 +115,9 @@ from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -134,6 +137,7 @@ __all__ = [
     'BigQueryDisposition',
     'BigQuerySource',
     'BigQuerySink',
+    'WriteToBigQuery',
     ]
 
 JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
@@ -813,6 +817,7 @@ class BigQueryWrapper(object):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=project_id, datasetId=dataset_id, table=table)
     response = self.client.tables.Insert(request)
+    logging.debug("Created the table with id %s", table_id)
     # The response is a bigquery.Table instance.
     return response
 
@@ -1134,3 +1139,175 @@ class BigQueryWrapper(object):
       else:
         result[field.name] = self._convert_cell_value_to_dict(value, field)
     return result
+
+
+class BigQueryWriteFn(DoFn):
+  """A ``DoFn`` that streams writes to BigQuery once the table is created.
+  """
+
+  def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
+               create_disposition, write_disposition, client):
+    """Initialize a BigQueryWriteFn.
+
+    Args:
+      table_id: The ID of the table. The ID must contain only letters
+        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
+        None then the table argument must contain the entire table reference
+        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
+      dataset_id: The ID of the dataset containing this table or None if the
+        table reference is specified entirely by the table argument.
+      project_id: The ID of the project containing this table or None if the
+        table reference is specified entirely by the table argument.
+      batch_size: Number of rows to be written to BQ per streaming API insert.
+      schema: The schema to be used if the BigQuery table to write has to be
+        created. This can be either specified as a 'bigquery.TableSchema' object
+        or a single string of the form 'field1:type1,field2:type2,field3:type3'
+        that defines a comma separated list of fields. Here 'type' should
+        specify the BigQuery type of the field. Single string based schemas do
+        not support nested fields, repeated fields, or specifying a BigQuery
+        mode for fields (mode will always be set to 'NULLABLE').
+      create_disposition: A string describing what happens if the table does not
+        exist. Possible values are:
+        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
+        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
+      write_disposition: A string describing what happens if the table
+        already has some data. Possible values are:
+        - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+        - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+        - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+        For streaming pipelines WRITE_TRUNCATE cannot be used.
+      client: Override the default bigquery client used for testing.
+    """
+    self.table_id = table_id
+    self.dataset_id = dataset_id
+    self.project_id = project_id
+    self.schema = schema
+    self.client = client
+    self.create_disposition = create_disposition
+    self.write_disposition = write_disposition
+    self._rows_buffer = []
+    # The default batch size is 500
+    self._max_batch_size = batch_size or 500
+
+  @staticmethod
+  def get_table_schema(schema):
+    # Transform the table schema into a bigquery.TableSchema instance.
+    if isinstance(schema, basestring):
+      table_schema = bigquery.TableSchema()
+      schema_list = [s.strip() for s in schema.split(',')]
+      for field_and_type in schema_list:
+        field_name, field_type = field_and_type.split(':')
+        field_schema = bigquery.TableFieldSchema()
+        field_schema.name = field_name
+        field_schema.type = field_type
+        field_schema.mode = 'NULLABLE'
+        table_schema.fields.append(field_schema)
+      return table_schema
+    elif schema is None:
+      return schema
+    elif isinstance(schema, bigquery.TableSchema):
+      return schema
+    else:
+      raise TypeError('Unexpected schema argument: %s.' % schema)
+
+  def start_bundle(self):
+    self._rows_buffer = []
+    self.table_schema = self.get_table_schema(self.schema)
+
+    self.bigquery_wrapper = BigQueryWrapper(client=self.client)
+    self.bigquery_wrapper.get_or_create_table(
+        self.project_id, self.dataset_id, self.table_id, self.table_schema,
+        self.create_disposition, self.write_disposition)
+
+  def process(self, element, unused_create_fn_output=None):
+    self._rows_buffer.append(element)
+    if len(self._rows_buffer) >= self._max_batch_size:
+      self._flush_batch()
+
+  def finish_bundle(self):
+    if self._rows_buffer:
+      self._flush_batch()
+    self._rows_buffer = []
+
+  def _flush_batch(self):
+    # Flush the current batch of rows to BigQuery.
+    passed, errors = self.bigquery_wrapper.insert_rows(
+        project_id=self.project_id, dataset_id=self.dataset_id,
+        table_id=self.table_id, rows=self._rows_buffer)
+    if not passed:
+      raise RuntimeError('Could not successfully insert rows to BigQuery'
+                         ' table [%s:%s.%s]. Errors: %s'%
+                         (self.project_id, self.dataset_id,
+                          self.table_id, errors))
+    logging.debug("Successfully wrote %d rows.", len(self._rows_buffer))
+    self._rows_buffer = []
+
+
+class WriteToBigQuery(PTransform):
+
+  def __init__(self, table, dataset=None, project=None, schema=None,
+               create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+               write_disposition=BigQueryDisposition.WRITE_APPEND,
+               batch_size=None, test_client=None):
+    """Initialize a WriteToBigQuery transform.
+
+    Args:
+      table: The ID of the table. The ID must contain only letters
+        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
+        None then the table argument must contain the entire table reference
+        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
+      dataset: The ID of the dataset containing this table or None if the table
+        reference is specified entirely by the table argument.
+      project: The ID of the project containing this table or None if the table
+        reference is specified entirely by the table argument.
+      schema: The schema to be used if the BigQuery table to write has to be
+        created. This can be either specified as a 'bigquery.TableSchema' object
+        or a single string of the form 'field1:type1,field2:type2,field3:type3'
+        that defines a comma separated list of fields. Here 'type' should
+        specify the BigQuery type of the field. Single string based schemas do
+        not support nested fields, repeated fields, or specifying a BigQuery
+        mode for fields (mode will always be set to 'NULLABLE').
+      create_disposition: A string describing what happens if the table does not
+        exist. Possible values are:
+        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
+        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
+      write_disposition: A string describing what happens if the table
+        already has some data. Possible values are:
+        - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+        - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+        - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+        For streaming pipelines WRITE_TRUNCATE cannot be used.
+      batch_size: Number of rows to be written to BQ per streaming API insert.
+      test_client: Override the default bigquery client used for testing.
+    """
+    self.table_reference = _parse_table_reference(table, dataset, project)
+    self.create_disposition = BigQueryDisposition.validate_create(
+        create_disposition)
+    self.write_disposition = BigQueryDisposition.validate_write(
+        write_disposition)
+    self.schema = schema
+    self.batch_size = batch_size
+    self.test_client = test_client
+
+  def expand(self, pcoll):
+    bigquery_write_fn = BigQueryWriteFn(
+        table_id=self.table_reference.tableId,
+        dataset_id=self.table_reference.datasetId,
+        project_id=self.table_reference.projectId,
+        batch_size=self.batch_size,
+        schema=self.schema,
+        create_disposition=self.create_disposition,
+        write_disposition=self.write_disposition,
+        client=self.test_client)
+    return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+
+  def display_data(self):
+    res = {}
+    if self.table_reference is not None:
+      tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                 self.table_reference.tableId)
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}'.format(self.table_reference.projectId,
+                                   tableSpec)
+      res['table'] = DisplayDataItem(tableSpec, label='Table')
+    return res

http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a26050c..b7f766b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -824,6 +824,178 @@ class TestBigQueryWrapper(unittest.TestCase):
     self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')
 
 
+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class WriteToBigQuery(unittest.TestCase):
+
+  def test_dofn_client_start_bundle_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+
+  def test_dofn_client_start_bundle_create_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = None
+    client.tables.Insert.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+    self.assertTrue(client.tables.Insert.called)
+
+  def test_dofn_client_process_performs_batching(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows not called as batch size is not hit yet
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_process_flush_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    client.tabledata.InsertAll.return_value = (
+        bigquery.TableDataInsertAllResponse(insertErrors=[]))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+    fn.process({'month': 2})
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows called as batch size is hit
+    self.assertTrue(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_finish_bundle_flush_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows not called as batch size is not hit
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+    fn.finish_bundle()
+    # InsertRows called in finish bundle
+    self.assertTrue(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_no_records(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id', tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+    # InsertAll not called as no records have been processed
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+    fn.finish_bundle()
+    # InsertRows not called in finish bundle as no records
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+  def test_simple_schema_parsing(self):
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+        schema='s:STRING, n:INTEGER')
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    expected_table_schema = bigquery.TableSchema(
+        fields=[string_field, number_field])
+    self.assertEqual(expected_table_schema, table_schema)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()