You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/09 22:29:13 UTC

[GitHub] kaxil closed pull request #4274: [AIRFLOW-3438] Fix default values in BigQuery Hook & BigQueryOperator

kaxil closed pull request #4274: [AIRFLOW-3438] Fix default values in BigQuery Hook & BigQueryOperator
URL: https://github.com/apache/incubator-airflow/pull/4274
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index c7324adde4..5cab013b28 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -566,7 +566,7 @@ def run_query(self,
         :param labels a dictionary containing labels for the job/query,
             passed to BigQuery
         :type labels: dict
-        :param schema_update_options: Allows the schema of the desitination
+        :param schema_update_options: Allows the schema of the destination
             table to be updated as a side effect of the query job.
         :type schema_update_options: tuple
         :param priority: Specifies a priority for the query.
@@ -582,6 +582,9 @@ def run_query(self,
         :type cluster_fields: list of str
         """
 
+        if time_partitioning is None:
+            time_partitioning = {}
+
         if not api_resource_configs:
             api_resource_configs = self.api_resource_configs
         else:
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 7ce3102ad2..106bee8b69 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -106,13 +106,13 @@ class BigQueryOperator(BaseOperator):
     @apply_defaults
     def __init__(self,
                  sql,
-                 destination_dataset_table=False,
+                 destination_dataset_table=None,
                  write_disposition='WRITE_EMPTY',
                  allow_large_results=False,
                  flatten_results=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
-                 udf_config=False,
+                 udf_config=None,
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
                  maximum_bytes_billed=None,
@@ -144,10 +144,8 @@ def __init__(self,
         self.labels = labels
         self.bq_cursor = None
         self.priority = priority
-        if time_partitioning is None:
-            self.time_partitioning = {}
-        if api_resource_configs is None:
-            self.api_resource_configs = {}
+        self.time_partitioning = time_partitioning
+        self.api_resource_configs = api_resource_configs
         self.cluster_fields = cluster_fields
 
     def execute(self, context):
@@ -160,7 +158,7 @@ def execute(self, context):
             conn = hook.get_conn()
             self.bq_cursor = conn.cursor()
         self.bq_cursor.run_query(
-            self.sql,
+            sql=self.sql,
             destination_dataset_table=self.destination_dataset_table,
             write_disposition=self.write_disposition,
             allow_large_results=self.allow_large_results,
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 4e62221395..b92116a031 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -21,7 +21,8 @@
 
 from airflow.contrib.operators.bigquery_operator import \
     BigQueryCreateExternalTableOperator, BigQueryCreateEmptyTableOperator, \
-    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator
+    BigQueryDeleteDatasetOperator, BigQueryCreateEmptyDatasetOperator, \
+    BigQueryOperator
 
 try:
     from unittest import mock
@@ -143,3 +144,84 @@ def test_execute(self, mock_hook):
                 project_id=TEST_PROJECT_ID,
                 dataset_reference={}
             )
+
+
+class BigQueryOperatorTest(unittest.TestCase):
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryOperator(
+            task_id=TASK_ID,
+            sql='Select * from test_table',
+            destination_dataset_table=None,
+            write_disposition='WRITE_EMPTY',
+            allow_large_results=False,
+            flatten_results=None,
+            bigquery_conn_id='bigquery_default',
+            udf_config=None,
+            use_legacy_sql=True,
+            maximum_billing_tier=None,
+            maximum_bytes_billed=None,
+            create_disposition='CREATE_IF_NEEDED',
+            schema_update_options=(),
+            query_params=None,
+            labels=None,
+            priority='INTERACTIVE',
+            time_partitioning=None,
+            api_resource_configs=None,
+            cluster_fields=None,
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .run_query \
+            .assert_called_once_with(
+                sql='Select * from test_table',
+                destination_dataset_table=None,
+                write_disposition='WRITE_EMPTY',
+                allow_large_results=False,
+                flatten_results=None,
+                udf_config=None,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition='CREATE_IF_NEEDED',
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority='INTERACTIVE',
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_bigquery_operator_defaults(self, mock_hook):
+        operator = BigQueryOperator(
+            task_id=TASK_ID,
+            sql='Select * from test_table',
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .run_query \
+            .assert_called_once_with(
+                sql='Select * from test_table',
+                destination_dataset_table=None,
+                write_disposition='WRITE_EMPTY',
+                allow_large_results=False,
+                flatten_results=None,
+                udf_config=None,
+                maximum_billing_tier=None,
+                maximum_bytes_billed=None,
+                create_disposition='CREATE_IF_NEEDED',
+                schema_update_options=(),
+                query_params=None,
+                labels=None,
+                priority='INTERACTIVE',
+                time_partitioning=None,
+                api_resource_configs=None,
+                cluster_fields=None,
+            )


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services