You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 15:04:29 UTC

[3/4] incubator-airflow git commit: Merge branch 'AIRFLOW-719' into AIRFLOW-719-3

Merge branch 'AIRFLOW-719' into AIRFLOW-719-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98

Branch: refs/heads/master
Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44
Parents: f2dae7d eb705fd
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Apr 4 11:55:20 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 11:55:20 2017 +0200

----------------------------------------------------------------------
 airflow/operators/latest_only_operator.py     |  30 +++-
 airflow/operators/python_operator.py          |  82 +++++++---
 airflow/ti_deps/deps/trigger_rule_dep.py      |   6 +-
 scripts/ci/requirements.txt                   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 -----
 tests/models.py                               |  77 +++++-----
 tests/operators/__init__.py                   |   2 +
 tests/operators/latest_only_operator.py       |   2 +-
 tests/operators/python_operator.py            | 167 ++++++++++++++++++++-
 9 files changed, 301 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py
----------------------------------------------------------------------
diff --cc tests/models.py
index 43fccca,3e77894..a013f8a
--- a/tests/models.py
+++ b/tests/models.py
@@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase)
      def test_id_for_date(self):
          run_id = models.DagRun.id_for_date(
              datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
 -        self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
 -                         msg='Generated run_id did not match expectations: {0}'
 -                         .format(run_id))
 +        self.assertEqual(
 +            'scheduled__2015-01-02T03:04:05', run_id,
 +            'Generated run_id did not match expectations: {0}'.format(run_id))
 +
 +    def test_dagrun_find(self):
 +        session = settings.Session()
 +        now = datetime.datetime.now()
 +
 +        dag_id1 = "test_dagrun_find_externally_triggered"
 +        dag_run = models.DagRun(
 +            dag_id=dag_id1,
 +            run_id='manual__' + now.isoformat(),
 +            execution_date=now,
 +            start_date=now,
 +            state=State.RUNNING,
 +            external_trigger=True,
 +        )
 +        session.add(dag_run)
 +
 +        dag_id2 = "test_dagrun_find_not_externally_triggered"
 +        dag_run = models.DagRun(
 +            dag_id=dag_id2,
 +            run_id='manual__' + now.isoformat(),
 +            execution_date=now,
 +            start_date=now,
 +            state=State.RUNNING,
 +            external_trigger=False,
 +        )
 +        session.add(dag_run)
 +
 +        session.commit()
 +
 +        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
 +        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
 +        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
 +        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
  
-     def test_dagrun_running_when_upstream_skipped(self):
-         """
-         Tests that a DAG run is not failed when an upstream task is skipped
-         """
-         initial_task_states = {
-             'test_short_circuit_false': State.SUCCESS,
-             'test_state_skipped1': State.SKIPPED,
-             'test_state_skipped2': State.NONE,
-         }
-         # dags/test_dagrun_short_circuit_false.py
-         dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
-                                       state=State.RUNNING,
-                                       task_states=initial_task_states)
-         updated_dag_state = dag_run.update_state()
-         self.assertEqual(State.RUNNING, updated_dag_state)
- 
      def test_dagrun_success_when_all_skipped(self):
          """
          Tests that a DAG run succeeds when all tasks are skipped