You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/08 18:52:49 UTC

[airflow] branch main updated: DAG regex flag in backfill command (#23870)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 72a6ac5e54 DAG regex flag in backfill command (#23870)
72a6ac5e54 is described below

commit 72a6ac5e54b6ab682861537e37abcd97f405d911
Author: domagojrazum <86...@users.noreply.github.com>
AuthorDate: Mon Aug 8 20:52:41 2022 +0200

    DAG regex flag in backfill command (#23870)
---
 airflow/cli/cli_parser.py              |   6 ++
 airflow/cli/commands/dag_command.py    | 111 ++++++++++++++++++---------------
 airflow/jobs/backfill_job.py           |   2 +-
 tests/cli/commands/test_dag_command.py |  29 ++++++++-
 4 files changed, 96 insertions(+), 52 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 11b38ebd31..e4be3e978d 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -351,6 +351,11 @@ ARG_RUN_BACKWARDS = Arg(
     ),
     action="store_true",
 )
+ARG_TREAT_DAG_AS_REGEX = Arg(
+    ("--treat-dag-as-regex",),
+    help=("if set, dag_id will be treated as regex instead of an exact string"),
+    action="store_true",
+)
 # test_dag
 ARG_SHOW_DAGRUN = Arg(
     ("--show-dagrun",),
@@ -1135,6 +1140,7 @@ DAGS_COMMANDS = (
             ARG_RESET_DAG_RUN,
             ARG_RERUN_FAILED_TASKS,
             ARG_RUN_BACKWARDS,
+            ARG_TREAT_DAG_AS_REGEX,
         ),
     ),
     ActionCommand(
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index b193ad729a..515dcf2d80 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -40,15 +40,17 @@ from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils import cli as cli_utils
-from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning
+from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
 
+log = logging.getLogger(__name__)
+
 
 @cli_utils.action_cli
 def dag_backfill(args, dag=None):
-    """Creates backfill job or dry run for a DAG"""
+    """Creates backfill job or dry run for a DAG or list of DAGs using regex"""
     logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
 
     signal.signal(signal.SIGTERM, sigint_handler)
@@ -66,64 +68,73 @@ def dag_backfill(args, dag=None):
     if not args.start_date and not args.end_date:
         raise AirflowException("Provide a start_date and/or end_date")
 
-    dag = dag or get_dag(args.subdir, args.dag_id)
+    if not dag:
+        dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
+    else:
+        dags = dag if type(dag) == list else [dag]
+
+    dags.sort(key=lambda d: d.dag_id)
 
     # If only one date is passed, using same as start and end
     args.end_date = args.end_date or args.start_date
     args.start_date = args.start_date or args.end_date
 
-    if args.task_regex:
-        dag = dag.partial_subset(
-            task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
-        )
-        if not dag.task_dict:
-            raise AirflowException(
-                f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
-            )
-
     run_conf = None
     if args.conf:
         run_conf = json.loads(args.conf)
 
-    if args.dry_run:
-        print(f"Dry run of DAG {args.dag_id} on {args.start_date}")
-        dr = DagRun(dag.dag_id, execution_date=args.start_date)
-        for task in dag.tasks:
-            print(f"Task {task.task_id}")
-            ti = TaskInstance(task, run_id=None)
-            ti.dag_run = dr
-            ti.dry_run()
-    else:
-        if args.reset_dagruns:
-            DAG.clear_dags(
-                [dag],
-                start_date=args.start_date,
-                end_date=args.end_date,
-                confirm_prompt=not args.yes,
-                include_subdags=True,
-                dag_run_state=DagRunState.QUEUED,
-            )
-
-        try:
-            dag.run(
-                start_date=args.start_date,
-                end_date=args.end_date,
-                mark_success=args.mark_success,
-                local=args.local,
-                donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
-                ignore_first_depends_on_past=args.ignore_first_depends_on_past,
-                ignore_task_deps=args.ignore_dependencies,
-                pool=args.pool,
-                delay_on_limit_secs=args.delay_on_limit,
-                verbose=args.verbose,
-                conf=run_conf,
-                rerun_failed_tasks=args.rerun_failed_tasks,
-                run_backwards=args.run_backwards,
-                continue_on_failures=args.continue_on_failures,
+    for dag in dags:
+        if args.task_regex:
+            dag = dag.partial_subset(
+                task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
             )
-        except ValueError as vr:
-            print(str(vr))
-            sys.exit(1)
+            if not dag.task_dict:
+                raise AirflowException(
+                    f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
+                )
+
+        if args.dry_run:
+            print(f"Dry run of DAG {dag.dag_id} on {args.start_date}")
+            dr = DagRun(dag.dag_id, execution_date=args.start_date)
+            for task in dag.tasks:
+                print(f"Task {task.task_id} located in DAG {dag.dag_id}")
+                ti = TaskInstance(task, run_id=None)
+                ti.dag_run = dr
+                ti.dry_run()
+        else:
+            if args.reset_dagruns:
+                DAG.clear_dags(
+                    [dag],
+                    start_date=args.start_date,
+                    end_date=args.end_date,
+                    confirm_prompt=not args.yes,
+                    include_subdags=True,
+                    dag_run_state=DagRunState.QUEUED,
+                )
+
+            try:
+                dag.run(
+                    start_date=args.start_date,
+                    end_date=args.end_date,
+                    mark_success=args.mark_success,
+                    local=args.local,
+                    donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+                    ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+                    ignore_task_deps=args.ignore_dependencies,
+                    pool=args.pool,
+                    delay_on_limit_secs=args.delay_on_limit,
+                    verbose=args.verbose,
+                    conf=run_conf,
+                    rerun_failed_tasks=args.rerun_failed_tasks,
+                    run_backwards=args.run_backwards,
+                    continue_on_failures=args.continue_on_failures,
+                )
+            except ValueError as vr:
+                print(str(vr))
+                sys.exit(1)
+
+    if len(dags) > 1:
+        log.info("All of the backfills are done.")
 
 
 @cli_utils.action_cli
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index c5b98c2a8d..4f28eee9a9 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -876,7 +876,7 @@ class BackfillJob(BaseJob):
             session.commit()
             executor.end()
 
-        self.log.info("Backfill done. Exiting.")
+        self.log.info("Backfill done for DAG %s. Exiting.", self.dag)
 
     @provide_session
     def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 859290745b..ae2baef2f1 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -124,7 +124,7 @@ class TestCliDags(unittest.TestCase):
 
         output = stdout.getvalue()
         assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE.isoformat()}\n" in output
-        assert "Task runme_0\n" in output
+        assert "Task runme_0 located in DAG example_bash_operator\n" in output
 
         mock_run.assert_not_called()  # Dry run shouldn't run the backfill
 
@@ -176,6 +176,33 @@ class TestCliDags(unittest.TestCase):
         )
         mock_run.reset_mock()
 
+        with contextlib.redirect_stdout(io.StringIO()) as stdout:
+            dag_command.dag_backfill(
+                self.parser.parse_args(
+                    [
+                        'dags',
+                        'backfill',
+                        'example_branch_(python_){0,1}operator(_decorator){0,1}',
+                        '--task-regex',
+                        'run_this_first',
+                        '--dry-run',
+                        '--treat-dag-as-regex',
+                        '--start-date',
+                        DEFAULT_DATE.isoformat(),
+                    ]
+                ),
+            )
+
+        output = stdout.getvalue()
+
+        assert (
+            f"Dry run of DAG example_branch_python_operator_decorator on "
+            f"{DEFAULT_DATE.isoformat()}\n" in output
+        )
+        assert "Task run_this_first located in DAG example_branch_python_operator_decorator\n" in output
+        assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE.isoformat()}\n" in output
+        assert "Task run_this_first located in DAG example_branch_operator\n" in output
+
     @mock.patch("airflow.cli.commands.dag_command.get_dag")
     def test_backfill_fails_without_loading_dags(self, mock_get_dag):