You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2016/05/18 17:50:06 UTC

[1/2] incubator-airflow git commit: AIRFLOW-124 Implement create_dagrun

Repository: incubator-airflow
Updated Branches:
  refs/heads/master f01854a4b -> 434323483


AIRFLOW-124 Implement create_dagrun

This adds the create_dagrun method to DAG and the static method
DagRun.find. create_dagrun creates a dagrun together with the task
instances for its tasks.

Because task instances are now created when the dagrun is
instantiated, the deadlocks that the tests previously checked for no
longer occur. The tests have been adjusted accordingly.

In addition, integrity has been improved by a bugfix to the add_task
handling of the BaseOperator: a task is now always assigned its DAG
when one is present, even if the task is already registered in it.

DagRun.find is a convenience function that returns the DagRuns
for a given dag. It provides a single place for looking up
dagruns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/cb562897
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/cb562897
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/cb562897

Branch: refs/heads/master
Commit: cb56289743d6f28db667e2eec2509b9e245b37c7
Parents: 72ab63e
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Sun May 15 12:45:20 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed May 18 12:30:50 2016 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py | 41 +++++++++++----------
 airflow/jobs.py    | 14 +++----
 airflow/models.py  | 97 ++++++++++++++++++++++++++++++++++++++++++++++++-
 tests/core.py      | 33 ++++++-----------
 tests/jobs.py      |  6 +--
 5 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb562897/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index d735bad..53e758f 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -118,29 +118,32 @@ def backfill(args, dag=None):
 
 
 def trigger_dag(args):
-    session = settings.Session()
-    # TODO: verify dag_id
+    dag = get_dag(args)
+
+    if not dag:
+        logging.error("Cannot find dag {}".format(args.dag_id))
+        sys.exit(1)
+
     execution_date = datetime.now()
     run_id = args.run_id or "manual__{0}".format(execution_date.isoformat())
-    dr = session.query(DagRun).filter(
-        DagRun.dag_id == args.dag_id, DagRun.run_id == run_id).first()
 
-    conf = {}
-    if args.conf:
-        conf = json.loads(args.conf)
+    dr = DagRun.find(dag_id=args.dag_id, run_id=run_id)
     if dr:
-        logging.error("This run_id already exists")
-    else:
-        trigger = DagRun(
-            dag_id=args.dag_id,
-            run_id=run_id,
-            execution_date=execution_date,
-            state=State.RUNNING,
-            conf=conf,
-            external_trigger=True)
-        session.add(trigger)
-        logging.info("Created {}".format(trigger))
-    session.commit()
+        logging.error("This run_id {} already exists".format(run_id))
+        raise AirflowException()
+
+    run_conf = {}
+    if args.conf:
+        run_conf = json.loads(args.conf)
+
+    trigger = dag.create_dagrun(
+        run_id=run_id,
+        execution_date=execution_date,
+        state=State.RUNNING,
+        conf=run_conf,
+        external_trigger=True
+    )
+    logging.info("Created {}".format(trigger))
 
 
 def variables(args):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb562897/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7244d84..7a8eb33 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -386,12 +386,11 @@ class SchedulerJob(BaseJob):
         if dag.schedule_interval:
             DagRun = models.DagRun
             session = settings.Session()
