You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/15 06:51:43 UTC
incubator-airflow git commit: [AIRFLOW-1929] Modifying TriggerDagRunOperator to accept execution_date
Repository: incubator-airflow
Updated Branches:
refs/heads/master 8fa0bbd56 -> 089c996fb
[AIRFLOW-1929] Modifying TriggerDagRunOperator to accept execution_date
Closes #3246 from sreenathkamath/AIRFLOW-1929
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/089c996f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/089c996f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/089c996f
Branch: refs/heads/master
Commit: 089c996fbd9ecb0014dbefedff232e8699ce6283
Parents: 8fa0bbd
Author: Sreenath Kamath <sr...@gmail.com>
Authored: Tue May 15 08:51:36 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 15 08:51:36 2018 +0200
----------------------------------------------------------------------
airflow/api/common/experimental/trigger_dag.py | 11 ++++---
airflow/operators/dagrun_operator.py | 32 ++++++++++-----------
tests/core.py | 20 +++++++++++++
3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 7ff8184..67c43e1 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,8 @@ from airflow.utils import timezone
from airflow.utils.state import State
-def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
+def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None,
+ replace_microseconds=True):
dagbag = DagBag()
if dag_id not in dagbag.dags:
@@ -37,7 +38,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
execution_date = timezone.utcnow()
assert timezone.is_localized(execution_date)
- execution_date = execution_date.replace(microsecond=0)
+
+ if replace_microseconds:
+ execution_date = execution_date.replace(microsecond=0)
if not run_id:
run_id = "manual__{0}".format(execution_date.isoformat())
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index feb3612..53814af 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,12 +17,12 @@
# specific language governing permissions and limitations
# under the License.
-from airflow.models import BaseOperator, DagBag
+from airflow.models import BaseOperator
from airflow.utils import timezone
-from airflow.utils.db import create_session
from airflow.utils.decorators import apply_defaults
-from airflow.utils.state import State
-from airflow import settings
+from airflow.api.common.experimental.trigger_dag import trigger_dag
+
+import json
class DagRunOrder(object):
@@ -47,6 +47,8 @@ class TriggerDagRunOperator(BaseOperator):
to your tasks while executing that DAG run. Your function header
should look like ``def foo(context, dag_run_obj):``
:type python_callable: python callable
+ :param execution_date: Execution date for the dag
+ :type execution_date: datetime.datetime
"""
template_fields = tuple()
template_ext = tuple()
@@ -57,26 +59,22 @@ class TriggerDagRunOperator(BaseOperator):
self,
trigger_dag_id,
python_callable=None,
+ execution_date=None,
*args, **kwargs):
super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
self.python_callable = python_callable
self.trigger_dag_id = trigger_dag_id
+ self.execution_date = execution_date
def execute(self, context):
dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat())
if self.python_callable is not None:
dro = self.python_callable(context, dro)
if dro:
- with create_session() as session:
- dbag = DagBag(settings.DAGS_FOLDER)
- trigger_dag = dbag.get_dag(self.trigger_dag_id)
- dr = trigger_dag.create_dagrun(
- run_id=dro.run_id,
- state=State.RUNNING,
- conf=dro.payload,
- external_trigger=True)
- self.log.info("Creating DagRun %s", dr)
- session.add(dr)
- session.commit()
+ trigger_dag(dag_id=self.trigger_dag_id,
+ run_id=dro.run_id,
+ conf=json.dumps(dro.payload),
+ execution_date=self.execution_date,
+ replace_microseconds=False)
else:
self.log.info("Criteria not met, moving on")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0230ecf..ddcec42 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -949,6 +949,26 @@ class CoreTest(unittest.TestCase):
self.assertEqual(run_command('echo "foo bar"'), u'foo bar\n')
self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')
+ def test_trigger_dagrun_with_execution_date(self):
+ utc_now = timezone.utcnow()
+ run_id = 'trig__' + utc_now.isoformat()
+
+ def payload_generator(context, object):
+ object.run_id = run_id
+ return object
+
+ task = TriggerDagRunOperator(task_id='test_trigger_dagrun_with_execution_date',
+ trigger_dag_id='example_bash_operator',
+ python_callable=payload_generator,
+ execution_date=utc_now,
+ dag=self.dag)
+ task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+ dag_runs = models.DagRun.find(dag_id='example_bash_operator',
+ run_id=run_id)
+ self.assertEquals(len(dag_runs), 1)
+ dag_run = dag_runs[0]
+ self.assertEquals(dag_run.execution_date, utc_now)
+
class CliTests(unittest.TestCase):