You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/08 18:52:49 UTC
[airflow] branch main updated: DAG regex flag in backfill command (#23870)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 72a6ac5e54 DAG regex flag in backfill command (#23870)
72a6ac5e54 is described below
commit 72a6ac5e54b6ab682861537e37abcd97f405d911
Author: domagojrazum <86...@users.noreply.github.com>
AuthorDate: Mon Aug 8 20:52:41 2022 +0200
DAG regex flag in backfill command (#23870)
---
airflow/cli/cli_parser.py | 6 ++
airflow/cli/commands/dag_command.py | 111 ++++++++++++++++++---------------
airflow/jobs/backfill_job.py | 2 +-
tests/cli/commands/test_dag_command.py | 29 ++++++++-
4 files changed, 96 insertions(+), 52 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 11b38ebd31..e4be3e978d 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -351,6 +351,11 @@ ARG_RUN_BACKWARDS = Arg(
),
action="store_true",
)
+ARG_TREAT_DAG_AS_REGEX = Arg(
+ ("--treat-dag-as-regex",),
+ help=("if set, dag_id will be treated as regex instead of an exact string"),
+ action="store_true",
+)
# test_dag
ARG_SHOW_DAGRUN = Arg(
("--show-dagrun",),
@@ -1135,6 +1140,7 @@ DAGS_COMMANDS = (
ARG_RESET_DAG_RUN,
ARG_RERUN_FAILED_TASKS,
ARG_RUN_BACKWARDS,
+ ARG_TREAT_DAG_AS_REGEX,
),
),
ActionCommand(
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index b193ad729a..515dcf2d80 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -40,15 +40,17 @@ from airflow.models import DagBag, DagModel, DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import cli as cli_utils
-from airflow.utils.cli import get_dag, process_subdir, sigint_handler, suppress_logs_and_warning
+from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState
+log = logging.getLogger(__name__)
+
@cli_utils.action_cli
def dag_backfill(args, dag=None):
- """Creates backfill job or dry run for a DAG"""
+ """Creates backfill job or dry run for a DAG or list of DAGs using regex"""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
@@ -66,64 +68,73 @@ def dag_backfill(args, dag=None):
if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")
- dag = dag or get_dag(args.subdir, args.dag_id)
+ if not dag:
+ dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_as_regex)
+ else:
+ dags = dag if type(dag) == list else [dag]
+
+ dags.sort(key=lambda d: d.dag_id)
# If only one date is passed, using same as start and end
args.end_date = args.end_date or args.start_date
args.start_date = args.start_date or args.end_date
- if args.task_regex:
- dag = dag.partial_subset(
- task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
- )
- if not dag.task_dict:
- raise AirflowException(
- f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
- )
-
run_conf = None
if args.conf:
run_conf = json.loads(args.conf)
- if args.dry_run:
- print(f"Dry run of DAG {args.dag_id} on {args.start_date}")
- dr = DagRun(dag.dag_id, execution_date=args.start_date)
- for task in dag.tasks:
- print(f"Task {task.task_id}")
- ti = TaskInstance(task, run_id=None)
- ti.dag_run = dr
- ti.dry_run()
- else:
- if args.reset_dagruns:
- DAG.clear_dags(
- [dag],
- start_date=args.start_date,
- end_date=args.end_date,
- confirm_prompt=not args.yes,
- include_subdags=True,
- dag_run_state=DagRunState.QUEUED,
- )
-
- try:
- dag.run(
- start_date=args.start_date,
- end_date=args.end_date,
- mark_success=args.mark_success,
- local=args.local,
- donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
- ignore_first_depends_on_past=args.ignore_first_depends_on_past,
- ignore_task_deps=args.ignore_dependencies,
- pool=args.pool,
- delay_on_limit_secs=args.delay_on_limit,
- verbose=args.verbose,
- conf=run_conf,
- rerun_failed_tasks=args.rerun_failed_tasks,
- run_backwards=args.run_backwards,
- continue_on_failures=args.continue_on_failures,
+ for dag in dags:
+ if args.task_regex:
+ dag = dag.partial_subset(
+ task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
)
- except ValueError as vr:
- print(str(vr))
- sys.exit(1)
+ if not dag.task_dict:
+ raise AirflowException(
+ f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
+ )
+
+ if args.dry_run:
+ print(f"Dry run of DAG {dag.dag_id} on {args.start_date}")
+ dr = DagRun(dag.dag_id, execution_date=args.start_date)
+ for task in dag.tasks:
+ print(f"Task {task.task_id} located in DAG {dag.dag_id}")
+ ti = TaskInstance(task, run_id=None)
+ ti.dag_run = dr
+ ti.dry_run()
+ else:
+ if args.reset_dagruns:
+ DAG.clear_dags(
+ [dag],
+ start_date=args.start_date,
+ end_date=args.end_date,
+ confirm_prompt=not args.yes,
+ include_subdags=True,
+ dag_run_state=DagRunState.QUEUED,
+ )
+
+ try:
+ dag.run(
+ start_date=args.start_date,
+ end_date=args.end_date,
+ mark_success=args.mark_success,
+ local=args.local,
+ donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
+ ignore_first_depends_on_past=args.ignore_first_depends_on_past,
+ ignore_task_deps=args.ignore_dependencies,
+ pool=args.pool,
+ delay_on_limit_secs=args.delay_on_limit,
+ verbose=args.verbose,
+ conf=run_conf,
+ rerun_failed_tasks=args.rerun_failed_tasks,
+ run_backwards=args.run_backwards,
+ continue_on_failures=args.continue_on_failures,
+ )
+ except ValueError as vr:
+ print(str(vr))
+ sys.exit(1)
+
+ if len(dags) > 1:
+ log.info("All of the backfills are done.")
@cli_utils.action_cli
diff --git a/airflow/jobs/backfill_job.py b/airflow/jobs/backfill_job.py
index c5b98c2a8d..4f28eee9a9 100644
--- a/airflow/jobs/backfill_job.py
+++ b/airflow/jobs/backfill_job.py
@@ -876,7 +876,7 @@ class BackfillJob(BaseJob):
session.commit()
executor.end()
- self.log.info("Backfill done. Exiting.")
+ self.log.info("Backfill done for DAG %s. Exiting.", self.dag)
@provide_session
def reset_state_for_orphaned_tasks(self, filter_by_dag_run=None, session=None):
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 859290745b..ae2baef2f1 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -124,7 +124,7 @@ class TestCliDags(unittest.TestCase):
output = stdout.getvalue()
assert f"Dry run of DAG example_bash_operator on {DEFAULT_DATE.isoformat()}\n" in output
- assert "Task runme_0\n" in output
+ assert "Task runme_0 located in DAG example_bash_operator\n" in output
mock_run.assert_not_called() # Dry run shouldn't run the backfill
@@ -176,6 +176,33 @@ class TestCliDags(unittest.TestCase):
)
mock_run.reset_mock()
+ with contextlib.redirect_stdout(io.StringIO()) as stdout:
+ dag_command.dag_backfill(
+ self.parser.parse_args(
+ [
+ 'dags',
+ 'backfill',
+ 'example_branch_(python_){0,1}operator(_decorator){0,1}',
+ '--task-regex',
+ 'run_this_first',
+ '--dry-run',
+ '--treat-dag-as-regex',
+ '--start-date',
+ DEFAULT_DATE.isoformat(),
+ ]
+ ),
+ )
+
+ output = stdout.getvalue()
+
+ assert (
+ f"Dry run of DAG example_branch_python_operator_decorator on "
+ f"{DEFAULT_DATE.isoformat()}\n" in output
+ )
+ assert "Task run_this_first located in DAG example_branch_python_operator_decorator\n" in output
+ assert f"Dry run of DAG example_branch_operator on {DEFAULT_DATE.isoformat()}\n" in output
+ assert "Task run_this_first located in DAG example_branch_operator\n" in output
+
@mock.patch("airflow.cli.commands.dag_command.get_dag")
def test_backfill_fails_without_loading_dags(self, mock_get_dag):