You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/03 17:39:08 UTC
[3/3] incubator-beam git commit: BEAM-873 Support for BigQuery 2 SQL
BEAM-873 Support for BigQuery 2 SQL
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a339653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a339653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a339653
Branch: refs/heads/python-sdk
Commit: 0a339653b0c7f6225a2b12e6c0988af8ad758098
Parents: 7270471
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Oct 28 13:59:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Nov 3 10:38:44 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/bigquery.py | 26 ++++++++++++++------
.../apache_beam/runners/dataflow_runner.py | 2 ++
sdks/python/apache_beam/utils/names.py | 1 +
3 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 4eecaa5..41b1bdc 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -284,7 +284,7 @@ class BigQuerySource(dataflow_io.NativeSource):
"""A source based on a BigQuery table."""
def __init__(self, table=None, dataset=None, project=None, query=None,
- validate=False, coder=None):
+ validate=False, coder=None, use_legacy_sql=True):
"""Initialize a BigQuerySource.
Args:
@@ -312,6 +312,10 @@ class BigQuerySource(dataflow_io.NativeSource):
in a file as a JSON serialized dictionary. This argument needs a value
only in special cases when returning table rows as dictionaries is not
desirable.
+ use_legacy_sql: Specifies whether to use BigQuery's legacy
+ SQL dialect for this query. The default value is True. If set to False,
+ the query will use BigQuery's updated SQL dialect with improved
+ standards compliance. This parameter is forced to True for table inputs.
Raises:
ValueError: if any of the following is true
@@ -328,8 +332,10 @@ class BigQuerySource(dataflow_io.NativeSource):
elif table is not None:
self.table_reference = _parse_table_reference(table, dataset, project)
self.query = None
+ self.use_legacy_sql = True
else:
self.query = query
+ self.use_legacy_sql = use_legacy_sql
self.table_reference = None
self.validate = validate
@@ -342,7 +348,9 @@ class BigQuerySource(dataflow_io.NativeSource):
def reader(self, test_bigquery_client=None):
return BigQueryReader(
- source=self, test_bigquery_client=test_bigquery_client)
+ source=self,
+ test_bigquery_client=test_bigquery_client,
+ use_legacy_sql=self.use_legacy_sql)
class BigQuerySink(dataflow_io.NativeSink):
@@ -462,7 +470,7 @@ class BigQuerySink(dataflow_io.NativeSink):
class BigQueryReader(dataflow_io.NativeSourceReader):
"""A reader for a BigQuery source."""
- def __init__(self, source, test_bigquery_client=None):
+ def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True):
self.source = source
self.test_bigquery_client = test_bigquery_client
if auth.is_running_in_gce:
@@ -484,6 +492,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
# for reading the field values in each row but could be useful for
# getting additional details.
self.schema = None
+ self.use_legacy_sql = use_legacy_sql
if self.source.query is None:
# If table schema did not define a project we default to executing
# project.
@@ -506,7 +515,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
def __iter__(self):
for rows, schema in self.client.run_query(
- project_id=self.executing_project, query=self.query):
+ project_id=self.executing_project, query=self.query,
+ use_legacy_sql=self.use_legacy_sql):
if self.schema is None:
self.schema = schema
for row in rows:
@@ -607,14 +617,14 @@ class BigQueryWrapper(object):
return '%s_%d' % (self._row_id_prefix, self._unique_row_id)
@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
- def _start_query_job(self, project_id, query, dry_run=False):
+ def _start_query_job(self, project_id, query, use_legacy_sql, dry_run=False):
request = bigquery.BigqueryJobsInsertRequest(
projectId=project_id,
job=bigquery.Job(
configuration=bigquery.JobConfiguration(
dryRun=dry_run,
query=bigquery.JobConfigurationQuery(
- query=query))))
+ query=query, useLegacySql=use_legacy_sql))))
response = self.client.jobs.Insert(request)
return response.jobReference.jobId
@@ -745,8 +755,8 @@ class BigQueryWrapper(object):
table_id=table_id,
schema=schema or found_table.schema)
- def run_query(self, project_id, query, dry_run=False):
- job_id = self._start_query_job(project_id, query, dry_run)
+ def run_query(self, project_id, query, use_legacy_sql, dry_run=False):
+ job_id = self._start_query_job(project_id, query, use_legacy_sql, dry_run)
if dry_run:
# If this was a dry run then the fact that we get here means the
# query has no errors. The start_query_job would raise an error otherwise.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 1d0398c..57867fa 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -528,6 +528,8 @@ class DataflowPipelineRunner(PipelineRunner):
transform.source.table_reference.projectId)
elif transform.source.query is not None:
step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query)
+ step.add_property(PropertyNames.BIGQUERY_USE_LEGACY_SQL,
+ transform.source.use_legacy_sql)
else:
raise ValueError('BigQuery source %r must specify either a table or'
' a query',
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py
index 41b5b43..be8c92a 100644
--- a/sdks/python/apache_beam/utils/names.py
+++ b/sdks/python/apache_beam/utils/names.py
@@ -45,6 +45,7 @@ class PropertyNames(object):
BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
BIGQUERY_DATASET = 'dataset'
BIGQUERY_QUERY = 'bigquery_query'
+ BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
BIGQUERY_TABLE = 'table'
BIGQUERY_PROJECT = 'project'
BIGQUERY_SCHEMA = 'schema'