You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:51 UTC

[airflow] 07/37: Fix #28391 manual task trigger from UI fails for k8s executor (#28394)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2a19dd7741e76eb45be87a7e55d0ea1c736b73dd
Author: sanjayp <sa...@gmail.com>
AuthorDate: Tue Jan 24 09:18:45 2023 -0600

    Fix #28391 manual task trigger from UI fails for k8s executor (#28394)
    
    Manual task trigger from UI fails for k8s executor. the executor.job_id
    is currently set to "manual". the task instance queued_by_job_id field
    is expected to be None|Integer. this causes the filter query in
    clear_not_launched_queued_tasks method in kubernetes_executor to fail
    with psycopg2.errors.InvalidTextRepresentation invalid input syntax for integer: "manual" error.
    
    setting the job_id to None fixes the issue.
    
    (cherry picked from commit 9510043546d1ac8ac56b67bafa537e4b940d68a4)
---
 airflow/cli/commands/task_command.py     | 2 +-
 airflow/executors/kubernetes_executor.py | 2 --
 airflow/www/views.py                     | 2 +-
 3 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 2f37579c35..b514754f65 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -218,7 +218,7 @@ def _run_task_by_executor(args, dag, ti):
             print(e)
             raise e
     executor = ExecutorLoader.get_default_executor()
-    executor.job_id = "manual"
+    executor.job_id = None
     executor.start()
     print("Sending to executor.")
     executor.queue_task_instance(
diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 28f720f35e..7dcba12968 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -526,8 +526,6 @@ class KubernetesExecutor(BaseExecutor):
     def start(self) -> None:
         """Starts the executor"""
         self.log.info("Start Kubernetes executor")
-        if not self.job_id:
-            raise AirflowException("Could not get scheduler_job_id")
         self.scheduler_job_id = str(self.job_id)
         self.log.debug("Start with scheduler_job_id: %s", self.scheduler_job_id)
         self.kube_client = get_kube_client()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 7e8abd02a4..85e4f710cb 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1795,7 +1795,7 @@ class Airflow(AirflowBaseView):
             msg = f"Could not queue task instance for execution, dependencies not met: {failed_deps_str}"
             return redirect_or_json(origin, msg, "error", 400)
 
-        executor.job_id = "manual"
+        executor.job_id = None
         executor.start()
         executor.queue_task_instance(
             ti,