You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by pi...@apache.org on 2023/03/06 21:46:52 UTC
[airflow] 08/37: introduce dag processor job (#28799)
This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v2-5-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 13ce6725d464debc1397589e99b1ebc9e72d439d
Author: Farhan Syakir <fa...@gmail.com>
AuthorDate: Tue Feb 21 11:54:52 2023 +0200
introduce dag processor job (#28799)
DagFileProcessorManager was not creating jobs in the metadata DB, so the livenessProbe was not valid.
A new job is created for the Standalone DAG Processor.
By doing that, the `airflow jobs check --hostname` command works correctly and the livenessProbe no longer fails.
(cherry picked from commit 0018b94a4a5f846fc87457e9393ca953ba0b5ec6)
---
airflow/cli/commands/dag_processor_command.py | 18 +++----
airflow/jobs/dag_processor_job.py | 69 ++++++++++++++++++++++++
tests/cli/commands/test_dag_processor_command.py | 13 ++---
3 files changed, 83 insertions(+), 17 deletions(-)
diff --git a/airflow/cli/commands/dag_processor_command.py b/airflow/cli/commands/dag_processor_command.py
index f8ce65663b..d96fb4f06f 100644
--- a/airflow/cli/commands/dag_processor_command.py
+++ b/airflow/cli/commands/dag_processor_command.py
@@ -25,21 +25,21 @@ from daemon.pidfile import TimeoutPIDLockFile
from airflow import settings
from airflow.configuration import conf
-from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.dag_processor_job import DagProcessorJob
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging
log = logging.getLogger(__name__)
-def _create_dag_processor_manager(args) -> DagFileProcessorManager:
+def _create_dag_processor_job(args) -> DagProcessorJob:
"""Creates DagFileProcessorProcess instance."""
processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
processor_timeout = timedelta(seconds=processor_timeout_seconds)
- return DagFileProcessorManager(
+ return DagProcessorJob(
+ processor_timeout=processor_timeout,
dag_directory=args.subdir,
max_runs=args.num_runs,
- processor_timeout=processor_timeout,
dag_ids=[],
pickle_dags=args.do_pickle,
)
@@ -55,7 +55,7 @@ def dag_processor(args):
if sql_conn.startswith("sqlite"):
raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
- manager = _create_dag_processor_manager(args)
+ job = _create_dag_processor_job(args)
if args.daemon:
pid, stdout, stderr, log_file = setup_locations(
@@ -74,10 +74,6 @@ def dag_processor(args):
umask=int(settings.DAEMON_UMASK, 8),
)
with ctx:
- try:
- manager.start()
- finally:
- manager.terminate()
- manager.end()
+ job.run()
else:
- manager.start()
+ job.run()
diff --git a/airflow/jobs/dag_processor_job.py b/airflow/jobs/dag_processor_job.py
new file mode 100644
index 0000000000..70690a78db
--- /dev/null
+++ b/airflow/jobs/dag_processor_job.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import os
+from datetime import timedelta
+
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.jobs.base_job import BaseJob
+
+
+class DagProcessorJob(BaseJob):
+ """
+ :param dag_directory: Directory where DAG definitions are kept. All
+ files in file_paths should be under this directory
+ :param max_runs: The number of times to parse and schedule each file. -1
+ for unlimited.
+ :param processor_timeout: How long to wait before timing out a DAG file processor
+ :param dag_ids: if specified, only schedule tasks with these DAG IDs
+ :param pickle_dags: whether to pickle DAGs.
+ :param async_mode: Whether to start agent in async mode
+ """
+
+ __mapper_args__ = {"polymorphic_identity": "DagProcessorJob"}
+
+ def __init__(
+ self,
+ dag_directory: os.PathLike,
+ max_runs: int,
+ processor_timeout: timedelta,
+ dag_ids: list[str] | None,
+ pickle_dags: bool,
+ *args,
+ **kwargs,
+ ):
+ self.processor = DagFileProcessorManager(
+ dag_directory=dag_directory,
+ max_runs=max_runs,
+ processor_timeout=processor_timeout,
+ dag_ids=dag_ids,
+ pickle_dags=pickle_dags,
+ )
+ super().__init__(*args, **kwargs)
+
+ def _execute(self) -> None:
+ self.log.info("Starting the Dag Processor Job")
+ try:
+ self.processor.start()
+ except Exception:
+ self.log.exception("Exception when executing DagProcessorJob")
+ raise
+ finally:
+ self.processor.terminate()
+ self.processor.end()
diff --git a/tests/cli/commands/test_dag_processor_command.py b/tests/cli/commands/test_dag_processor_command.py
index b3ff84531f..69d0dcdb02 100644
--- a/tests/cli/commands/test_dag_processor_command.py
+++ b/tests/cli/commands/test_dag_processor_command.py
@@ -42,16 +42,17 @@ class TestDagProcessorCommand:
("core", "load_examples"): "False",
}
)
- @mock.patch("airflow.cli.commands.dag_processor_command.DagFileProcessorManager")
+ @mock.patch("airflow.cli.commands.dag_processor_command.DagProcessorJob")
@pytest.mark.skipif(
conf.get_mandatory_value("database", "sql_alchemy_conn").lower().startswith("sqlite"),
reason="Standalone Dag Processor doesn't support sqlite.",
)
- def test_start_manager(
+ def test_start_job(
self,
- mock_dag_manager,
+ mock_dag_job,
):
"""Ensure that DagFileProcessorManager is started"""
- args = self.parser.parse_args(["dag-processor"])
- dag_processor_command.dag_processor(args)
- mock_dag_manager.return_value.start.assert_called()
+ with conf_vars({("scheduler", "standalone_dag_processor"): "True"}):
+ args = self.parser.parse_args(["dag-processor"])
+ dag_processor_command.dag_processor(args)
+ mock_dag_job.return_value.run.assert_called()