You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/05/31 19:32:31 UTC
[airflow] branch master updated: Add displaying multiple dates in
airflow next_execution command (#9072)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new c002b25 Add displaying multiple dates in airflow next_execution command (#9072)
c002b25 is described below
commit c002b25e37c742884dabf6c13b102cf0506663fb
Author: Mauricio De Diana <md...@gmail.com>
AuthorDate: Sun May 31 16:31:58 2020 -0300
Add displaying multiple dates in airflow next_execution command (#9072)
The "next_execution" cli sub-command now accepts an optional number of
executions to be returned. This is particularly useful for checking
non-regular schedule intervals, such as those created by some cron
expressions.
Co-authored-by: Kamil BreguĊa <mi...@users.noreply.github.com>
---
airflow/cli/cli_parser.py | 23 ++++++++++++++++++--
airflow/cli/commands/dag_command.py | 13 ++++++++----
tests/cli/commands/test_dag_command.py | 39 ++++++++++++++++++++--------------
tests/cli/test_cli_parser.py | 7 ++++++
4 files changed, 60 insertions(+), 22 deletions(-)
diff --git a/airflow/cli/cli_parser.py b/airflow/cli/cli_parser.py
index 48e11e3..5f03c48 100644
--- a/airflow/cli/cli_parser.py
+++ b/airflow/cli/cli_parser.py
@@ -99,6 +99,17 @@ class Arg:
parser.add_argument(*self.flags, **self.kwargs)
+def positive_int(value):
+ """Define a positive int type for an argument."""
+ try:
+ value = int(value)
+ if value > 0:
+ return value
+ except ValueError:
+ pass
+ raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'")
+
+
# Shared
ARG_DAG_ID = Arg(
("dag_id",),
@@ -182,6 +193,13 @@ ARG_LIMIT = Arg(
("--limit",),
help="Return a limited number of records")
+# next_execution
+ARG_NUM_EXECUTIONS = Arg(
+ ("-n", "--num-executions"),
+ default=1,
+ type=positive_int,
+ help="The number of next execution datetimes to show")
+
# backfill
ARG_MARK_SUCCESS = Arg(
("-m", "--mark-success"),
@@ -793,9 +811,10 @@ DAGS_COMMANDS = (
),
ActionCommand(
name='next_execution',
- help="Get the next execution datetime of a DAG",
+ help="Get the next execution datetimes of a DAG. It returns one execution unless the "
+ "num-executions option is given",
func=lazy_load_command('airflow.cli.commands.dag_command.dag_next_execution'),
- args=(ARG_DAG_ID, ARG_SUBDIR),
+ args=(ARG_DAG_ID, ARG_SUBDIR, ARG_NUM_EXECUTIONS),
),
ActionCommand(
name='pause',
diff --git a/airflow/cli/commands/dag_command.py b/airflow/cli/commands/dag_command.py
index 148d0f5..685ca21 100644
--- a/airflow/cli/commands/dag_command.py
+++ b/airflow/cli/commands/dag_command.py
@@ -275,7 +275,7 @@ def dag_next_execution(args):
dag = get_dag(args.subdir, args.dag_id)
if dag.get_is_paused():
- print("[INFO] Please be reminded this DAG is PAUSED now.")
+ print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)
latest_execution_date = dag.get_latest_execution_date()
if latest_execution_date:
@@ -283,11 +283,16 @@ def dag_next_execution(args):
if next_execution_dttm is None:
print("[WARN] No following schedule can be found. " +
- "This DAG may have schedule interval '@once' or `None`.")
+ "This DAG may have schedule interval '@once' or `None`.", file=sys.stderr)
+ print(None)
+ else:
+ print(next_execution_dttm)
- print(next_execution_dttm)
+ for _ in range(1, args.num_executions):
+ next_execution_dttm = dag.following_schedule(next_execution_dttm)
+ print(next_execution_dttm)
else:
- print("[WARN] Only applicable when there is execution record found for the DAG.")
+ print("[WARN] Only applicable when there is execution record found for the DAG.", file=sys.stderr)
print(None)
diff --git a/tests/cli/commands/test_dag_command.py b/tests/cli/commands/test_dag_command.py
index 80b7852..9a5e50f 100644
--- a/tests/cli/commands/test_dag_command.py
+++ b/tests/cli/commands/test_dag_command.py
@@ -20,11 +20,10 @@ import io
import os
import tempfile
import unittest
-from datetime import datetime, time, timedelta
+from datetime import datetime, timedelta
import mock
import pytest
-import pytz
from airflow import settings
from airflow.cli import cli_parser
@@ -249,10 +248,10 @@ class TestCliDags(unittest.TestCase):
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
dr.delete(synchronize_session=False)
+ # Test None output
args = self.parser.parse_args(['dags',
'next_execution',
dag_ids[0]])
-
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
@@ -262,23 +261,16 @@ class TestCliDags(unittest.TestCase):
# The details below is determined by the schedule_interval of example DAGs
now = DEFAULT_DATE
- next_execution_time_for_dag1 = pytz.utc.localize(
- datetime.combine(
- now.date() + timedelta(days=1),
- time(0)
- )
- )
- next_execution_time_for_dag2 = now + timedelta(hours=4)
- expected_output = [str(next_execution_time_for_dag1),
- str(next_execution_time_for_dag2),
+ expected_output = [str(now + timedelta(days=1)),
+ str(now + timedelta(hours=4)),
"None",
"None"]
+ expected_output_2 = [str(now + timedelta(days=1)) + os.linesep + str(now + timedelta(days=2)),
+ str(now + timedelta(hours=4)) + os.linesep + str(now + timedelta(hours=8)),
+ "None",
+ "None"]
for i, dag_id in enumerate(dag_ids):
- args = self.parser.parse_args(['dags',
- 'next_execution',
- dag_id])
-
dag = self.dagbag.dags[dag_id]
# Create a DagRun for each DAG, to prepare for next step
dag.create_dagrun(
@@ -288,11 +280,26 @@ class TestCliDags(unittest.TestCase):
state=State.FAILED
)
+ # Test num-executions = 1 (default)
+ args = self.parser.parse_args(['dags',
+ 'next_execution',
+ dag_id])
with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
dag_command.dag_next_execution(args)
out = temp_stdout.getvalue()
self.assertIn(expected_output[i], out)
+ # Test num-executions = 2
+ args = self.parser.parse_args(['dags',
+ 'next_execution',
+ dag_id,
+ '--num-executions',
+ '2'])
+ with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
+ dag_command.dag_next_execution(args)
+ out = temp_stdout.getvalue()
+ self.assertIn(expected_output_2[i], out)
+
# Clean up before leaving
with create_session() as session:
dr = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
diff --git a/tests/cli/test_cli_parser.py b/tests/cli/test_cli_parser.py
index 6a7d714..e608859 100644
--- a/tests/cli/test_cli_parser.py
+++ b/tests/cli/test_cli_parser.py
@@ -174,3 +174,10 @@ class TestCli(TestCase):
for cmd_args in all_command_as_args:
with self.assertRaises(SystemExit):
parser.parse_args([*cmd_args, '--help'])
+
+ def test_positive_int(self):
+ self.assertEqual(1, cli_parser.positive_int('1'))
+
+ with self.assertRaises(argparse.ArgumentTypeError):
+ cli_parser.positive_int('0')
+ cli_parser.positive_int('-1')