Posted to commits@beam.apache.org by ro...@apache.org on 2016/11/03 17:39:08 UTC

[3/3] incubator-beam git commit: BEAM-873 Support for BigQuery standard SQL

BEAM-873 Support for BigQuery standard SQL


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a339653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a339653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a339653

Branch: refs/heads/python-sdk
Commit: 0a339653b0c7f6225a2b12e6c0988af8ad758098
Parents: 7270471
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Oct 28 13:59:23 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Thu Nov 3 10:38:44 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/bigquery.py          | 26 ++++++++++++++------
 .../apache_beam/runners/dataflow_runner.py      |  2 ++
 sdks/python/apache_beam/utils/names.py          |  1 +
 3 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
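
For context, the user-visible surface of this change is one new keyword
argument on BigQuerySource. A minimal usage sketch, assuming this era's
pipeline plumbing (beam.Pipeline, beam.io.Read) and a public sample
dataset; neither is part of this commit:

    import apache_beam as beam
    from apache_beam.io.bigquery import BigQuerySource

    p = beam.Pipeline('DirectPipelineRunner')

    # use_legacy_sql=False opts this query into BigQuery's standard SQL
    # dialect; the default (True) keeps the legacy dialect.
    rows = p | beam.io.Read(BigQuerySource(
        query='SELECT word, word_count '
              'FROM `bigquery-public-data.samples.shakespeare` LIMIT 10',
        use_legacy_sql=False))

    p.run()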


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/io/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index 4eecaa5..41b1bdc 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -284,7 +284,7 @@ class BigQuerySource(dataflow_io.NativeSource):
   """A source based on a BigQuery table."""
 
   def __init__(self, table=None, dataset=None, project=None, query=None,
-               validate=False, coder=None):
+               validate=False, coder=None, use_legacy_sql=True):
     """Initialize a BigQuerySource.
 
     Args:
@@ -312,6 +312,10 @@ class BigQuerySource(dataflow_io.NativeSource):
         in a file as a JSON serialized dictionary. This argument needs a value
         only in special cases when returning table rows as dictionaries is not
         desirable.
+      use_legacy_sql: Specifies whether to use BigQuery's legacy
+        SQL dialect for this query. The default value is True. If set to False,
+        the query will use BigQuery's standard SQL dialect with improved
+        standards compliance. This parameter is forced to True for table inputs.
 
     Raises:
       ValueError: if any of the following is true
@@ -328,8 +332,10 @@ class BigQuerySource(dataflow_io.NativeSource):
     elif table is not None:
       self.table_reference = _parse_table_reference(table, dataset, project)
       self.query = None
+      self.use_legacy_sql = True
     else:
       self.query = query
+      self.use_legacy_sql = use_legacy_sql
       self.table_reference = None
 
     self.validate = validate
@@ -342,7 +348,9 @@ class BigQuerySource(dataflow_io.NativeSource):
 
   def reader(self, test_bigquery_client=None):
     return BigQueryReader(
-        source=self, test_bigquery_client=test_bigquery_client)
+        source=self,
+        test_bigquery_client=test_bigquery_client,
+        use_legacy_sql=self.use_legacy_sql)
 
 
 class BigQuerySink(dataflow_io.NativeSink):
@@ -462,7 +470,7 @@ class BigQuerySink(dataflow_io.NativeSink):
 class BigQueryReader(dataflow_io.NativeSourceReader):
   """A reader for a BigQuery source."""
 
-  def __init__(self, source, test_bigquery_client=None):
+  def __init__(self, source, test_bigquery_client=None, use_legacy_sql=True):
     self.source = source
     self.test_bigquery_client = test_bigquery_client
     if auth.is_running_in_gce:
@@ -484,6 +492,7 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
     # for reading the field values in each row but could be useful for
     # getting additional details.
     self.schema = None
+    self.use_legacy_sql = use_legacy_sql
     if self.source.query is None:
       # If table schema did not define a project we default to executing
       # project.
@@ -506,7 +515,8 @@ class BigQueryReader(dataflow_io.NativeSourceReader):
 
   def __iter__(self):
     for rows, schema in self.client.run_query(
-        project_id=self.executing_project, query=self.query):
+        project_id=self.executing_project, query=self.query,
+        use_legacy_sql=self.use_legacy_sql):
       if self.schema is None:
         self.schema = schema
       for row in rows:
@@ -607,14 +617,14 @@ class BigQueryWrapper(object):
     return '%s_%d' % (self._row_id_prefix, self._unique_row_id)
 
   @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
-  def _start_query_job(self, project_id, query, dry_run=False):
+  def _start_query_job(self, project_id, query, use_legacy_sql, dry_run=False):
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
         job=bigquery.Job(
             configuration=bigquery.JobConfiguration(
                 dryRun=dry_run,
                 query=bigquery.JobConfigurationQuery(
-                    query=query))))
+                    query=query, useLegacySql=use_legacy_sql))))
     response = self.client.jobs.Insert(request)
     return response.jobReference.jobId
 
@@ -745,8 +755,8 @@ class BigQueryWrapper(object):
                                 table_id=table_id,
                                 schema=schema or found_table.schema)
 
-  def run_query(self, project_id, query, dry_run=False):
-    job_id = self._start_query_job(project_id, query, dry_run)
+  def run_query(self, project_id, query, use_legacy_sql, dry_run=False):
+    job_id = self._start_query_job(project_id, query, use_legacy_sql, dry_run)
     if dry_run:
       # If this was a dry run then the fact that we get here means the
       # query has no errors. The start_query_job would raise an error otherwise.
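
The flag matters because the two dialects are not source-compatible; the
most visible difference is table-reference quoting. An illustration with
hypothetical table names (not taken from this commit):

    # Legacy SQL (use_legacy_sql=True, the default): bracketed references
    # with a colon between project and dataset.
    legacy_query = 'SELECT word FROM [my-project:my_dataset.my_table]'

    # Standard SQL (use_legacy_sql=False): backtick-quoted dotted references.
    standard_query = 'SELECT word FROM `my-project.my_dataset.my_table`'

As the docstring above notes, direct table reads ignore the argument and
force use_legacy_sql=True.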

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 1d0398c..57867fa 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -528,6 +528,8 @@ class DataflowPipelineRunner(PipelineRunner):
                             transform.source.table_reference.projectId)
       elif transform.source.query is not None:
         step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query)
+        step.add_property(PropertyNames.BIGQUERY_USE_LEGACY_SQL,
+                          transform.source.use_legacy_sql)
       else:
         raise ValueError('BigQuery source %r must specify either a table or'
                          ' a query',
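
On the service side, the runner change simply serializes the flag next to
the query in the step's properties. Roughly what a query-based BigQuery
source step now carries, reconstructed from the PropertyNames keys below
(an illustration, not a verbatim step dump):

    step_properties = {
        'bigquery_query': 'SELECT word FROM `my-project.my_dataset.my_table`',
        'bigquery_use_legacy_sql': False,
    }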

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a339653/sdks/python/apache_beam/utils/names.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py
index 41b5b43..be8c92a 100644
--- a/sdks/python/apache_beam/utils/names.py
+++ b/sdks/python/apache_beam/utils/names.py
@@ -45,6 +45,7 @@ class PropertyNames(object):
   BIGQUERY_CREATE_DISPOSITION = 'create_disposition'
   BIGQUERY_DATASET = 'dataset'
   BIGQUERY_QUERY = 'bigquery_query'
+  BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
   BIGQUERY_TABLE = 'table'
   BIGQUERY_PROJECT = 'project'
   BIGQUERY_SCHEMA = 'schema'