You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by fo...@apache.org on 2018/05/15 06:51:43 UTC

incubator-airflow git commit: [AIRFLOW-1929] Modifying TriggerDagRunOperator to accept execution_date

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8fa0bbd56 -> 089c996fb


[AIRFLOW-1929] Modifying TriggerDagRunOperator to accept execution_date

Closes #3246 from sreenathkamath/AIRFLOW-1929


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/089c996f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/089c996f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/089c996f

Branch: refs/heads/master
Commit: 089c996fbd9ecb0014dbefedff232e8699ce6283
Parents: 8fa0bbd
Author: Sreenath Kamath <sr...@gmail.com>
Authored: Tue May 15 08:51:36 2018 +0200
Committer: Fokko Driesprong <fo...@godatadriven.com>
Committed: Tue May 15 08:51:36 2018 +0200

----------------------------------------------------------------------
 airflow/api/common/experimental/trigger_dag.py | 11 ++++---
 airflow/operators/dagrun_operator.py           | 32 ++++++++++-----------
 tests/core.py                                  | 20 +++++++++++++
 3 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/airflow/api/common/experimental/trigger_dag.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/trigger_dag.py b/airflow/api/common/experimental/trigger_dag.py
index 7ff8184..67c43e1 100644
--- a/airflow/api/common/experimental/trigger_dag.py
+++ b/airflow/api/common/experimental/trigger_dag.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,7 +25,8 @@ from airflow.utils import timezone
 from airflow.utils.state import State
 
 
-def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
+def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None,
+                replace_microseconds=True):
     dagbag = DagBag()
 
     if dag_id not in dagbag.dags:
@@ -37,7 +38,9 @@ def trigger_dag(dag_id, run_id=None, conf=None, execution_date=None):
         execution_date = timezone.utcnow()
 
     assert timezone.is_localized(execution_date)
-    execution_date = execution_date.replace(microsecond=0)
+
+    if replace_microseconds:
+        execution_date = execution_date.replace(microsecond=0)
 
     if not run_id:
         run_id = "manual__{0}".format(execution_date.isoformat())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/airflow/operators/dagrun_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py
index feb3612..53814af 100644
--- a/airflow/operators/dagrun_operator.py
+++ b/airflow/operators/dagrun_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,12 +17,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from airflow.models import BaseOperator, DagBag
+from airflow.models import BaseOperator
 from airflow.utils import timezone
-from airflow.utils.db import create_session
 from airflow.utils.decorators import apply_defaults
-from airflow.utils.state import State
-from airflow import settings
+from airflow.api.common.experimental.trigger_dag import trigger_dag
+
+import json
 
 
 class DagRunOrder(object):
@@ -47,6 +47,8 @@ class TriggerDagRunOperator(BaseOperator):
         to your tasks while executing that DAG run. Your function header
         should look like ``def foo(context, dag_run_obj):``
     :type python_callable: python callable
+    :param execution_date: Execution date for the dag
+    :type execution_date: datetime.datetime
     """
     template_fields = tuple()
     template_ext = tuple()
@@ -57,26 +59,22 @@ class TriggerDagRunOperator(BaseOperator):
             self,
             trigger_dag_id,
             python_callable=None,
+            execution_date=None,
             *args, **kwargs):
         super(TriggerDagRunOperator, self).__init__(*args, **kwargs)
         self.python_callable = python_callable
         self.trigger_dag_id = trigger_dag_id
+        self.execution_date = execution_date
 
     def execute(self, context):
         dro = DagRunOrder(run_id='trig__' + timezone.utcnow().isoformat())
         if self.python_callable is not None:
             dro = self.python_callable(context, dro)
         if dro:
-            with create_session() as session:
-                dbag = DagBag(settings.DAGS_FOLDER)
-                trigger_dag = dbag.get_dag(self.trigger_dag_id)
-                dr = trigger_dag.create_dagrun(
-                    run_id=dro.run_id,
-                    state=State.RUNNING,
-                    conf=dro.payload,
-                    external_trigger=True)
-                self.log.info("Creating DagRun %s", dr)
-                session.add(dr)
-                session.commit()
+            trigger_dag(dag_id=self.trigger_dag_id,
+                        run_id=dro.run_id,
+                        conf=json.dumps(dro.payload),
+                        execution_date=self.execution_date,
+                        replace_microseconds=False)
         else:
             self.log.info("Criteria not met, moving on")

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/089c996f/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 0230ecf..ddcec42 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -949,6 +949,26 @@ class CoreTest(unittest.TestCase):
         self.assertEqual(run_command('echo "foo bar"'), u'foo bar\n')
         self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')
 
+    def test_trigger_dagrun_with_execution_date(self):
+        utc_now = timezone.utcnow()
+        run_id = 'trig__' + utc_now.isoformat()
+
+        def payload_generator(context, object):
+            object.run_id = run_id
+            return object
+
+        task = TriggerDagRunOperator(task_id='test_trigger_dagrun_with_execution_date',
+                                     trigger_dag_id='example_bash_operator',
+                                     python_callable=payload_generator,
+                                     execution_date=utc_now,
+                                     dag=self.dag)
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+        dag_runs = models.DagRun.find(dag_id='example_bash_operator',
+                                      run_id=run_id)
+        self.assertEquals(len(dag_runs), 1)
+        dag_run = dag_runs[0]
+        self.assertEquals(dag_run.execution_date, utc_now)
+
 
 class CliTests(unittest.TestCase):