You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/09/10 23:04:35 UTC

[GitHub] r39132 closed pull request #3834: [AIRFLOW-2965] CLI tool to show the next execution datetime

r39132 closed pull request #3834: [AIRFLOW-2965] CLI tool to show the next execution datetime
URL: https://github.com/apache/incubator-airflow/pull/3834
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it once the request is merged):

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e22427cf40..c38116e6c0 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -551,6 +551,31 @@ def dag_state(args):
     print(dr[0].state if len(dr) > 0 else None)
 
 
+@cli_utils.action_logging
+def next_execution(args):
+    """
+    Prints the next execution datetime of a DAG at the command line.
+    >>> airflow next_execution tutorial
+    2018-08-31 10:38:00
+    """
+    dag = get_dag(args)
+
+    if dag.is_paused:
+        print("[INFO] Please be reminded this DAG is PAUSED now.")
+
+    if dag.latest_execution_date:
+        next_execution_dttm = dag.following_schedule(dag.latest_execution_date)
+
+        if next_execution_dttm is None:
+            print("[WARN] No following schedule can be found. " +
+                  "This DAG may have schedule interval '@once' or `None`.")
+
+        print(next_execution_dttm)
+    else:
+        print("[WARN] Only applicable when there is execution record found for the DAG.")
+        print(None)
+
+
 @cli_utils.action_logging
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
@@ -1986,6 +2011,11 @@ class CLIFactory(object):
             'func': sync_perm,
             'help': "Update existing role's permissions.",
             'args': tuple(),
+        },
+        {
+            'func': next_execution,
+            'help': "Get the next execution datetime of a DAG.",
+            'args': ('dag_id', 'subdir')
         }
     )
     subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py
index 616b9a0f16..93ec0576e6 100644
--- a/tests/cli/test_cli.py
+++ b/tests/cli/test_cli.py
@@ -20,9 +20,12 @@
 
 import unittest
 
+from datetime import datetime, timedelta, time
 from mock import patch, Mock, MagicMock
 from time import sleep
 import psutil
+import pytz
+import subprocess
 from argparse import Namespace
 from airflow import settings
 from airflow.bin.cli import get_num_ready_workers_running, run, get_dag
@@ -165,3 +168,80 @@ def test_local_run(self):
             ti.refresh_from_db()
             state = ti.current_state()
             self.assertEqual(state, State.SUCCESS)
+
+    def test_next_execution(self):
+        # A scaffolding function
+        def reset_dr_db(dag_id):
+            session = Session()
+            dr = session.query(models.DagRun).filter_by(dag_id=dag_id)
+            dr.delete()
+            session.commit()
+            session.close()
+
+        EXAMPLE_DAGS_FOLDER = os.path.join(
+            os.path.dirname(
+                os.path.dirname(
+                    os.path.dirname(os.path.realpath(__file__))
+                )
+            ),
+            "airflow/example_dags"
+        )
+
+        dagbag = models.DagBag(dag_folder=EXAMPLE_DAGS_FOLDER,
+                               include_examples=False)
+        dag_ids = ['example_bash_operator',  # schedule_interval is '0 0 * * *'
+                   'latest_only',  # schedule_interval is timedelta(hours=4)
+                   'example_python_operator',  # schedule_interval=None
+                   'example_xcom']  # schedule_interval="@once"
+
+        # The details below are determined by the schedule_interval of the example DAGs
+        now = timezone.utcnow()
+        next_execution_time_for_dag1 = pytz.utc.localize(
+            datetime.combine(
+                now.date() + timedelta(days=1),
+                time(0)
+            )
+        )
+        next_execution_time_for_dag2 = now + timedelta(hours=4)
+        expected_output = [str(next_execution_time_for_dag1),
+                           str(next_execution_time_for_dag2),
+                           "None",
+                           "None"]
+
+        for i in range(len(dag_ids)):
+            dag_id = dag_ids[i]
+
+            # Clear DAG runs so there is no execution history for each DAG
+            reset_dr_db(dag_id)
+
+            p = subprocess.Popen(["airflow", "next_execution", dag_id,
+                                  "--subdir", EXAMPLE_DAGS_FOLDER],
+                                 stdout=subprocess.PIPE)
+            p.wait()
+            stdout = []
+            for line in p.stdout:
+                stdout.append(str(line.decode("utf-8").rstrip()))
+
+            # The `next_execution` function is inapplicable if no execution record is found;
+            # it prints `None` in such cases
+            self.assertEqual(stdout[-1], "None")
+
+            dag = dagbag.dags[dag_id]
+            # Create a DagRun for each DAG, to prepare for next step
+            dag.create_dagrun(
+                run_id='manual__' + now.isoformat(),
+                execution_date=now,
+                start_date=now,
+                state=State.FAILED
+            )
+
+            p = subprocess.Popen(["airflow", "next_execution", dag_id,
+                                  "--subdir", EXAMPLE_DAGS_FOLDER],
+                                 stdout=subprocess.PIPE)
+            p.wait()
+            stdout = []
+            for line in p.stdout:
+                stdout.append(str(line.decode("utf-8").rstrip()))
+            self.assertEqual(stdout[-1], expected_output[i])
+
+            reset_dr_db(dag_id)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services