Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:52 UTC

[airflow] 08/37: introduce dag processor job (#28799)

This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 13ce6725d464debc1397589e99b1ebc9e72d439d
Author: Farhan Syakir <fa...@gmail.com>
AuthorDate: Tue Feb 21 11:54:52 2023 +0200

    introduce dag processor job (#28799)
    
    DagFileProcessorManager was not creating job records in the metadata DB, so the livenessProbe had nothing to verify.
    A new DagProcessorJob is created for the Standalone DAG Processor.
    With that job in place, the airflow jobs check --hostname command works correctly and the livenessProbe no longer fails.
    
    (cherry picked from commit 0018b94a4a5f846fc87457e9393ca953ba0b5ec6)
---
 airflow/cli/commands/dag_processor_command.py    | 18 +++----
 airflow/jobs/dag_processor_job.py                | 69 ++++++++++++++++++++++++
 tests/cli/commands/test_dag_processor_command.py | 13 ++---
 3 files changed, 83 insertions(+), 17 deletions(-)
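For context on the fix: the liveness check depends on the job rows that BaseJob maintains. Every running job heartbeats a row in the metadata DB, and airflow jobs check --hostname fails when no alive job row exists for the given host. Below is a minimal sketch of that dependency, not the actual airflow jobs check implementation; host_has_alive_job is a hypothetical helper:

    from airflow.jobs.base_job import BaseJob
    from airflow.utils.session import create_session


    def host_has_alive_job(hostname: str) -> bool:
        """Return True if any job on this host looks alive in the metadata DB."""
        with create_session() as session:
            jobs = session.query(BaseJob).filter(BaseJob.hostname == hostname).all()
            # BaseJob.is_alive() requires state == RUNNING and a latest_heartbeat
            # within the job's heartrate grace window.
            return any(job.is_alive() for job in jobs)

Because DagFileProcessorManager never created such a row, the probe had nothing to find; wrapping the manager in a BaseJob subclass (in the diff below) fixes that.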

diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index f8ce65663b..d96fb4f06f 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -25,21 +25,21 @@ from daemon.pidfile import TimeoutPIDLockFile
 
 from airflow import settings
 from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.dag_processor_job import DagProcessorJob
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging
 
 log = logging.getLogger(__name__)
 
 
-def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+def _create_dag_processor_job(args) -> DagProcessorJob:
     """Creates DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
     processor_timeout = timedelta(seconds=processor_timeout_seconds)
-    return DagFileProcessorManager(
+    return DagProcessorJob(
+        processor_timeout=processor_timeout,
         dag_directory=args.subdir,
         max_runs=args.num_runs,
-        processor_timeout=processor_timeout,
         dag_ids=[],
         pickle_dags=args.do_pickle,
     )
@@ -55,7 +55,7 @@ def dag_processor(args):
     if sql_conn.startswith("sqlite"):
         raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
 
-    manager = _create_dag_processor_manager(args)
+    job = _create_dag_processor_job(args)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -74,10 +74,6 @@ def dag_processor(args):
                 umask=int(settings.DAEMON_UMASK, 8),
             )
             with ctx:
-                try:
-                    manager.start()
-                finally:
-                    manager.terminate()
-                    manager.end()
+                job.run()
     else:
-        manager.start()
+        job.run()
diff --git a/airflow/jobs/dag_processor_job.py b/airflow/jobs/dag_processor_job.py
new file mode 100644
index 0000000000..70690a78db
--- /dev/null
+++ b/airflow/jobs/dag_processor_job.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from datetime import timedelta
+
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.base_job import BaseJob
+
+
+class DagProcessorJob(BaseJob):
+    """
+    :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+    :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+    :param processor_timeout: How long to wait before timing out a DAG file processor
+    :param dag_ids: if specified, only schedule tasks with these DAG IDs
+    :param pickle_dags: whether to pickle DAGs.
+    :param async_mode: Whether to start agent in async mode
+    """
+
+    __mapper_args__ = {"polymorphic_identity": "DagProcessorJob"}
+
+    def __init__(
+        self,
+        dag_directory: os.PathLike,
+        max_runs: int,
+        processor_timeout: timedelta,
+        dag_ids: list[str] | None,
+        pickle_dags: bool,
+        *args,
+        **kwargs,
+    ):
+        self.processor = DagFileProcessorManager(
+            dag_directory=dag_directory,
+            max_runs=max_runs,
+            processor_timeout=processor_timeout,
+            dag_ids=dag_ids,
+            pickle_dags=pickle_dags,
+        )
+        super().__init__(*args, **kwargs)
+
+    def _execute(self) -> None:
+        self.log.info("Starting the Dag Processor Job")
+        try:
+            self.processor.start()
+        except Exception:
+            self.log.exception("Exception when executing DagProcessorJob")
+            raise
+        finally:
+            self.processor.terminate()
+            self.processor.end()
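A usage sketch of the new job (paths and values hypothetical): constructing it mirrors what _create_dag_processor_job() does, and run(), inherited from BaseJob, registers the job row in the metadata DB, heartbeats it, and then calls the _execute() shown above.

    from datetime import timedelta

    from airflow.jobs.dag_processor_job import DagProcessorJob

    job = DagProcessorJob(
        dag_directory="/files/dags",  # hypothetical DAG folder
        max_runs=-1,                  # parse files until the job is terminated
        processor_timeout=timedelta(seconds=50),
        dag_ids=[],
        pickle_dags=False,
    )
    job.run()  # creates and heartbeats the job row, then runs _execute()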
diff --git a/tests/cli/commands/test_dag_processor_command.py b/tests/cli/commands/test_dag_processor_command.py
index b3ff84531f..69d0dcdb02 100644
--- a/tests/cli/commands/test_dag_processor_command.py
+++ b/tests/cli/commands/test_dag_processor_command.py
@@ -42,16 +42,17 @@ class TestDagProcessorCommand:
             ("core", "load_examples"): "False",
         }
     )
-    @mock.patch("airflow.cli.commands.dag_processor_command.DagFileProcessorManager")
+    @mock.patch("airflow.cli.commands.dag_processor_command.DagProcessorJob")
     @pytest.mark.skipif(
         conf.get_mandatory_value("database", "sql_alchemy_conn").lower().startswith("sqlite"),
         reason="Standalone Dag Processor doesn't support sqlite.",
     )
-    def test_start_manager(
+    def test_start_job(
         self,
-        mock_dag_manager,
+        mock_dag_job,
     ):
         """Ensure that DagFileProcessorManager is started"""
-        args = self.parser.parse_args(["dag-processor"])
-        dag_processor_command.dag_processor(args)
-        mock_dag_manager.return_value.start.assert_called()
+        with conf_vars({("scheduler", "standalone_dag_processor"): "True"}):
+            args = self.parser.parse_args(["dag-processor"])
+            dag_processor_command.dag_processor(args)
+            mock_dag_job.return_value.run.assert_called()