You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/09/01 19:59:19 UTC

incubator-airflow git commit: [AIRFLOW-1556] Add support for SQL parameters in BigQueryBaseCursor

Repository: incubator-airflow
Updated Branches:
  refs/heads/master de593216d -> 9df0ac64c


[AIRFLOW-1556] Add support for SQL parameters in BigQueryBaseCursor

Closes #2557 from rajivpb/sql-parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9df0ac64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9df0ac64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9df0ac64

Branch: refs/heads/master
Commit: 9df0ac64c0ce1a654875197697a3851484fd57af
Parents: de59321
Author: Rajiv Bharadwaja <ra...@google.com>
Authored: Fri Sep 1 12:59:11 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Sep 1 12:59:11 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 6 +++++-
 airflow/contrib/operators/bigquery_operator.py | 8 +++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df0ac64/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index b979ed9..e60f597 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -196,7 +196,8 @@ class BigQueryBaseCursor(object):
             udf_config = False,
             use_legacy_sql=True,
             maximum_billing_tier=None,
-            create_disposition='CREATE_IF_NEEDED'):
+            create_disposition='CREATE_IF_NEEDED',
+            query_params=None):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -255,6 +256,9 @@ class BigQueryBaseCursor(object):
                 'userDefinedFunctionResources': udf_config
             })
 
+        if query_params:
+            configuration['query']['queryParameters'] = query_params
+
         return self.run_with_configuration(configuration)
 
     def run_extract(  # noqa

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9df0ac64/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index aaffc2e..3b804a8 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -51,6 +51,10 @@ class BigQueryOperator(BaseOperator):
     :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
         Defaults to None, in which case it uses the value set in the project.
     :type maximum_billing_tier: integer
+    :param query_params: a list of dictionaries containing query parameter types and
+        values, passed to BigQuery as the query's ``queryParameters``.
+    :type query_params: list
+
     """
     template_fields = ('bql', 'destination_dataset_table')
     template_ext = ('.sql',)
@@ -68,6 +72,7 @@ class BigQueryOperator(BaseOperator):
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
                  create_disposition='CREATE_IF_NEEDED',
+                 query_params=None,
                  *args,
                  **kwargs):
         super(BigQueryOperator, self).__init__(*args, **kwargs)
@@ -81,6 +86,7 @@ class BigQueryOperator(BaseOperator):
         self.udf_config = udf_config
         self.use_legacy_sql = use_legacy_sql
         self.maximum_billing_tier = maximum_billing_tier
+        self.query_params = query_params
 
     def execute(self, context):
         logging.info('Executing: %s', self.bql)
@@ -91,4 +97,4 @@ class BigQueryOperator(BaseOperator):
         cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
                          self.allow_large_results, self.udf_config,
                          self.use_legacy_sql, self.maximum_billing_tier,
-                         self.create_disposition)
+                         self.create_disposition, self.query_params)