You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/10 08:09:01 UTC

incubator-airflow git commit: [AIRFLOW-489] Allow specifying execution date in trigger_dag API

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2d54d8abd -> e0f5c0cb8


[AIRFLOW-489] Allow specifying execution date in trigger_dag API

Closes #1946 from robin-miller-ow/release/New_API_Functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e0f5c0cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e0f5c0cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e0f5c0cb

Branch: refs/heads/master
Commit: e0f5c0cb8b849c6df60297927f371b750521487a
Parents: 2d54d8a
Author: robin_miller_ow <ro...@affiliate.oliverwyman.com>
Authored: Tue Jan 10 09:08:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:08:32 2017 +0100

----------------------------------------------------------------------
 airflow/api/client/api_client.py               |  2 +-
 airflow/api/client/json_client.py              |  3 +-
 airflow/api/client/local_client.py             |  7 ++--
 airflow/api/common/experimental/trigger_dag.py |  5 +--
 airflow/bin/cli.py                             |  8 +++--
 airflow/models.py                              | 18 ++++++++++
 airflow/www/api/experimental/endpoints.py      | 32 ++++++++++++++---
 tests/www/api/experimental/test_endpoints.py   | 38 +++++++++++++++++++++
 8 files changed, 101 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/api_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index bdb9a61..6a77538 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -19,7 +19,7 @@ class Client:
         self._api_base_url = api_base_url
         self._auth = auth
 
-    def trigger_dag(self, dag_id, run_id=None, conf=None):
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
         """
         Creates a dag run for the specified dag
         :param dag_id:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/json_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index 4d8e87a..d74fc63 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -20,7 +20,7 @@ import requests
 
 
 class Client(api_client.Client):
-    def trigger_dag(self, dag_id, run_id=None, conf=None):
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
         endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
         url = urljoin(self._api_base_url, endpoint)
 
@@ -29,6 +29,7 @@ class Client(api_client.Client):
                              json={
                                  "run_id": run_id,
                                  "conf": conf,
+                                 "execution_date": execution_date,
                              })
 
         if not resp.ok:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 7c2435b..a4d1f93 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -17,6 +17,9 @@ from airflow.api.common.experimental import trigger_dag
 
 
 class Client(api_client.Client):
-    def trigger_dag(self, dag_id, run_id=None, conf=None):
-        dr = trigger_dag.trigger_dag(dag_id=dag_id, run_id=run_id, conf=conf)
+    def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+        dr = trigger_dag.trigger_dag(dag_id=dag_id,
+                                     run_id=run_id,
+                                     conf=conf,
+                                     execution_date=execution_date)
         return "Created {}".format(dr)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 36abc99..0905017 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -22,7 +22,7 @@ from airflow.utils.state import State
 import logging
 
 
-def trigger_dag(dag_id, run_id=None, conf=None):
+def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
     dagbag = DagBag()
 
     if dag_id not in dagbag.dags:
@@ -30,7 +30,8 @@ def trigger_dag(dag_id, run_id=None, conf=None):
 
     dag = dagbag.get_dag(dag_id)
 
-    execution_date = datetime.now()
+    if not execution_date:
+        execution_date = datetime.now()
 
     if not run_id:
         run_id = "manual__{0}".format(execution_date.isoformat())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fc5a242..d55fdfc 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -182,7 +182,8 @@ def trigger_dag(args):
     try:
         message = api_client.trigger_dag(dag_id=args.dag_id,
                                          run_id=args.run_id,
-                                         conf=args.conf)
+                                         conf=args.conf,
+                                         execution_date=args.exec_date)
     except IOError as err:
         logging.error(err)
         raise AirflowException(err)
@@ -1159,6 +1160,9 @@ class CLIFactory(object):
         'conf': Arg(
             ('-c', '--conf'),
             "JSON string that gets pickled into the DagRun's conf attribute"),
+        'exec_date': Arg(
+            ("-e", "--exec_date"), help="The execution date of the DAG",
+            type=parsedate),
         # pool
         'pool_set': Arg(
             ("-s", "--set"),
@@ -1406,7 +1410,7 @@ class CLIFactory(object):
         }, {
             'func': trigger_dag,
             'help': "Trigger a DAG run",
-            'args': ('dag_id', 'subdir', 'run_id', 'conf'),
+            'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
         }, {
             'func': pool,
             'help': "CRUD operations on pools",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1a0919a..0bd744e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2834,6 +2834,24 @@ class DAG(BaseDag, LoggingMixin):
 
         return active_dates
 
+    @provide_session
+    def get_dagrun(self, execution_date, session=None):
+        """
+        Returns the DagRun of this DAG whose execution_date equals the given
+        one (exact match), or None if no such run exists.
+        :param execution_date: The execution date of the DagRun to find.
+        :param session: ORM session used for the query (see @provide_session).
+        :return: The first matching DagRun if found, otherwise None.
+        """
+        dagrun = (
+            session.query(DagRun)
+            .filter(
+                DagRun.dag_id == self.dag_id,
+                DagRun.execution_date == execution_date)
+            .first())
+
+        return dagrun
+
     @property
     def latest_execution_date(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index bccbed6..9b248c1 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -22,17 +22,22 @@ from airflow.www.app import csrf
 from flask import (
     g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
 )
+from datetime import datetime
+
+_log = logging.getLogger(__name__)
 
 requires_authentication = airflow.api.api_auth.requires_authentication
 
 api_experimental = Blueprint('api_experimental', __name__)
 
+
 @csrf.exempt
 @api_experimental.route('/dags/<string:dag_id>/dag_runs', methods=['POST'])
 @requires_authentication
 def trigger_dag(dag_id):
     """
