You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/04 15:04:29 UTC
[3/4] incubator-airflow git commit: Merge branch 'AIRFLOW-719' into AIRFLOW-719-3
Merge branch 'AIRFLOW-719' into AIRFLOW-719-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98
Branch: refs/heads/master
Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44
Parents: f2dae7d eb705fd
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Apr 4 11:55:20 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 4 11:55:20 2017 +0200
----------------------------------------------------------------------
airflow/operators/latest_only_operator.py | 30 +++-
airflow/operators/python_operator.py | 82 +++++++---
airflow/ti_deps/deps/trigger_rule_dep.py | 6 +-
scripts/ci/requirements.txt | 1 +
tests/dags/test_dagrun_short_circuit_false.py | 38 -----
tests/models.py | 77 +++++-----
tests/operators/__init__.py | 2 +
tests/operators/latest_only_operator.py | 2 +-
tests/operators/python_operator.py | 167 ++++++++++++++++++++-
9 files changed, 301 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py
----------------------------------------------------------------------
diff --cc tests/models.py
index 43fccca,3e77894..a013f8a
--- a/tests/models.py
+++ b/tests/models.py
@@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase)
def test_id_for_date(self):
run_id = models.DagRun.id_for_date(
datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
- self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
- msg='Generated run_id did not match expectations: {0}'
- .format(run_id))
+ self.assertEqual(
+ 'scheduled__2015-01-02T03:04:05', run_id,
+ 'Generated run_id did not match expectations: {0}'.format(run_id))
+
+ def test_dagrun_find(self):
+ session = settings.Session()
+ now = datetime.datetime.now()
+
+ dag_id1 = "test_dagrun_find_externally_triggered"
+ dag_run = models.DagRun(
+ dag_id=dag_id1,
+ run_id='manual__' + now.isoformat(),
+ execution_date=now,
+ start_date=now,
+ state=State.RUNNING,
+ external_trigger=True,
+ )
+ session.add(dag_run)
+
+ dag_id2 = "test_dagrun_find_not_externally_triggered"
+ dag_run = models.DagRun(
+ dag_id=dag_id2,
+ run_id='manual__' + now.isoformat(),
+ execution_date=now,
+ start_date=now,
+ state=State.RUNNING,
+ external_trigger=False,
+ )
+ session.add(dag_run)
+
+ session.commit()
+
+ self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
+ self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
+ self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
+ self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
- def test_dagrun_running_when_upstream_skipped(self):
- """
- Tests that a DAG run is not failed when an upstream task is skipped
- """
- initial_task_states = {
- 'test_short_circuit_false': State.SUCCESS,
- 'test_state_skipped1': State.SKIPPED,
- 'test_state_skipped2': State.NONE,
- }
- # dags/test_dagrun_short_circuit_false.py
- dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
- state=State.RUNNING,
- task_states=initial_task_states)
- updated_dag_state = dag_run.update_state()
- self.assertEqual(State.RUNNING, updated_dag_state)
-
def test_dagrun_success_when_all_skipped(self):
"""
Tests that a DAG run succeeds when all tasks are skipped