Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/06 08:34:00 UTC

[GitHub] yohei1126 closed pull request #4409: WIP [AIRFLOW-3601] add location support to BigQuery operators

yohei1126 closed pull request #4409: WIP [AIRFLOW-3601] add location support to BigQuery operators
URL: https://github.com/apache/airflow/pull/4409
 
 
   

This is a pull request from a forked repository. As GitHub does not retain
the original diff once such a pull request is closed, it is reproduced below
for the sake of provenance:

diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 6dabd3ea4a..a1c71b9073 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -277,6 +277,9 @@ def create_empty_table(self,
             }
         }
 
+        if self.location:
+            table_resource['location'] = self.location
+
         if schema_fields:
             table_resource['schema'] = {'fields': schema_fields}
 
@@ -487,6 +490,12 @@ def create_external_table(self,
         if labels:
             table_resource['labels'] = labels
 
+        if self.location:
+            table_resource['location'] = self.location
+
+        self.log.info('Inserting external table: %s, body: %s',
+                      external_project_dataset_table, table_resource)
+
         try:
             self.service.tables().insert(
                 projectId=project_id,
@@ -1443,6 +1452,9 @@ def create_empty_dataset(self, dataset_id="", project_id="",
                     param_name, param,
                     dataset_reference['datasetReference'], 'dataset_reference')
 
+        if self.location:
+            dataset_reference['location'] = self.location
+
         dataset_id = dataset_reference.get("datasetReference").get("datasetId")
         dataset_project_id = dataset_reference.get("datasetReference").get(
             "projectId")
diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py
index 247a1ae7fb..c23f1e84ac 100644
--- a/airflow/contrib/operators/bigquery_check_operator.py
+++ b/airflow/contrib/operators/bigquery_check_operator.py
@@ -58,6 +58,10 @@ class BigQueryCheckOperator(CheckOperator):
     :param use_legacy_sql: Whether to use legacy SQL (true)
         or standard SQL (false).
     :type use_legacy_sql: bool
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
 
     @apply_defaults
@@ -65,15 +69,18 @@ def __init__(self,
                  sql,
                  bigquery_conn_id='bigquery_default',
                  use_legacy_sql=True,
+                 location=None,
                  *args, **kwargs):
         super(BigQueryCheckOperator, self).__init__(sql=sql, *args, **kwargs)
         self.bigquery_conn_id = bigquery_conn_id
         self.sql = sql
         self.use_legacy_sql = use_legacy_sql
+        self.location = location
 
     def get_db_hook(self):
         return BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            use_legacy_sql=self.use_legacy_sql)
+                            use_legacy_sql=self.use_legacy_sql,
+                            location=self.location)
 
 
 class BigQueryValueCheckOperator(ValueCheckOperator):
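
A minimal DAG sketch using the new parameter on BigQueryCheckOperator,
assuming this PR is applied; the DAG id, SQL and table names are illustrative:

    from datetime import datetime
    from airflow import DAG
    from airflow.contrib.operators.bigquery_check_operator import \
        BigQueryCheckOperator

    dag = DAG('bq_check_location', start_date=datetime(2019, 1, 1),
              schedule_interval=None)

    check = BigQueryCheckOperator(
        task_id='check_rows',
        sql='SELECT COUNT(*) FROM `my-project.my_dataset.my_table`',
        use_legacy_sql=False,
        location='asia-northeast1',  # forwarded to BigQueryHook by get_db_hook()
        dag=dag)
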
diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index f5e6e50f06..0ceb0959d4 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -66,6 +66,10 @@ class BigQueryGetDataOperator(BaseOperator):
         For this to work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: str
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('dataset_id', 'table_id', 'max_results')
     ui_color = '#e4f0e8'
@@ -78,6 +82,7 @@ def __init__(self,
                  selected_fields=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
+                 location=None,
                  *args,
                  **kwargs):
         super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
@@ -87,6 +92,7 @@ def __init__(self,
         self.selected_fields = selected_fields
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
+        self.location = location
 
     def execute(self, context):
         self.log.info('Fetching Data from:')
@@ -94,7 +100,8 @@ def execute(self, context):
                       self.dataset_id, self.table_id, self.max_results)
 
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            delegate_to=self.delegate_to)
+                            delegate_to=self.delegate_to,
+                            location=self.location)
 
         conn = hook.get_conn()
         cursor = conn.cursor()
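
Usage sketch for BigQueryGetDataOperator with the new parameter (dataset and
table names are illustrative; assumes this PR is applied):

    from airflow.contrib.operators.bigquery_get_data import (
        BigQueryGetDataOperator)

    get_data = BigQueryGetDataOperator(
        task_id='get_data',
        dataset_id='my_dataset',
        table_id='my_table',
        max_results='100',
        selected_fields='name,value',
        location='asia-northeast1')  # forwarded to BigQueryHook
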
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index f597db93a5..d2c9b3f6fb 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -278,7 +278,10 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
                 google_cloud_storage_conn_id='airflow-service-account'
             )
     :type labels: dict
-
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('dataset_id', 'table_id', 'project_id',
                        'gcs_schema_object', 'labels')
@@ -296,6 +299,7 @@ def __init__(self,
                  google_cloud_storage_conn_id='google_cloud_default',
                  delegate_to=None,
                  labels=None,
+                 location=None,
                  *args, **kwargs):
 
         super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs)
@@ -310,10 +314,12 @@ def __init__(self,
         self.delegate_to = delegate_to
         self.time_partitioning = {} if time_partitioning is None else time_partitioning
         self.labels = labels
+        self.location = location
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to)
+                               delegate_to=self.delegate_to,
+                               location=self.location)
 
         if not self.schema_fields and self.gcs_schema_object:
 
@@ -412,6 +418,10 @@ class BigQueryCreateExternalTableOperator(BaseOperator):
     :type src_fmt_configs: dict
     :param labels: a dictionary containing labels for the table, passed to BigQuery
     :type labels: dict
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('bucket', 'source_objects',
                        'schema_object', 'destination_project_dataset_table', 'labels')
@@ -437,6 +447,7 @@ def __init__(self,
                  delegate_to=None,
                  src_fmt_configs={},
                  labels=None,
+                 location=None,
                  *args, **kwargs):
 
         super(BigQueryCreateExternalTableOperator, self).__init__(*args, **kwargs)
@@ -464,10 +475,12 @@ def __init__(self,
 
         self.src_fmt_configs = src_fmt_configs
         self.labels = labels
+        self.location = location
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to)
+                               delegate_to=self.delegate_to,
+                               location=self.location)
 
         if not self.schema_fields and self.schema_object \
                 and self.source_format != 'DATASTORE_BACKUP':
@@ -511,7 +524,10 @@ class BigQueryDeleteDatasetOperator(BaseOperator):
     :type project_id: str
     :param dataset_id: The dataset to be deleted.
     :type dataset_id: str
-
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     **Example**: ::
 
         delete_temp_data = BigQueryDeleteDatasetOperator(dataset_id = 'temp-dataset',
@@ -530,11 +546,13 @@ def __init__(self,
                  project_id=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
+                 location=None,
                  *args, **kwargs):
         self.dataset_id = dataset_id
         self.project_id = project_id
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
+        self.location = location
 
         self.log.info('Dataset id: %s', self.dataset_id)
         self.log.info('Project id: %s', self.project_id)
@@ -543,7 +561,8 @@ def __init__(self,
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to)
+                               delegate_to=self.delegate_to,
+                               location=self.location)
 
         conn = bq_hook.get_conn()
         cursor = conn.cursor()
@@ -580,6 +599,10 @@ class BigQueryCreateEmptyDatasetOperator(BaseOperator):
                                     task_id='newDatasetCreator',
                                     dag=dag)
 
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
 
     template_fields = ('dataset_id', 'project_id')
@@ -592,12 +615,14 @@ def __init__(self,
                  dataset_reference=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
+                 location=None,
                  *args, **kwargs):
         self.dataset_id = dataset_id
         self.project_id = project_id
         self.bigquery_conn_id = bigquery_conn_id
         self.dataset_reference = dataset_reference if dataset_reference else {}
         self.delegate_to = delegate_to
+        self.location = location
 
         self.log.info('Dataset id: %s', self.dataset_id)
         self.log.info('Project id: %s', self.project_id)
@@ -606,7 +631,8 @@ def __init__(self,
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to)
+                               delegate_to=self.delegate_to,
+                               location=self.location)
 
         conn = bq_hook.get_conn()
         cursor = conn.cursor()
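
A sketch combining the dataset and table operators changed above for a
non-US/EU region, assuming this PR is applied; all ids are illustrative:

    from airflow.contrib.operators.bigquery_operator import (
        BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator)

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_dataset',
        dataset_id='tokyo_dataset',
        project_id='my-project',
        location='asia-northeast1')

    create_table = BigQueryCreateEmptyTableOperator(
        task_id='create_table',
        dataset_id='tokyo_dataset',
        table_id='my_table',
        project_id='my-project',
        location='asia-northeast1')

    # A table must be created in the same location as its dataset.
    create_dataset >> create_table
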
diff --git a/airflow/contrib/operators/bigquery_table_delete_operator.py b/airflow/contrib/operators/bigquery_table_delete_operator.py
index 45c481454e..0d530e5676 100644
--- a/airflow/contrib/operators/bigquery_table_delete_operator.py
+++ b/airflow/contrib/operators/bigquery_table_delete_operator.py
@@ -39,6 +39,10 @@ class BigQueryTableDeleteOperator(BaseOperator):
     :param ignore_if_missing: if True, then return success even if the
         requested table does not exist.
     :type ignore_if_missing: bool
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('deletion_dataset_table',)
     ui_color = '#ffd1dc'
@@ -49,6 +53,7 @@ def __init__(self,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  ignore_if_missing=False,
+                 location=None,
                  *args,
                  **kwargs):
         super(BigQueryTableDeleteOperator, self).__init__(*args, **kwargs)
@@ -56,11 +61,13 @@ def __init__(self,
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.ignore_if_missing = ignore_if_missing
+        self.location = location
 
     def execute(self, context):
         self.log.info('Deleting: %s', self.deletion_dataset_table)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            delegate_to=self.delegate_to)
+                            delegate_to=self.delegate_to,
+                            location=self.location)
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_table_delete(self.deletion_dataset_table, self.ignore_if_missing)
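
Corresponding usage sketch for BigQueryTableDeleteOperator (names are
illustrative; assumes this PR is applied):

    from airflow.contrib.operators.bigquery_table_delete_operator import (
        BigQueryTableDeleteOperator)

    delete_table = BigQueryTableDeleteOperator(
        task_id='delete_table',
        deletion_dataset_table='my-project.tokyo_dataset.my_table',
        ignore_if_missing=True,
        location='asia-northeast1')
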
diff --git a/airflow/contrib/operators/bigquery_to_bigquery.py b/airflow/contrib/operators/bigquery_to_bigquery.py
index 2073cadef1..da5b9f353c 100644
--- a/airflow/contrib/operators/bigquery_to_bigquery.py
+++ b/airflow/contrib/operators/bigquery_to_bigquery.py
@@ -52,6 +52,10 @@ class BigQueryToBigQueryOperator(BaseOperator):
     :param labels: a dictionary containing labels for the job/query,
         passed to BigQuery
     :type labels: dict
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('source_project_dataset_tables',
                        'destination_project_dataset_table', 'labels')
@@ -67,6 +71,7 @@ def __init__(self,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  labels=None,
+                 location=None,
                  *args,
                  **kwargs):
         super(BigQueryToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -77,6 +82,7 @@ def __init__(self,
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.labels = labels
+        self.location = location
 
     def execute(self, context):
         self.log.info(
@@ -84,7 +90,8 @@ def execute(self, context):
             self.source_project_dataset_tables, self.destination_project_dataset_table
         )
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            delegate_to=self.delegate_to)
+                            delegate_to=self.delegate_to,
+                            location=self.location)
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_copy(
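
Usage sketch for BigQueryToBigQueryOperator; with the change above, the copy
job would run in the given location (source and destination tables must both
live there). Table names are illustrative:

    from airflow.contrib.operators.bigquery_to_bigquery import (
        BigQueryToBigQueryOperator)

    copy_table = BigQueryToBigQueryOperator(
        task_id='copy_table',
        source_project_dataset_tables='my-project.tokyo_dataset.src_table',
        destination_project_dataset_table='my-project.tokyo_dataset.dst_table',
        location='asia-northeast1')
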
diff --git a/airflow/contrib/operators/bigquery_to_gcs.py b/airflow/contrib/operators/bigquery_to_gcs.py
index ec6b937d21..c6dc72986d 100644
--- a/airflow/contrib/operators/bigquery_to_gcs.py
+++ b/airflow/contrib/operators/bigquery_to_gcs.py
@@ -57,6 +57,10 @@ class BigQueryToCloudStorageOperator(BaseOperator):
     :param labels: a dictionary containing labels for the job/query,
         passed to BigQuery
     :type labels: dict
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('source_project_dataset_table',
                        'destination_cloud_storage_uris', 'labels')
@@ -74,6 +78,7 @@ def __init__(self,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
                  labels=None,
+                 location=None,
                  *args,
                  **kwargs):
         super(BigQueryToCloudStorageOperator, self).__init__(*args, **kwargs)
@@ -86,13 +91,15 @@ def __init__(self,
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
         self.labels = labels
+        self.location = location
 
     def execute(self, context):
         self.log.info('Executing extract of %s into: %s',
                       self.source_project_dataset_table,
                       self.destination_cloud_storage_uris)
         hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            delegate_to=self.delegate_to)
+                            delegate_to=self.delegate_to,
+                            location=self.location)
         conn = hook.get_conn()
         cursor = conn.cursor()
         cursor.run_extract(
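
Usage sketch for BigQueryToCloudStorageOperator (names are illustrative; for
extract jobs the GCS bucket should be co-located with the source table):

    from airflow.contrib.operators.bigquery_to_gcs import (
        BigQueryToCloudStorageOperator)

    extract = BigQueryToCloudStorageOperator(
        task_id='extract_table',
        source_project_dataset_table='my-project.tokyo_dataset.my_table',
        destination_cloud_storage_uris=['gs://my-bucket/export/part-*.csv'],
        location='asia-northeast1')
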
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index abbf380b2d..20be6d70c0 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -119,6 +119,10 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         time_partitioning. The order of columns given determines the sort order.
         Not applicable for external tables.
     :type cluster_fields: list of str
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('bucket', 'source_objects',
                        'schema_object', 'destination_project_dataset_table')
@@ -153,6 +157,7 @@ def __init__(self,
                  time_partitioning=None,
                  cluster_fields=None,
                  autodetect=False,
+                 location=None,
                  *args, **kwargs):
 
         super(GoogleCloudStorageToBigQueryOperator, self).__init__(*args, **kwargs)
@@ -192,10 +197,12 @@ def __init__(self,
         self.time_partitioning = time_partitioning
         self.cluster_fields = cluster_fields
         self.autodetect = autodetect
+        self.location = location
 
     def execute(self, context):
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                               delegate_to=self.delegate_to)
+                               delegate_to=self.delegate_to,
+                               location=self.location)
 
         if not self.schema_fields:
             if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
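
Usage sketch for GoogleCloudStorageToBigQueryOperator with the new parameter
(bucket and table names are illustrative; assumes this PR is applied):

    from airflow.contrib.operators.gcs_to_bq import (
        GoogleCloudStorageToBigQueryOperator)

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='load_csv',
        bucket='my-bucket',
        source_objects=['data/input.csv'],
        destination_project_dataset_table='my-project.tokyo_dataset.my_table',
        autodetect=True,
        location='asia-northeast1')  # load job runs in this location
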
diff --git a/airflow/contrib/sensors/bigquery_sensor.py b/airflow/contrib/sensors/bigquery_sensor.py
index fe8bd2ed6a..170c7df024 100644
--- a/airflow/contrib/sensors/bigquery_sensor.py
+++ b/airflow/contrib/sensors/bigquery_sensor.py
@@ -41,6 +41,10 @@ class BigQueryTableSensor(BaseSensorOperator):
         For this to work, the service account making the request must
         have domain-wide delegation enabled.
     :type delegate_to: str
+    :param location: The geographic location of the job. Required except for
+        US and EU. See details at
+        https://cloud.google.com/bigquery/docs/locations#specifying_your_location
+    :type location: str
     """
     template_fields = ('project_id', 'dataset_id', 'table_id',)
     ui_color = '#f0eee4'
@@ -52,6 +56,7 @@ def __init__(self,
                  table_id,
                  bigquery_conn_id='bigquery_default_conn',
                  delegate_to=None,
+                 location=None,
                  *args, **kwargs):
 
         super(BigQueryTableSensor, self).__init__(*args, **kwargs)
@@ -60,11 +65,13 @@ def __init__(self,
         self.table_id = table_id
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
+        self.location = location
 
     def poke(self, context):
         table_uri = '{0}:{1}.{2}'.format(self.project_id, self.dataset_id, self.table_id)
         self.log.info('Sensor checks existence of table: %s', table_uri)
         hook = BigQueryHook(
             bigquery_conn_id=self.bigquery_conn_id,
-            delegate_to=self.delegate_to)
+            delegate_to=self.delegate_to,
+            location=self.location)
         return hook.table_exists(self.project_id, self.dataset_id, self.table_id)
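
Usage sketch for BigQueryTableSensor (ids are illustrative; assumes this PR
is applied):

    from airflow.contrib.sensors.bigquery_sensor import BigQueryTableSensor

    wait_for_table = BigQueryTableSensor(
        task_id='wait_for_table',
        project_id='my-project',
        dataset_id='tokyo_dataset',
        table_id='my_table',
        location='asia-northeast1',
        poke_interval=60)
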
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 244574e5e3..04071de149 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -191,6 +191,49 @@ def test_invalid_source_format(self):
         # error string.
         self.assertIn("JSON", str(context.exception))
 
+    def test_table_location(self):
+        project_id = "test_project"
+        dataset_id = "test_dataset"
+        external_project_dataset_table = "{}.{}.{}".format(
+            project_id, dataset_id, "test_table")
+        schema_fields = [{
+            "name": "test_name",
+            "type": "string",
+            "mode": "REQUIRED",
+            "description": "test_description"
+        }]
+        source_uris = ["gs://test_bucket/test_data.csv"]
+        location = "asia-northeast1"
+
+        mock_service = mock.Mock()
+        method = mock_service.tables.return_value.insert
+        cursor = hook.BigQueryBaseCursor("test", project_id, location=location)
+        cursor.create_external_table(external_project_dataset_table, schema_fields, source_uris)
+        body = {
+            "schema": {
+                "fields": schema_fields,
+            },
+            "externalDataConfiguration": {
+                "autodetect": False,
+                "sourceFormat": "CSV",
+                "sourceUris": source_uris,
+                "compression": "NONE",
+                "ignoreUnknownValues": False,
+            },
+            "tableReference": {
+                "projectId": project_id,
+                "datasetId": dataset_id,
+                "tableId": 'test_table',
+            },
+            "location": location
+        }
+        method.assert_called_once_with(
+            projectId=project_id,
+            datasetId=dataset_id,
+            body=body)
+
 
 # Helpers to test_cancel_queries that have mock_poll_job_complete returning false,
 # unless mock_job_cancel was called with the same job_id
@@ -380,6 +420,24 @@ def test_create_view(self):
         }
         method.assert_called_once_with(projectId=project_id, datasetId=dataset_id, body=body)
 
+    def test_create_empty_table_with_location(self):
+        project_id = 'bq-project'
+        dataset_id = 'bq_dataset'
+        table_id = 'bq_table_location'
+        location = 'asia-northeast1'
+
+        mock_service = mock.Mock()
+        method = mock_service.tables.return_value.insert
+        cursor = hook.BigQueryBaseCursor(mock_service, project_id, location=location)
+        cursor.create_empty_table(project_id, dataset_id, table_id)
+        body = {
+            'tableReference': {
+                'tableId': table_id
+            },
+            'location': location
+        }
+        method.assert_called_once_with(projectId=project_id, datasetId=dataset_id, body=body)
+
 
 class TestBigQueryCursor(unittest.TestCase):
     @mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration')
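
The same mock-driven pattern generalizes to the other cursor methods touched
by this PR. A sketch, assuming BigQueryBaseCursor accepts location as in the
tests above (names are illustrative):

    import mock
    from airflow.contrib.hooks import bigquery_hook as hook

    mock_service = mock.Mock()
    cursor = hook.BigQueryBaseCursor(mock_service, 'bq-project',
                                     location='asia-northeast1')
    cursor.create_empty_dataset(dataset_id='bq_dataset',
                                project_id='bq-project')
    # The dataset resource passed to datasets().insert() should be stamped
    # with the cursor's location.
    body = mock_service.datasets.return_value.insert.call_args[1]['body']
    assert body['location'] == 'asia-northeast1'
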
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index 84f1750bed..36aab5ca9b 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -48,6 +48,7 @@
 TEST_SOURCE_FORMAT = 'CSV'
 DEFAULT_DATE = datetime(2015, 1, 1)
 TEST_DAG_ID = 'test-bigquery-operators'
+TEST_LOCATION = 'asia-northeast1'
 
 
 class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
@@ -57,12 +58,12 @@ def test_execute(self, mock_hook):
         operator = BigQueryCreateEmptyTableOperator(task_id=TASK_ID,
                                                     dataset_id=TEST_DATASET,
                                                     project_id=TEST_PROJECT_ID,
-                                                    table_id=TEST_TABLE_ID)
+                                                    table_id=TEST_TABLE_ID,
+                                                    location=TEST_LOCATION)
 
         operator.execute(None)
-        mock_hook.return_value \
-            .get_conn() \
-            .cursor() \
+        bq_cursor = mock_hook.return_value.get_conn().cursor()
+        bq_cursor \
             .create_empty_table \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
@@ -72,6 +73,7 @@ def test_execute(self, mock_hook):
                 time_partitioning={},
                 labels=None
             )
+        self.assertEqual(bq_cursor.location, TEST_LOCATION)
 
 
 class BigQueryCreateExternalTableOperatorTest(unittest.TestCase):
@@ -86,14 +88,13 @@ def test_execute(self, mock_hook):
             schema_fields=[],
             bucket=TEST_GCS_BUCKET,
             source_objects=TEST_GCS_DATA,
-            source_format=TEST_SOURCE_FORMAT
+            source_format=TEST_SOURCE_FORMAT,
+            location=TEST_LOCATION,
         )
 
         operator.execute(None)
-        mock_hook.return_value \
-            .get_conn() \
-            .cursor() \
-            .create_external_table \
+        bq_cursor = mock_hook.return_value.get_conn().cursor()
+        bq_cursor.create_external_table \
             .assert_called_once_with(
                 external_project_dataset_table='{}.{}'.format(
                     TEST_DATASET, TEST_TABLE_ID
@@ -112,6 +113,7 @@ def test_execute(self, mock_hook):
                 src_fmt_configs={},
                 labels=None
             )
+        self.assertEqual(bq_cursor.location, TEST_LOCATION)
 
 
 class BigQueryDeleteDatasetOperatorTest(unittest.TestCase):
@@ -120,18 +122,18 @@ def test_execute(self, mock_hook):
         operator = BigQueryDeleteDatasetOperator(
             task_id=TASK_ID,
             dataset_id=TEST_DATASET,
-            project_id=TEST_PROJECT_ID
+            project_id=TEST_PROJECT_ID,
+            location=TEST_LOCATION,
         )
 
         operator.execute(None)
-        mock_hook.return_value \
-            .get_conn() \
-            .cursor() \
-            .delete_dataset \
+        bq_cursor = mock_hook.return_value.get_conn().cursor()
+        bq_cursor.delete_dataset \
             .assert_called_once_with(
                 dataset_id=TEST_DATASET,
                 project_id=TEST_PROJECT_ID
             )
+        self.assertEqual(bq_cursor.location, TEST_LOCATION)
 
 
 class BigQueryCreateEmptyDatasetOperatorTest(unittest.TestCase):
@@ -257,3 +259,15 @@ def test_bigquery_operator_defaults(self, mock_hook):
         ti = TaskInstance(task=operator, execution_date=DEFAULT_DATE)
         ti.render_templates()
         self.assertTrue(isinstance(ti.task.sql, six.string_types))
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_bigquery_operator_location(self, mock_hook):
+        operator = BigQueryOperator(
+            task_id=TASK_ID,
+            sql='Select * from test_table',
+            location=TEST_LOCATION,
+        )
+
+        operator.execute(None)
+        bq_cursor = mock_hook.return_value.get_conn().cursor()
+        self.assertEqual(bq_cursor.location, TEST_LOCATION)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services