You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that was not preserved in this plain-text copy.)
Posted to commits@airflow.apache.org by cr...@apache.org on 2017/07/31 20:24:27 UTC
incubator-airflow git commit: [AIRFLOW-1389] Support createDisposition in BigQueryOperator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 547f8184b -> 6e2640766
[AIRFLOW-1389] Support createDisposition in BigQueryOperator
Closes #2470 from yu-iskw/bq-operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6e264076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6e264076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6e264076
Branch: refs/heads/master
Commit: 6e2640766de437f2593424f1746a15d56f26f027
Parents: 547f818
Author: Yu ISHIKAWA <yu...@gmail.com>
Authored: Mon Jul 31 13:23:54 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Jul 31 13:23:58 2017 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/bigquery_hook.py | 7 ++++++-
airflow/contrib/operators/bigquery_operator.py | 13 +++++++++++--
2 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e264076/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index dc98b89..73e0a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -195,7 +195,8 @@ class BigQueryBaseCursor(object):
allow_large_results=False,
udf_config = False,
use_legacy_sql=True,
- maximum_billing_tier=None):
+ maximum_billing_tier=None,
+ create_disposition='CREATE_IF_NEEDED'):
"""
Executes a BigQuery SQL query. Optionally persists results in a BigQuery
table. See here:
@@ -210,6 +211,9 @@ class BigQueryBaseCursor(object):
BigQuery table to save the query results.
:param write_disposition: What to do if the table already exists in
BigQuery.
+ :type write_disposition: string
+ :param create_disposition: Specifies whether the job is allowed to create new tables.
+ :type create_disposition: string
:param allow_large_results: Whether to allow large results.
:type allow_large_results: boolean
:param udf_config: The User Defined Function configuration for the query.
@@ -238,6 +242,7 @@ class BigQueryBaseCursor(object):
configuration['query'].update({
'allowLargeResults': allow_large_results,
'writeDisposition': write_disposition,
+ 'createDisposition': create_disposition,
'destinationTable': {
'projectId': destination_project,
'datasetId': destination_dataset,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6e264076/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 5e8a0d5..aaffc2e 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -31,6 +31,12 @@ class BigQueryOperator(BaseOperator):
(<project>.|<project>:)<dataset>.<table> that, if set, will store the results
of the query.
:type destination_dataset_table: string
+ :param write_disposition: Specifies the action that occurs if the destination table
+ already exists. (default: 'WRITE_EMPTY')
+ :type write_disposition: string
+ :param create_disposition: Specifies whether the job is allowed to create new tables.
+ (default: 'CREATE_IF_NEEDED')
+ :type create_disposition: string
:param bigquery_conn_id: reference to a specific BigQuery hook.
:type bigquery_conn_id: string
:param delegate_to: The account to impersonate, if any.
@@ -61,12 +67,14 @@ class BigQueryOperator(BaseOperator):
udf_config=False,
use_legacy_sql=True,
maximum_billing_tier=None,
+ create_disposition='CREATE_IF_NEEDED',
*args,
**kwargs):
super(BigQueryOperator, self).__init__(*args, **kwargs)
self.bql = bql
self.destination_dataset_table = destination_dataset_table
self.write_disposition = write_disposition
+ self.create_disposition = create_disposition
self.allow_large_results = allow_large_results
self.bigquery_conn_id = bigquery_conn_id
self.delegate_to = delegate_to
@@ -81,5 +89,6 @@ class BigQueryOperator(BaseOperator):
conn = hook.get_conn()
cursor = conn.cursor()
cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
- self.allow_large_results, self.udf_config, self.use_legacy_sql,
- self.maximum_billing_tier)
+ self.allow_large_results, self.udf_config,
+ self.use_legacy_sql, self.maximum_billing_tier,
+ self.create_disposition)