Posted to commits@airflow.apache.org by cr...@apache.org on 2017/10/16 20:45:42 UTC

incubator-airflow git commit: [AIRFLOW-1683] Cancel BigQuery job on timeout.

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 85127769d -> 15feb7dd3


[AIRFLOW-1683] Cancel BigQuery job on timeout.

This change cancels the running BigQuery job when the task that
started it is killed, for example on execution timeout, so that the
abandoned query does not keep consuming resources.

Closes #2665 from janczak10/master
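
For context, a minimal sketch (not part of the commit) of how this cancellation
path is exercised: give a BigQueryOperator an execution_timeout, and when the
timeout kills the task, on_kill() now also cancels the in-flight BigQuery job.
The DAG id, task id, query and schedule below are placeholders.

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    # Placeholder DAG; dag_id, start_date and schedule are illustrative only.
    dag = DAG('example_bq_cancel_on_timeout',
              start_date=datetime(2017, 10, 1),
              schedule_interval=None)

    long_query = BigQueryOperator(
        task_id='long_running_query',
        bql='SELECT COUNT(*) FROM dataset.table',  # placeholder query
        bigquery_conn_id='bigquery_default',
        # If the task runs longer than this, Airflow kills it; with this change
        # on_kill() also cancels the underlying BigQuery job.
        execution_timeout=timedelta(minutes=30),
        dag=dag)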


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15feb7dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15feb7dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15feb7dd

Branch: refs/heads/master
Commit: 15feb7dd3f39ba7926ae5817d488e4e54a3d7742
Parents: 8512776
Author: Maria Janczak <ja...@janczak.mtv.corp.google.com>
Authored: Mon Oct 16 13:45:17 2017 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Mon Oct 16 13:45:26 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 54 +++++++++++++++++++--
 airflow/contrib/operators/bigquery_operator.py | 21 +++++---
 tests/contrib/hooks/test_bigquery_hook.py      | 28 +++++++++++
 3 files changed, 92 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 5fc7e22..2cfef57 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -185,6 +185,7 @@ class BigQueryBaseCursor(LoggingMixin):
     def __init__(self, service, project_id):
         self.service = service
         self.project_id = project_id
+        self.running_job_id = None
 
     def run_query(
             self, bql, destination_dataset_table = False,
@@ -559,13 +560,13 @@ class BigQueryBaseCursor(LoggingMixin):
         query_reply = jobs \
             .insert(projectId=self.project_id, body=job_data) \
             .execute()
-        job_id = query_reply['jobReference']['jobId']
+        self.running_job_id = query_reply['jobReference']['jobId']
 
         # Wait for query to finish.
         keep_polling_job = True
         while (keep_polling_job):
             try:
-                job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
+                job = jobs.get(projectId=self.project_id, jobId=self.running_job_id).execute()
                 if (job['status']['state'] == 'DONE'):
                     keep_polling_job = False
                     # Check if job had errors.
@@ -576,18 +577,61 @@ class BigQueryBaseCursor(LoggingMixin):
                             )
                         )
                 else:
-                    self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id)
+                    self.log.info('Waiting for job to complete : %s, %s', self.project_id, self.running_job_id)
                     time.sleep(5)
 
             except HttpError as err:
                 if err.resp.status in [500, 503]:
-                    self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id)
+                    self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, self.running_job_id)
                     time.sleep(5)
                 else:
                     raise Exception(
                         'BigQuery job status check failed. Final error was: %s', err.resp.status)
 
-        return job_id
+        return self.running_job_id
+        
+    def poll_job_complete(self, job_id):
+        jobs = self.service.jobs()
+        try:
+            job = jobs.get(projectId=self.project_id, jobId=job_id).execute()
+            if (job['status']['state'] == 'DONE'):
+                return True
+        except HttpError as err:
+            if err.resp.status in [500, 503]:
+                self.log.info('%s: Retryable error while polling job with id %s', err.resp.status, job_id)
+            else:
+                raise Exception(
+                    'BigQuery job status check failed. Final error was: %s', err.resp.status)
+        return False
+      
+        
+    def cancel_query(self):
+        """
+        Cancel all started queries that have not yet completed
+        """
+        jobs = self.service.jobs()
+        if (self.running_job_id and not self.poll_job_complete(self.running_job_id)):
+            self.log.info('Attempting to cancel job : %s, %s', self.project_id, self.running_job_id)
+            jobs.cancel(projectId=self.project_id, jobId=self.running_job_id).execute()
+        else:
+            self.log.info('No running BigQuery jobs to cancel.')
+            return
+        
+        # Wait for all the calls to cancel to finish
+        max_polling_attempts = 12
+        polling_attempts = 0
+        
+        job_complete = False
+        while (polling_attempts < max_polling_attempts and not job_complete):
+            polling_attempts = polling_attempts+1
+            job_complete = self.poll_job_complete(self.running_job_id)
+            if (job_complete):
+                self.log.info('Job successfully canceled: %s, %s', self.project_id, self.running_job_id)
+            elif(polling_attempts == max_polling_attempts):
+                self.log.info('Stopping polling due to timeout. Job with id %s has not completed cancel and may or may not finish.', self.running_job_id)
+            else:
+                self.log.info('Waiting for canceled job with id %s to finish.', self.running_job_id)
+                time.sleep(5)
 
     def get_schema(self, dataset_id, table_id):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index a2ba824..19efa55 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -85,14 +85,23 @@ class BigQueryOperator(BaseOperator):
         self.use_legacy_sql = use_legacy_sql
         self.maximum_billing_tier = maximum_billing_tier
         self.query_params = query_params
+        self.bq_cursor = None
 
     def execute(self, context):
-        self.log.info('Executing: %s', self.bql)
-        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                            delegate_to=self.delegate_to)
-        conn = hook.get_conn()
-        cursor = conn.cursor()
-        cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
+        if(self.bq_cursor == None):
+            self.log.info('Executing: %s', self.bql)
+            hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                                delegate_to=self.delegate_to)
+            conn = hook.get_conn()
+            self.bq_cursor = conn.cursor()
+        self.bq_cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
                          self.allow_large_results, self.udf_config,
                          self.use_legacy_sql, self.maximum_billing_tier,
                          self.create_disposition, self.query_params)
+        
+                         
+    def on_kill(self):
+        super(BigQueryOperator, self).on_kill()
+        if(self.bq_cursor!=None):
+            self.log.info('Canceling running query due to execution timeout')
+            self.bq_cursor.cancel_query()
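
The operator change follows a pattern that generalizes to any operator that
launches work outside the worker process: keep a handle to whatever execute()
started and release it in on_kill(). A hypothetical, stripped-down example
(not Airflow code; a local subprocess stands in for the remote job):

    import subprocess

    from airflow.models import BaseOperator


    class ExampleSubprocessOperator(BaseOperator):
        """Hypothetical operator illustrating the execute()/on_kill() pairing."""

        def __init__(self, bash_command, *args, **kwargs):
            super(ExampleSubprocessOperator, self).__init__(*args, **kwargs)
            self.bash_command = bash_command
            self.proc = None

        def execute(self, context):
            # Keep a handle to the work so on_kill() can find it later.
            self.proc = subprocess.Popen(self.bash_command, shell=True)
            self.proc.wait()

        def on_kill(self):
            super(ExampleSubprocessOperator, self).on_kill()
            if self.proc is not None and self.proc.poll() is None:
                self.log.info('Terminating subprocess due to task kill')
                self.proc.terminate()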

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15feb7dd/tests/contrib/hooks/test_bigquery_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py
index 2fe9ddb..0365bba 100644
--- a/tests/contrib/hooks/test_bigquery_hook.py
+++ b/tests/contrib/hooks/test_bigquery_hook.py
@@ -14,6 +14,7 @@
 #
 
 import unittest
+import mock
 
 from airflow.contrib.hooks import bigquery_hook as hook
 from oauth2client.contrib.gce import HttpAccessTokenRefreshError
@@ -163,6 +164,14 @@ class TestBigQueryHookSourceFormat(unittest.TestCase):
         # since we passed 'json' in, and it's not valid, make sure it's present in the error string.
         self.assertIn("JSON", str(context.exception))
 
+# Helpers to test_cancel_queries that have mock_poll_job_complete returning false, unless mock_job_cancel was called with the same job_id
+mock_canceled_jobs = []
+def mock_poll_job_complete(job_id):
+    return job_id in mock_canceled_jobs
+
+def mock_job_cancel(projectId, jobId):
+    mock_canceled_jobs.append(jobId)
+    return mock.Mock()
 
 class TestBigQueryBaseCursor(unittest.TestCase):
     def test_invalid_schema_update_options(self):
@@ -185,6 +194,25 @@ class TestBigQueryBaseCursor(unittest.TestCase):
                 write_disposition='WRITE_EMPTY'
             )
         self.assertIn("schema_update_options is only", str(context.exception))
+    
+    @mock.patch("airflow.contrib.hooks.bigquery_hook.LoggingMixin")
+    @mock.patch("airflow.contrib.hooks.bigquery_hook.time")
+    def test_cancel_queries(self, mocked_logging, mocked_time):
+        project_id = 12345
+        running_job_id = 3
+        
+        mock_jobs = mock.Mock()
+        mock_jobs.cancel = mock.Mock(side_effect=mock_job_cancel)
+        mock_service = mock.Mock()
+        mock_service.jobs = mock.Mock(return_value=mock_jobs)
+        
+        bq_hook = hook.BigQueryBaseCursor(mock_service, project_id)
+        bq_hook.running_job_id = running_job_id
+        bq_hook.poll_job_complete = mock.Mock(side_effect=mock_poll_job_complete)
+        
+        bq_hook.cancel_query()
+        
+        mock_jobs.cancel.assert_called_with(projectId=project_id, jobId=running_job_id)
 
 if __name__ == '__main__':
     unittest.main()