You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/31 19:32:31 UTC

[airflow] branch master updated: Add displaying multiple dates in airflow next_execution command (#9072)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new c002b25  Add displaying multiple dates in airflow next_execution command (#9072)
c002b25 is described below

commit c002b25e37c742884dabf6c13b102cf0506663fb
Author: Mauricio De Diana <md...@gmail.com>
AuthorDate: Sun May 31 16:31:58 2020 -0300

    Add displaying multiple dates in airflow next_execution command (#9072)
    
    The "next_execution" cli sub-command now accepts an optional number of
    executions to be returned. This is particularly useful for checking
    non-regular schedule intervals, such as those created by some cron
    expressions.
    
    Co-authored-by: Kamil BreguĊ‚a <mi...@users.noreply.github.com>
---
 airflow/cli/cli_parser.py              | 23 ++++++++++++++++++--
 airflow/cli/commands/dag_command.py    | 13 ++++++++----
 tests/cli/commands/test_dag_command.py | 39 ++++++++++++++++++++--------------
 tests/cli/test_cli_parser.py           |  7 ++++++
 4 files changed, 60 insertions(+), 22 deletions(-)

diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 48e11e3..5f03c48 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -99,6 +99,17 @@ class Arg:
         parser.add_argument(*self.flags, **self.kwargs)
 
 
+def positive_int(value):
+    """Define a positive int type for an argument."""
+    try:
+        value = int(value)
+        if value > 0:
+            return value
+    except ValueError:
+        pass
+    raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
+
+
 # Shared
 ARG_DAG_ID = Arg(
     ("dag_id",),
@@ -182,6 +193,13 @@ ARG_LIMIT = Arg(
     ("--limit",),
     help="Return a limited number of records")
 
+# next_execution
+ARG_NUM_EXECUTIONS = Arg(
+    ("-n", "--num-executions"),
+    default=1,
+    type=positive_int,
+    help="The number of next execution datetimes to show")
+
 # backfill
 ARG_MARK_SUCCESS = Arg(
     ("-m", "--mark-success"),
@@ -793,9 +811,10 @@ DAGS_COMMANDS = (
     ),
     ActionCommand(
         name='next_execution',
-        help="Get the next execution datetime of a DAG",
+        help="Get the next execution datetimes of a DAG. It returns one execution unless the "
+             "num-executions option is given",
         func=lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'),
-        args=(ARG_DAG_ID, ARG_SUBDIR),
+        args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS),
     ),
     ActionCommand(
         name='pause',
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 148d0f5..685ca21 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -275,7 +275,7 @@ def dag_next_execution(args):
     dag = get_dag(args.subdir, args.dag_id)
 
     if dag.get_is_paused():
-        print("[INFO] Please be reminded this DAG is PAUSED now.")
+        print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
 
     latest_execution_date = dag.get_latest_execution_date()
     if latest_execution_date:
@@ -283,11 +283,16 @@ def dag_next_execution(args):
 
         if next_execution_dttm is None:
             print("[WARN] No following schedule can be found. " +
-                  "This DAG may have schedule interval '@once' or `None`.")
+                  "This DAG may have schedule interval '@once' or `None`.", file=sys.stderr)
+            print(None)
+        else:
+            print(next_execution_dttm)
 
-        print(next_execution_dttm)
+            for _ in range(1, args.num_executions):
+                next_execution_dttm = dag.following_schedule(next_execution_dttm)
+                print(next_execution_dttm)
     else:
-        print("[WARN] Only applicable when there is execution record found for the DAG.")
+        print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
         print(None)
 
 
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 80b7852..9a5e50f 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -20,11 +20,10 @@ import io
 import os
 import tempfile
 import unittest
-from datetime import datetime, time, timedelta
+from datetime import datetime, timedelta
 
 import mock
 import pytest
-import pytz
 
 from airflow import settings
 from airflow.cli import cli_parser
@@ -249,10 +248,10 @@ class TestCliDags(unittest.TestCase):
             dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
             dr.delete(synchronize_session=False)
 
+        # Test None output
         args = self.parser.parse_args(['dags',
                                        'next_execution',
                                        dag_ids[0]])
-
         with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
             dag_command.dag_next_execution(args)
             out = temp_stdout.getvalue()
@@ -262,23 +261,16 @@ class TestCliDags(unittest.TestCase):
 
         # The details below is determined by the schedule_interval of example DAGs
         now = DEFAULT_DATE
-        next_execution_time_for_dag1 = pytz.utc.localize(
-            datetime.combine(
-                now.date() + timedelta(days=1),
-                time(0)
-            )
-        )
-        next_execution_time_for_dag2 = now + timedelta(hours=4)
-        expected_output = [str(next_execution_time_for_dag1),
-                           str(next_execution_time_for_dag2),
+        expected_output = [str(now + timedelta(days=1)),
+                           str(now + timedelta(hours=4)),
                            "None",
                            "None"]
+        expected_output_2 = [str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)),
+                             str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)),
+                             "None",
+                             "None"]
 
         for i, dag_id in enumerate(dag_ids):
-            args = self.parser.parse_args(['dags',
-                                           'next_execution',
-                                           dag_id])
-
             dag = self.dagbag.dags[dag_id]
             # Create a DagRun for each DAG, to prepare for next step
             dag.create_dagrun(
@@ -288,11 +280,26 @@ class TestCliDags(unittest.TestCase):
                 state=State.FAILED
             )
 
+            # Test num-executions = 1 (default)
+            args = self.parser.parse_args(['dags',
+                                           'next_execution',
+                                           dag_id])
             with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
                 dag_command.dag_next_execution(args)
                 out = temp_stdout.getvalue()
             self.assertIn(expected_output[i], out)
 
+            # Test num-executions = 2
+            args = self.parser.parse_args(['dags',
+                                           'next_execution',
+                                           dag_id,
+                                           '--num-executions',
+                                           '2'])
+            with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
+                dag_command.dag_next_execution(args)
+                out = temp_stdout.getvalue()
+            self.assertIn(expected_output_2[i], out)
+
         # Clean up before leaving
         with create_session() as session:
             dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 6a7d714..e608859 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -174,3 +174,10 @@ class TestCli(TestCase):
         for cmd_args in all_command_as_args:
             with self.assertRaises(SystemExit):
                 parser.parse_args([*cmd_args, '--help'])
+
+    def test_positive_int(self):
+        self.assertEqual(1, cli_parser.positive_int('1'))
+
+        with self.assertRaises(argparse.ArgumentTypeError):
+            cli_parser.positive_int('0')
+            cli_parser.positive_int('-1')