-            qry = session.query(DagRun).filter(
-                DagRun.dag_id == dag.dag_id,
-                DagRun.external_trigger == False,
-                DagRun.state == State.RUNNING,
+            active_runs = DagRun.find(
+                dag_id=dag.dag_id,
+                state=State.RUNNING,
+                external_trigger=False
             )
-            active_runs = qry.all()
             if len(active_runs) >= dag.max_active_runs:
                 return
             for dr in active_runs:
@@ -457,16 +456,13 @@ class SchedulerJob(BaseJob):
                 return
 
             if next_run_date and schedule_end and schedule_end <= datetime.now():
-                next_run = DagRun(
-                    dag_id=dag.dag_id,
+                next_run = dag.create_dagrun(
                     run_id='scheduled__' + next_run_date.isoformat(),
                     execution_date=next_run_date,
                     start_date=datetime.now(),
                     state=State.RUNNING,
                     external_trigger=False
                 )
-                session.add(next_run)
-                session.commit()
                 return next_run
 
     def process_dag(self, dag, queue):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb562897/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 0f664a0..67958f2 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1935,7 +1935,8 @@ class BaseOperator(object):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-            self._dag = dag
+
+        self._dag = dag
 
     def has_dag(self):
         """
@@ -3073,6 +3074,56 @@ class DAG(LoggingMixin):
         args = parser.parse_args()
         args.func(args, self)
 
+    @provide_session
+    def create_dagrun(self,
+                      run_id,
+                      execution_date,
+                      state,
+                      start_date=None,
+                      external_trigger=False,
+                      conf=None,
+                      session=None):
+        """
+        Creates a dag run from this dag including the tasks associated with this dag. Returns the dag
+        run.
+        :param run_id: defines the run id for this dag run
+        :type run_id: string
+        :param execution_date: the execution date of this dag run
+        :type execution_date: datetime
+        :param state: the state of the dag run
+        :type state: State
+        :param start_date: the date this dag run should be evaluated
+        :type start_date: datetime
+        :param external_trigger: whether this dag run is externally triggered
+        :type external_trigger: bool
+        :param session: database session
+        :type session: Session
+        """
+        run = DagRun(
+            dag_id=self.dag_id,
+            run_id=run_id,
+            execution_date=execution_date,
+            start_date=start_date,
+            external_trigger=external_trigger,
+            conf=conf,
+            state=state
+        )
+        session.add(run)
+
+        # create the associated taskinstances
+        # state is None at the moment of creation
+        for task in self.tasks:
+            if task.adhoc:
+                continue
+
+            ti = TaskInstance(task, execution_date)
+            session.add(ti)
+
+        session.commit()
+
+        run.refresh_from_db()
+        return run
+
 
 class Chart(Base):
     __tablename__ = "chart"
@@ -3368,6 +3419,50 @@ class DagRun(Base):
     def id_for_date(klass, date, prefix=ID_FORMAT_PREFIX):
         return prefix.format(date.isoformat()[:19])
 
+    @provide_session
+    def refresh_from_db(self, session=None):
+        """
+        Reloads the current dagrun from the database
+        :param session: database session
+        """
+        DR = DagRun
+
+        dr = session.query(DR).filter(
+            DR.dag_id == self.dag_id,
+            DR.execution_date == self.execution_date,
+            DR.run_id == self.run_id
+        ).one()
+        if dr:
+            self.id = dr.id
+            self.state = dr.state
+
+    @staticmethod
+    @provide_session
+    def find(dag_id, run_id=None, state=None, external_trigger=None, session=None):
+        """
+        Returns a set of dag runs for the given search criteria.
+        :param run_id: defines the run id for this dag run
+        :type run_id: string
+        :param state: the state of the dag run
+        :type state: State
+        :param external_trigger: whether this dag run is externally triggered
+        :type external_trigger: bool
+        :param session: database session
+        :type session: Session
+        """
+        DR = DagRun
+
+        qry = session.query(DR).filter(DR.dag_id == dag_id)
+        if run_id:
+            qry = qry.filter(DR.run_id == run_id)
+        if state:
+            qry = qry.filter(DR.state == state)
+        if external_trigger:
+            qry = qry.filter(DR.external_trigger == external_trigger)
+        dr = qry.all()
+
+        return dr
+
 
 class Pool(Base):
     __tablename__ = "slot_pool"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb562897/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 8c33de2..4b5d563 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -116,6 +116,7 @@ class CoreTest(unittest.TestCase):
                 .format(dag_run.execution_date))
         assert dag_run.state == State.RUNNING
         assert dag_run.external_trigger == False
+        dag.clear()
 
     def test_schedule_dag_fake_scheduled_previous(self):
         """
@@ -131,14 +132,10 @@ class CoreTest(unittest.TestCase):
             owner='Also fake',
             start_date=DEFAULT_DATE))
         scheduler = jobs.SchedulerJob(test_mode=True)
