You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/08/29 08:51:42 UTC

[airflow] branch main updated: Add conf parameter to CLI for airflow dags test (#25900)

This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bcc2fe26f6 Add conf parameter to CLI for airflow dags test (#25900)
bcc2fe26f6 is described below

commit bcc2fe26f6e0b7204bdf73f57d25b4e6c7a69548
Author: Christian Schilling <ch...@gmail.com>
AuthorDate: Mon Aug 29 10:51:29 2022 +0200

    Add conf parameter to CLI for airflow dags test (#25900)
    
    Co-authored-by: Schilling Christian (XC-AD/ETV5) <ch...@bosch.com>
    Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
 airflow/cli/cli_parser.py              |  1 +
 airflow/cli/commands/dag_command.py    |  8 ++++++++
 tests/cli/commands/test_dag_command.py | 36 ++++++++++++++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 20b55d56a3..01078aa38e 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -1170,6 +1170,7 @@ DAGS_COMMANDS = (
         args=(
             ARG_DAG_ID,
             ARG_EXECUTION_DATE,
+            ARG_CONF,
             ARG_SUBDIR,
             ARG_SHOW_DAGRUN,
             ARG_IMGCAT_DAGRUN,
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 559c0e75c3..ddfce4ebdb 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -451,6 +451,13 @@ def dag_list_dag_runs(args, dag=None, session=NEW_SESSION):
 @cli_utils.action_cli
 def dag_test(args, session=None):
     """Execute one single DagRun for a given DAG and execution date, using the DebugExecutor."""
+    run_conf = None
+    if args.conf:
+        try:
+            run_conf = json.loads(args.conf)
+        except ValueError as e:
+            raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
+
     dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
     dag.clear(start_date=args.execution_date, end_date=args.execution_date, dag_run_state=False)
     try:
@@ -458,6 +465,7 @@ def dag_test(args, session=None):
             executor=DebugExecutor(),
             start_date=args.execution_date,
             end_date=args.execution_date,
+            conf=run_conf,
             # Always run the DAG at least once even if no logical runs are
             # available. This does not make a lot of sense, but Airflow has
             # been doing this prior to 2.2 so we keep compatibility.
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index f6c85468a5..59df405629 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -17,6 +17,7 @@
 # under the License.
 import contextlib
 import io
+import json
 import os
 import tempfile
 import unittest
@@ -607,6 +608,40 @@ class TestCliDags(unittest.TestCase):
                     executor=mock_executor.return_value,
                     start_date=cli_args.execution_date,
                     end_date=cli_args.execution_date,
+                    conf=None,
+                    run_at_least_once=True,
+                ),
+            ]
+        )
+
+    @mock.patch("airflow.cli.commands.dag_command.DebugExecutor")
+    @mock.patch("airflow.cli.commands.dag_command.get_dag")
+    def test_dag_test_conf(self, mock_get_dag, mock_executor):
+        cli_args = self.parser.parse_args(
+            [
+                'dags',
+                'test',
+                'example_bash_operator',
+                DEFAULT_DATE.isoformat(),
+                "-c",
+                "{\"dag_run_conf_param\": \"param_value\"}",
+            ]
+        )
+        dag_command.dag_test(cli_args)
+
+        mock_get_dag.assert_has_calls(
+            [
+                mock.call(subdir=cli_args.subdir, dag_id='example_bash_operator'),
+                mock.call().clear(
+                    start_date=cli_args.execution_date,
+                    end_date=cli_args.execution_date,
+                    dag_run_state=False,
+                ),
+                mock.call().run(
+                    executor=mock_executor.return_value,
+                    start_date=cli_args.execution_date,
+                    end_date=cli_args.execution_date,
+                    conf=json.loads(cli_args.conf),
                     run_at_least_once=True,
                 ),
             ]
@@ -636,6 +671,7 @@ class TestCliDags(unittest.TestCase):
                     executor=mock_executor.return_value,
                     start_date=cli_args.execution_date,
                     end_date=cli_args.execution_date,
+                    conf=None,
                     run_at_least_once=True,
                 ),
             ]