You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/07/21 20:18:58 UTC

incubator-airflow git commit: [AIRFLOW-1439] Add max billing tier for the BQ Hook and Operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master b87903d12 -> f1f022c1e


[AIRFLOW-1439] Add max billing tier for the BQ Hook and Operator

Closes #2437 from aviDms/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f1f022c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f1f022c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f1f022c1

Branch: refs/heads/master
Commit: f1f022c1ece2f2719a16e5016dfa5c8bfe57bfd0
Parents: b87903d
Author: aviDms <av...@gmail.com>
Authored: Fri Jul 21 13:11:34 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Fri Jul 21 13:11:42 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 8 ++++++--
 airflow/contrib/operators/bigquery_operator.py | 8 +++++++-
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1f022c1/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 0950b22..dc98b89 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -194,7 +194,8 @@ class BigQueryBaseCursor(object):
             write_disposition = 'WRITE_EMPTY',
             allow_large_results=False,
             udf_config = False,
-            use_legacy_sql=True):
+            use_legacy_sql=True,
+            maximum_billing_tier=None):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -216,11 +217,14 @@ class BigQueryBaseCursor(object):
         :type udf_config: list
         :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
         :type use_legacy_sql: boolean
+        :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
+        :type maximum_billing_tier: integer
         """
         configuration = {
             'query': {
                 'query': bql,
-                'useLegacySql': use_legacy_sql
+                'useLegacySql': use_legacy_sql,
+                'maximumBillingTier': maximum_billing_tier
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f1f022c1/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 2f3abe7..5e8a0d5 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -42,6 +42,9 @@ class BigQueryOperator(BaseOperator):
     :type udf_config: list
     :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
     :type use_legacy_sql: boolean
+    :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
+        Defaults to None, in which case it uses the value set in the project.
+    :type maximum_billing_tier: integer
     """
     template_fields = ('bql', 'destination_dataset_table')
     template_ext = ('.sql',)
@@ -57,6 +60,7 @@ class BigQueryOperator(BaseOperator):
                  delegate_to=None,
                  udf_config=False,
                  use_legacy_sql=True,
+                 maximum_billing_tier=None,
                  *args,
                  **kwargs):
         super(BigQueryOperator, self).__init__(*args, **kwargs)
@@ -68,6 +72,7 @@ class BigQueryOperator(BaseOperator):
         self.delegate_to = delegate_to
         self.udf_config = udf_config
         self.use_legacy_sql = use_legacy_sql
+        self.maximum_billing_tier = maximum_billing_tier
 
     def execute(self, context):
         logging.info('Executing: %s', self.bql)
@@ -76,4 +81,5 @@ class BigQueryOperator(BaseOperator):
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
-                         self.allow_large_results, self.udf_config, self.use_legacy_sql)
+                         self.allow_large_results, self.udf_config, self.use_legacy_sql,
+                         self.maximum_billing_tier)