-        trigger = models.DagRun(
-            dag_id=dag.dag_id,
-            run_id=models.DagRun.id_for_date(DEFAULT_DATE),
-            execution_date=DEFAULT_DATE,
-            state=State.SUCCESS,
-            external_trigger=True)
-        settings.Session().add(trigger)
-        settings.Session().commit()
+        dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE),
+                          execution_date=DEFAULT_DATE,
+                          state=State.SUCCESS,
+                          external_trigger=True)
         dag_run = scheduler.schedule_dag(dag)
         assert dag_run is not None
         assert dag_run.dag_id == dag.dag_id
@@ -166,6 +163,7 @@ class CoreTest(unittest.TestCase):
 
         assert dag_run is not None
         assert dag_run2 is None
+        dag.clear()
 
     def test_schedule_dag_start_end_dates(self):
         """
@@ -180,16 +178,13 @@ class CoreTest(unittest.TestCase):
                   start_date=start_date,
                   end_date=end_date,
                   schedule_interval=delta)
+        dag.add_task(models.BaseOperator(task_id='faketastic',
+                                         owner='Also fake'))
 
         # Create and schedule the dag runs
         dag_runs = []
         scheduler = jobs.SchedulerJob(test_mode=True)
         for i in range(runs):
-            date = dag.start_date + i * delta
-            task = models.BaseOperator(task_id='faketastic__%s' % i,
-                                       owner='Also fake',
-                                       start_date=date)
-            dag.task_dict[task.task_id] = task
             dag_runs.append(scheduler.schedule_dag(dag))
 
         additional_dag_run = scheduler.schedule_dag(dag)
@@ -219,19 +214,12 @@ class CoreTest(unittest.TestCase):
         dag = DAG(TEST_DAG_ID + 'test_schedule_dag_no_end_date_up_to_today_only',
                   start_date=start_date,
                   schedule_interval=delta)
+        dag.add_task(models.BaseOperator(task_id='faketastic',
+                                         owner='Also fake'))
 
         dag_runs = []
         scheduler = jobs.SchedulerJob(test_mode=True)
         for i in range(runs):
-            # Create the DagRun
-            date = dag.start_date + i * delta
-            task = models.BaseOperator(task_id='faketastic__%s' % i,
-                                       owner='Also fake',
-                                       start_date=date)
-
-            dag.task_dict[task.task_id] = task
-
-            # Schedule the DagRun
             dag_run = scheduler.schedule_dag(dag)
             dag_runs.append(dag_run)
 
@@ -730,6 +718,7 @@ class CliTests(unittest.TestCase):
             cli.trigger_dag,
             self.parser.parse_args([
                 'trigger_dag', 'example_bash_operator',
+                '--run_id', 'trigger_dag_xxx',
                 '-c', 'NOT JSON'])
         )
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/cb562897/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 6802aae..1d2c614 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -252,9 +252,9 @@ class SchedulerJobTest(unittest.TestCase):
 
     def test_dagrun_deadlock(self):
         """
-        Deadlocked DagRun is marked a failure
+        Do not deadlock
 
-        Test that a deadlocked dagrun is marked as a failure by having
+        Test that a dagrun is marked as running by having
         depends_on_past and an execution_date after the start_date
         """
         self.evaluate_dagrun(
@@ -263,7 +263,7 @@ class SchedulerJobTest(unittest.TestCase):
                 'test_depends_on_past': None,
                 'test_depends_on_past_2': None,
             },
-            dagrun_state=State.FAILED,
+            dagrun_state=State.RUNNING,
             advance_execution_date=True)
 
     def test_scheduler_pooled_tasks(self):


[2/2] incubator-airflow git commit: Merge branch 'dag_run'

Posted by bo...@apache.org.
Merge branch 'dag_run'


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/43432348
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/43432348
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/43432348

Branch: refs/heads/master
Commit: 4343234836abdcdafc6176b3e4ace14b4c5e82cf
Parents: f01854a cb56289
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed May 18 19:49:47 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed May 18 19:49:47 2016 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py | 41 +++++++++++----------
 airflow/jobs.py    | 14 +++----
 airflow/models.py  | 97 ++++++++++++++++++++++++++++++++++++++++++++++++-
 tests/core.py      | 33 ++++++-----------
 tests/jobs.py      |  6 +--
 5 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------