You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/07/31 20:24:27 UTC

incubator-airflow git commit: [AIRFLOW-1389] Support createDisposition in BigQueryOperator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 547f8184b -> 6e2640766


[AIRFLOW-1389] Support createDisposition in BigQueryOperator

Closes #2470 from yu-iskw/bq-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e264076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e264076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e264076

Branch: refs/heads/master
Commit: 6e2640766de437f2593424f1746a15d56f26f027
Parents: 547f818
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Mon Jul 31 13:23:54 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Jul 31 13:23:58 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         |  7 ++++++-
 airflow/contrib/operators/bigquery_operator.py | 13 +++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e264076/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index dc98b89..73e0a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -195,7 +195,8 @@ class BigQueryBaseCursor(object):
             allow_large_results=False,
             udf_config = False,
             use_legacy_sql=True,
-            maximum_billing_tier=None):
+            maximum_billing_tier=None,
+            create_disposition='CREATE_IF_NEEDED'):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -210,6 +211,9 @@ class BigQueryBaseCursor(object):
             BigQuery table to save the query results.
         :param write_disposition: What to do if the table already exists in
             BigQuery.
+        :type write_disposition: string
+        :param create_disposition: Specifies whether the job is allowed to create new tables.
+        :type create_disposition: string
         :param allow_large_results: Whether to allow large results.
         :type allow_large_results: boolean
         :param udf_config: The User Defined Function configuration for the query.
@@ -238,6 +242,7 @@ class BigQueryBaseCursor(object):
             configuration['query'].update({
                 'allowLargeResults': allow_large_results,
                 'writeDisposition': write_disposition,
+                'createDisposition': create_disposition,
                 'destinationTable': {
                     'projectId': destination_project,
                     'datasetId': destination_dataset,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e264076/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 5e8a0d5..aaffc2e 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -31,6 +31,12 @@ class BigQueryOperator(BaseOperator):
         (<project>.|<project>:)<dataset>.<table> that, if set, will store the results
         of the query.
     :type destination_dataset_table: string
+    :param write_disposition: Specifies the action that occurs if the destination table
+        already exists. (default: 'WRITE_EMPTY')
+    :type write_disposition: string
+    :param create_disposition: Specifies whether the job is allowed to create new tables.
+        (default: 'CREATE_IF_NEEDED')
+    :type create_disposition: string
     :param bigquery_conn_id: reference to a specific BigQuery hook.
     :type bigquery_conn_id: string
     :param delegate_to: The account to impersonate, if any.
@@ -61,12 +67,14 @@ class BigQueryOperator(BaseOperator):
                  udf_config=False,
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
+                 create_disposition='CREATE_IF_NEEDED',
                  *args,
                  **kwargs):
         super(BigQueryOperator, self).__init__(*args, **kwargs)
         self.bql = bql
         self.destination_dataset_table = destination_dataset_table
         self.write_disposition = write_disposition
+        self.create_disposition = create_disposition
         self.allow_large_results = allow_large_results
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
@@ -81,5 +89,6 @@ class BigQueryOperator(BaseOperator):
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
-                         self.allow_large_results, self.udf_config, self.use_legacy_sql,
-                         self.maximum_billing_tier)
+                         self.allow_large_results, self.udf_config,
+                         self.use_legacy_sql, self.maximum_billing_tier,
+                         self.create_disposition)