You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2017/07/13 20:52:16 UTC
[1/2] beam git commit: [BEAM-2595] Allow table schema objects in BQ DoFn
Repository: beam
Updated Branches:
refs/heads/master 5fd2c6e13 -> e8c557448
[BEAM-2595] Allow table schema objects in BQ DoFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eb951c2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eb951c2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eb951c2e
Branch: refs/heads/master
Commit: eb951c2e161294510d5a23f7c641592b0a8be087
Parents: 5fd2c6e
Author: Sourabh Bajaj <so...@google.com>
Authored: Thu Jul 13 12:02:31 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Jul 13 13:51:15 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 100 +++++++++++++++---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 105 +++++++++++++++++--
2 files changed, 180 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/eb951c2e/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index da8be68..23fd310 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1191,22 +1191,20 @@ class BigQueryWriteFn(DoFn):
@staticmethod
def get_table_schema(schema):
- # Transform the table schema into a bigquery.TableSchema instance.
- if isinstance(schema, basestring):
- table_schema = bigquery.TableSchema()
- schema_list = [s.strip() for s in schema.split(',')]
- for field_and_type in schema_list:
- field_name, field_type = field_and_type.split(':')
- field_schema = bigquery.TableFieldSchema()
- field_schema.name = field_name
- field_schema.type = field_type
- field_schema.mode = 'NULLABLE'
- table_schema.fields.append(field_schema)
- return table_schema
- elif schema is None:
- return schema
- elif isinstance(schema, bigquery.TableSchema):
+ """Transform the table schema into a bigquery.TableSchema instance.
+
+ Args:
+ schema: The schema to be used if the BigQuery table to write has to be
+ created. This is a dictionary object created in the WriteToBigQuery
+ transform.
+ Returns:
+ table_schema: The schema to be used if the BigQuery table to write has
+ to be created but in the bigquery.TableSchema format.
+ """
+ if schema is None:
return schema
+ elif isinstance(schema, dict):
+ return parse_table_schema_from_json(json.dumps(schema))
else:
raise TypeError('Unexpected schema argument: %s.' % schema)
@@ -1289,13 +1287,83 @@ class WriteToBigQuery(PTransform):
self.batch_size = batch_size
self.test_client = test_client
+ @staticmethod
+ def get_table_schema_from_string(schema):
+ """Transform the string table schema into a bigquery.TableSchema instance.
+
+ Args:
+ schema: The string schema to be used if the BigQuery table to write has
+ to be created.
+ Returns:
+ table_schema: The schema to be used if the BigQuery table to write has
+ to be created but in the bigquery.TableSchema format.
+ """
+ table_schema = bigquery.TableSchema()
+ schema_list = [s.strip() for s in schema.split(',')]
+ for field_and_type in schema_list:
+ field_name, field_type = field_and_type.split(':')
+ field_schema = bigquery.TableFieldSchema()
+ field_schema.name = field_name
+ field_schema.type = field_type
+ field_schema.mode = 'NULLABLE'
+ table_schema.fields.append(field_schema)
+ return table_schema
+
+ @staticmethod
+ def table_schema_to_dict(table_schema):
+ """Create a dictionary representation of table schema for serialization
+ """
+ def get_table_field(field):
+ """Create a dictionary representation of a table field
+ """
+ result = {}
+ result['name'] = field.name
+ result['type'] = field.type
+ result['mode'] = getattr(field, 'mode', 'NULLABLE')
+ if hasattr(field, 'description') and field.description is not None:
+ result['description'] = field.description
+ if hasattr(field, 'fields') and field.fields:
+ result['fields'] = [get_table_field(f) for f in field.fields]
+ return result
+
+ if not isinstance(table_schema, bigquery.TableSchema):
+ raise ValueError("Table schema must be of the type bigquery.TableSchema")
+ schema = {'fields': []}
+ for field in table_schema.fields:
+ schema['fields'].append(get_table_field(field))
+ return schema
+
+ @staticmethod
+ def get_dict_table_schema(schema):
+ """Transform the table schema into a dictionary instance.
+
+ Args:
+ schema: The schema to be used if the BigQuery table to write has to be
+ created. This can either be a dict or string or in the TableSchema
+ format.
+ Returns:
+ table_schema: The schema to be used if the BigQuery table to write has
+ to be created but in the dictionary format.
+ """
+ if isinstance(schema, dict):
+ return schema
+ elif schema is None:
+ return schema
+ elif isinstance(schema, basestring):
+ table_schema = WriteToBigQuery.get_table_schema_from_string(schema)
+ return WriteToBigQuery.table_schema_to_dict(table_schema)
+ elif isinstance(schema, bigquery.TableSchema):
+ return WriteToBigQuery.table_schema_to_dict(schema)
+ else:
+ raise TypeError('Unexpected schema argument: %s.' % schema)
+
def expand(self, pcoll):
bigquery_write_fn = BigQueryWriteFn(
table_id=self.table_reference.tableId,
dataset_id=self.table_reference.datasetId,
project_id=self.table_reference.projectId,
batch_size=self.batch_size,
- schema=self.schema,
+ schema=self.get_dict_table_schema(self.schema),
create_disposition=self.create_disposition,
write_disposition=self.write_disposition,
client=self.test_client)
http://git-wip-us.apache.org/repos/asf/beam/blob/eb951c2e/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index b7f766b..14247ba 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -834,12 +834,15 @@ class WriteToBigQuery(unittest.TestCase):
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -855,13 +858,15 @@ class WriteToBigQuery(unittest.TestCase):
projectId='project_id', datasetId='dataset_id', tableId='table_id'))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -879,13 +884,15 @@ class WriteToBigQuery(unittest.TestCase):
bigquery.TableDataInsertAllResponse(insertErrors=[])
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -906,13 +913,15 @@ class WriteToBigQuery(unittest.TestCase):
bigquery.TableDataInsertAllResponse(insertErrors=[]))
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -933,13 +942,15 @@ class WriteToBigQuery(unittest.TestCase):
bigquery.TableDataInsertAllResponse(insertErrors=[])
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -964,13 +975,15 @@ class WriteToBigQuery(unittest.TestCase):
bigquery.TableDataInsertAllResponse(insertErrors=[])
create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+ schema = {'fields': [
+ {'name': 'month', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
fn = beam.io.gcp.bigquery.BigQueryWriteFn(
table_id='table_id',
dataset_id='dataset_id',
project_id='project_id',
batch_size=2,
- schema='month:INTEGER',
+ schema=schema,
create_disposition=create_disposition,
write_disposition=write_disposition,
client=client)
@@ -984,17 +997,91 @@ class WriteToBigQuery(unittest.TestCase):
# InsertRows not called in finish bundle as no records
self.assertFalse(client.tabledata.InsertAll.called)
- def test_simple_schema_parsing(self):
+ def test_noop_schema_parsing(self):
+ expected_table_schema = None
table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
- schema='s:STRING, n:INTEGER')
+ schema=None)
+ self.assertEqual(expected_table_schema, table_schema)
+
+ def test_dict_schema_parsing(self):
+ schema = {'fields': [
+ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+ {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+ {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+ table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(schema)
string_field = bigquery.TableFieldSchema(
name='s', type='STRING', mode='NULLABLE')
+ nested_field = bigquery.TableFieldSchema(
+ name='x', type='INTEGER', mode='NULLABLE')
number_field = bigquery.TableFieldSchema(
name='n', type='INTEGER', mode='NULLABLE')
+ record_field = bigquery.TableFieldSchema(
+ name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
expected_table_schema = bigquery.TableSchema(
- fields=[string_field, number_field])
+ fields=[string_field, number_field, record_field])
self.assertEqual(expected_table_schema, table_schema)
+ def test_string_schema_parsing(self):
+ schema = 's:STRING, n:INTEGER'
+ expected_dict_schema = {'fields': [
+ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+ dict_schema = (
+ beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+ self.assertEqual(expected_dict_schema, dict_schema)
+
+ def test_table_schema_parsing(self):
+ string_field = bigquery.TableFieldSchema(
+ name='s', type='STRING', mode='NULLABLE')
+ nested_field = bigquery.TableFieldSchema(
+ name='x', type='INTEGER', mode='NULLABLE')
+ number_field = bigquery.TableFieldSchema(
+ name='n', type='INTEGER', mode='NULLABLE')
+ record_field = bigquery.TableFieldSchema(
+ name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+ schema = bigquery.TableSchema(
+ fields=[string_field, number_field, record_field])
+ expected_dict_schema = {'fields': [
+ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'},
+ {'name': 'r', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
+ {'name': 'x', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}]}
+ dict_schema = (
+ beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+ self.assertEqual(expected_dict_schema, dict_schema)
+
+ def test_table_schema_parsing_end_to_end(self):
+ string_field = bigquery.TableFieldSchema(
+ name='s', type='STRING', mode='NULLABLE')
+ nested_field = bigquery.TableFieldSchema(
+ name='x', type='INTEGER', mode='NULLABLE')
+ number_field = bigquery.TableFieldSchema(
+ name='n', type='INTEGER', mode='NULLABLE')
+ record_field = bigquery.TableFieldSchema(
+ name='r', type='RECORD', mode='NULLABLE', fields=[nested_field])
+ schema = bigquery.TableSchema(
+ fields=[string_field, number_field, record_field])
+ table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+ beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+ self.assertEqual(table_schema, schema)
+
+ def test_none_schema_parsing(self):
+ schema = None
+ expected_dict_schema = None
+ dict_schema = (
+ beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+ self.assertEqual(expected_dict_schema, dict_schema)
+
+ def test_noop_dict_schema_parsing(self):
+ schema = {'fields': [
+ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'},
+ {'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE'}]}
+ expected_dict_schema = schema
+ dict_schema = (
+ beam.io.gcp.bigquery.WriteToBigQuery.get_dict_table_schema(schema))
+ self.assertEqual(expected_dict_schema, dict_schema)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
[2/2] beam git commit: This closes #3556
Posted by ch...@apache.org.
This closes #3556
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8c55744
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8c55744
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8c55744
Branch: refs/heads/master
Commit: e8c5574483edc28d8bea30e55aa2d54b1d566722
Parents: 5fd2c6e eb951c2
Author: Chamikara Jayalath <ch...@google.com>
Authored: Thu Jul 13 13:52:02 2017 -0700
Committer: Chamikara Jayalath <ch...@google.com>
Committed: Thu Jul 13 13:52:02 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/gcp/bigquery.py | 100 +++++++++++++++---
sdks/python/apache_beam/io/gcp/bigquery_test.py | 105 +++++++++++++++++--
2 files changed, 180 insertions(+), 25 deletions(-)
----------------------------------------------------------------------