You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/30 21:38:51 UTC

[airflow] branch main updated: Fix mypy databricks operator (#20598)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bf424f  Fix mypy databricks operator (#20598)
0bf424f is described below

commit 0bf424f37fc2786e7a74e7f1df88dc92538abbd4
Author: Kanthi <su...@gmail.com>
AuthorDate: Thu Dec 30 16:38:18 2021 -0500

    Fix mypy databricks operator (#20598)
    
    * [16185] Added LocalKubernetesExecutor to breeze supported executors
    
    * Revert "[16185] Added LocalKubernetesExecutor to breeze supported executors"
    
    This reverts commit a1c532eacfeddcbefaa3e565a0522e25315286c4.
    
    * Fixed mypy errors in databricks/operators
---
 .../providers/databricks/operators/databricks.py   | 26 +++++++++++++++-------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py
index 5aa4ec8..8a0d76a 100644
--- a/airflow/providers/databricks/operators/databricks.py
+++ b/airflow/providers/databricks/operators/databricks.py
@@ -330,7 +330,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
 
         self.json = _deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
-        self.run_id = None
+        self.run_id: Optional[int] = None
         self.do_xcom_push = do_xcom_push
 
     def _get_hook(self) -> DatabricksHook:
@@ -346,9 +346,14 @@ class DatabricksSubmitRunOperator(BaseOperator):
         _handle_databricks_operator_execution(self, hook, self.log, context)
 
     def on_kill(self):
-        hook = self._get_hook()
-        hook.cancel_run(self.run_id)
-        self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
+        if self.run_id:
+            hook = self._get_hook()
+            hook.cancel_run(self.run_id)
+            self.log.info(
+                'Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id
+            )
+        else:
+            self.log.error('Error: Task: %s with invalid run_id was requested to be cancelled.', self.task_id)
 
 
 class DatabricksRunNowOperator(BaseOperator):
@@ -544,7 +549,7 @@ class DatabricksRunNowOperator(BaseOperator):
 
         self.json = _deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
-        self.run_id = None
+        self.run_id: Optional[int] = None
         self.do_xcom_push = do_xcom_push
 
     def _get_hook(self) -> DatabricksHook:
@@ -560,6 +565,11 @@ class DatabricksRunNowOperator(BaseOperator):
         _handle_databricks_operator_execution(self, hook, self.log, context)
 
     def on_kill(self):
-        hook = self._get_hook()
-        hook.cancel_run(self.run_id)
-        self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id)
+        if self.run_id:
+            hook = self._get_hook()
+            hook.cancel_run(self.run_id)
+            self.log.info(
+                'Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id
+            )
+        else:
+            self.log.error('Error: Task: %s with invalid run_id was requested to be cancelled.', self.task_id)