-    Trigger a new dag run for a Dag
+    Trigger a new dag run for a Dag with an execution date of now unless
+    specified in the data.
     """
     data = request.get_json(force=True)
 
@@ -44,16 +49,35 @@ def trigger_dag(dag_id):
     if 'conf' in data:
         conf = data['conf']
 
+    execution_date = None
+    if 'execution_date' in data:
+        execution_date = data['execution_date']
+
+        # Convert string datetime into actual datetime
+        try:
+            execution_date = datetime.strptime(execution_date,
+                                               '%Y-%m-%dT%H:%M:%S')
+        except ValueError:
+            error_message = (
+                'Given execution date, {}, could not be identified '
+                'as a date. Example date format: 2015-11-16T14:34:15'
+                .format(execution_date))
+            _log.info(error_message)
+            response = jsonify({'error': error_message})
+            response.status_code = 400
+
+            return response
+
     try:
-        dr = trigger.trigger_dag(dag_id, run_id, conf)
+        dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date)
     except AirflowException as err:
-        logging.error(err)
+        _log.error(err)
         response = jsonify(error="{}".format(err))
         response.status_code = 404
         return response
 
     if getattr(g, 'user', None):
-        logging.info("User {} created {}".format(g.user, dr))
+        _log.info("User {} created {}".format(g.user, dr))
 
     response = jsonify(message="Created {}".format(dr))
     return response

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/tests/www/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 3046584..bc41171 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -14,6 +14,7 @@
 import unittest
 
 from datetime import datetime
+from airflow.models import DagBag
 
 import json
 
@@ -59,4 +60,41 @@ class ApiExperimentalTests(unittest.TestCase):
         )
         self.assertEqual(404, response.status_code)
 
+    def test_trigger_dag_for_date(self):
+        from datetime import timedelta
+        url_template = '/api/experimental/dags/{}/dag_runs'
+        dag_id = 'example_bash_operator'
+        now = datetime.now().replace(minute=0, second=0, microsecond=0)
+        execution_date = now + timedelta(hours=1)  # rolls over past 23:00
+
+        # Test correct execution
+        response = self.app.post(
+            url_template.format(dag_id),
+            data=json.dumps(dict(execution_date=execution_date.isoformat())),
+            content_type="application/json"
+        )
+        self.assertEqual(200, response.status_code)
+
+        dagbag = DagBag()
+        dag = dagbag.get_dag(dag_id)
+        dag_run = dag.get_dagrun(execution_date)
+        self.assertTrue(dag_run,
+                        'Dag Run not found for execution date {}'
+                        .format(execution_date))
+
+        # Test error for nonexistent dag
+        response = self.app.post(
+            url_template.format('does_not_exist_dag'),
+            data=json.dumps(dict(execution_date=execution_date.isoformat())),
+            content_type="application/json"
+        )
+        self.assertEqual(404, response.status_code)
+
+        # Test error for bad datetime format
+        response = self.app.post(
+            url_template.format(dag_id),
+            data=json.dumps(dict(execution_date='not_a_datetime')),
+            content_type="application/json"
+        )
+        self.assertEqual(400, response.status_code)