You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/01/10 08:09:01 UTC
incubator-airflow git commit: [AIRFLOW-489] Allow specifying execution date in trigger_dag API
Repository: incubator-airflow
Updated Branches:
refs/heads/master 2d54d8abd -> e0f5c0cb8
[AIRFLOW-489] Allow specifying execution date in trigger_dag API
Closes #1946 from robin-miller-ow/release/New_API_Functionality
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e0f5c0cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e0f5c0cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e0f5c0cb
Branch: refs/heads/master
Commit: e0f5c0cb8b849c6df60297927f371b750521487a
Parents: 2d54d8a
Author: robin_miller_ow <ro...@affiliate.oliverwyman.com>
Authored: Tue Jan 10 09:08:28 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jan 10 09:08:32 2017 +0100
----------------------------------------------------------------------
airflow/api/client/api_client.py | 2 +-
airflow/api/client/json_client.py | 3 +-
airflow/api/client/local_client.py | 7 ++--
airflow/api/common/experimental/trigger_dag.py | 5 +--
airflow/bin/cli.py | 8 +++--
airflow/models.py | 18 ++++++++++
airflow/www/api/experimental/endpoints.py | 32 ++++++++++++++---
tests/www/api/experimental/test_endpoints.py | 38 +++++++++++++++++++++
8 files changed, 101 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/api_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/api_client.py b/airflow/api/client/api_client.py
index bdb9a61..6a77538 100644
--- a/airflow/api/client/api_client.py
+++ b/airflow/api/client/api_client.py
@@ -19,7 +19,7 @@ class Client:
self._api_base_url = api_base_url
self._auth = auth
- def trigger_dag(self, dag_id, run_id=None, conf=None):
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
"""
Creates a dag run for the specified dag
:param dag_id:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/json_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/json_client.py b/airflow/api/client/json_client.py
index 4d8e87a..d74fc63 100644
--- a/airflow/api/client/json_client.py
+++ b/airflow/api/client/json_client.py
@@ -20,7 +20,7 @@ import requests
class Client(api_client.Client):
- def trigger_dag(self, dag_id, run_id=None, conf=None):
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
@@ -29,6 +29,7 @@ class Client(api_client.Client):
json={
"run_id": run_id,
"conf": conf,
+ "execution_date": execution_date,
})
if not resp.ok:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/client/local_client.py
----------------------------------------------------------------------
diff --git a/airflow/api/client/local_client.py b/airflow/api/client/local_client.py
index 7c2435b..a4d1f93 100644
--- a/airflow/api/client/local_client.py
+++ b/airflow/api/client/local_client.py
@@ -17,6 +17,9 @@ from airflow.api.common.experimental import trigger_dag
class Client(api_client.Client):
- def trigger_dag(self, dag_id, run_id=None, conf=None):
- dr = trigger_dag.trigger_dag(dag_id=dag_id, run_id=run_id, conf=conf)
+ def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
+ dr = trigger_dag.trigger_dag(dag_id=dag_id,
+ run_id=run_id,
+ conf=conf,
+ execution_date=None)
return "Created {}".format(dr)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 36abc99..0905017 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -22,7 +22,7 @@ from airflow.utils.state import State
import logging
-def trigger_dag(dag_id, run_id=None, conf=None):
+def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
dagbag = DagBag()
if dag_id not in dagbag.dags:
@@ -30,7 +30,8 @@ def trigger_dag(dag_id, run_id=None, conf=None):
dag = dagbag.get_dag(dag_id)
- execution_date = datetime.now()
+ if not execution_date:
+ execution_date = datetime.now()
if not run_id:
run_id = "manual__{0}".format(execution_date.isoformat())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index fc5a242..d55fdfc 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -182,7 +182,8 @@ def trigger_dag(args):
try:
message = api_client.trigger_dag(dag_id=args.dag_id,
run_id=args.run_id,
- conf=args.conf)
+ conf=args.conf,
+ execution_date=args.exec_date)
except IOError as err:
logging.error(err)
raise AirflowException(err)
@@ -1159,6 +1160,9 @@ class CLIFactory(object):
'conf': Arg(
('-c', '--conf'),
"JSON string that gets pickled into the DagRun's conf attribute"),
+ 'exec_date': Arg(
+ ("-e", "--exec_date"), help="The execution date of the DAG",
+ type=parsedate),
# pool
'pool_set': Arg(
("-s", "--set"),
@@ -1406,7 +1410,7 @@ class CLIFactory(object):
}, {
'func': trigger_dag,
'help': "Trigger a DAG run",
- 'args': ('dag_id', 'subdir', 'run_id', 'conf'),
+ 'args': ('dag_id', 'subdir', 'run_id', 'conf', 'exec_date'),
}, {
'func': pool,
'help': "CRUD operations on pools",
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 1a0919a..0bd744e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2834,6 +2834,24 @@ class DAG(BaseDag, LoggingMixin):
return active_dates
+ @provide_session
+ def get_dagrun(self, execution_date, session=None):
+ """
+ Returns the dag run for a given execution date if it exists, otherwise
+ none.
+ :param execution_date: The execution date of the DagRun to find.
+ :param session:
+ :return: The DagRun if found, otherwise None.
+ """
+ dagrun = (
+ session.query(DagRun)
+ .filter(
+ DagRun.dag_id == self.dag_id,
+ DagRun.execution_date == execution_date)
+ .first())
+
+ return dagrun
+
@property
def latest_execution_date(self):
"""
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/airflow/www/api/experimental/endpoints.py
----------------------------------------------------------------------
diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py
index bccbed6..9b248c1 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -22,17 +22,22 @@ from airflow.www.app import csrf
from flask import (
g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file
)
+from datetime import datetime
+
+_log = logging.getLogger(__name__)
requires_authentication = airflow.api.api_auth.requires_authentication
api_experimental = Blueprint('api_experimental', __name__)
+
@csrf.exempt
@api_experimental.route('/dags/<string:dag_id>/dag_runs', methods=['POST'])
@requires_authentication
def trigger_dag(dag_id):
"""
- Trigger a new dag run for a Dag
+ Trigger a new dag run for a Dag with an execution date of now unless
+ specified in the data.
"""
data = request.get_json(force=True)
@@ -44,16 +49,35 @@ def trigger_dag(dag_id):
if 'conf' in data:
conf = data['conf']
+ execution_date = None
+ if 'execution_date' in data:
+ execution_date = data['execution_date']
+
+ # Convert string datetime into actual datetime
+ try:
+ execution_date = datetime.strptime(execution_date,
+ '%Y-%m-%dT%H:%M:%S')
+ except ValueError:
+ error_message = (
+ 'Given execution date, {}, could not be identified '
+ 'as a date. Example date format: 2015-11-16T14:34:15'
+ .format(execution_date))
+ _log.info(error_message)
+ response = jsonify({'error': error_message})
+ response.status_code = 400
+
+ return response
+
try:
- dr = trigger.trigger_dag(dag_id, run_id, conf)
+ dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date)
except AirflowException as err:
- logging.error(err)
+ _log.error(err)
response = jsonify(error="{}".format(err))
response.status_code = 404
return response
if getattr(g, 'user', None):
- logging.info("User {} created {}".format(g.user, dr))
+ _log.info("User {} created {}".format(g.user, dr))
response = jsonify(message="Created {}".format(dr))
return response
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e0f5c0cb/tests/www/api/experimental/test_endpoints.py
----------------------------------------------------------------------
diff --git a/tests/www/api/experimental/test_endpoints.py b/tests/www/api/experimental/test_endpoints.py
index 3046584..bc41171 100644
--- a/tests/www/api/experimental/test_endpoints.py
+++ b/tests/www/api/experimental/test_endpoints.py
@@ -14,6 +14,7 @@
import unittest
from datetime import datetime
+from airflow.models import DagBag
import json
@@ -59,4 +60,41 @@ class ApiExperimentalTests(unittest.TestCase):
)
self.assertEqual(404, response.status_code)
+ def test_trigger_dag_for_date(self):
+ url_template = '/api/experimental/dags/{}/dag_runs'
+ dag_id = 'example_bash_operator'
+ now = datetime.now()
+ execution_date = datetime(now.year, now.month, now.day, now.hour + 1)
+ datetime_string = execution_date.isoformat()
+
+ # Test Correct execution
+ response = self.app.post(
+ url_template.format(dag_id),
+ data=json.dumps(dict(execution_date=execution_date.isoformat())),
+ content_type="application/json"
+ )
+ self.assertEqual(200, response.status_code)
+
+ dagbag = DagBag()
+ dag = dagbag.get_dag(dag_id)
+ dag_run = dag.get_dagrun(execution_date)
+ self.assertTrue(dag_run,
+ 'Dag Run not found for execution date {}'
+ .format(execution_date))
+
+ # Test error for nonexistent dag
+ response = self.app.post(
+ url_template.format('does_not_exist_dag'),
+ data=json.dumps(dict(execution_date=execution_date.isoformat())),
+ content_type="application/json"
+ )
+ self.assertEqual(404, response.status_code)
+
+ # Test error for bad datetime format
+ response = self.app.post(
+ url_template.format(dag_id),
+ data=json.dumps(dict(execution_date='not_a_datetime')),
+ content_type="application/json"
+ )
+ self.assertEqual(400, response.status_code)