You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/06/30 23:42:47 UTC
[1/4] incubator-airflow git commit: [AIRFLOW-301] Fix broken unit test
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9f49f1285 -> 0a94cae4b
[AIRFLOW-301] Fix broken unit test
This unit tests always fails on the last day of the month, since it
tries to access a nonexistent day (like June 31st).
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f49e238b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f49e238b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f49e238b
Branch: refs/heads/master
Commit: f49e238b72c1701df8ad88a0be232b6619f03edb
Parents: efdbbb5
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 17:43:20 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400
----------------------------------------------------------------------
tests/models.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f49e238b/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index e4f5aa8..3549e8f 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -557,7 +557,7 @@ class TaskInstanceTest(unittest.TestCase):
ti.xcom_push(key=key, value=value)
self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
ti.run()
- exec_date = exec_date.replace(day=exec_date.day + 1)
+ exec_date += datetime.timedelta(days=1)
ti = TI(
task=task, execution_date=exec_date)
ti.run()
[2/4] incubator-airflow git commit: [AIRFLOW-100] Add
execution_date_fn to ExternalTaskSensor
Posted by jl...@apache.org.
[AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor
Currently, ExternalTaskSensor only supports querying execution_dates
that are either the same as the ExternalTaskSensor's execution_date
or a fixed interval from that date (using `execution_delta`). This
adds the ability to provide a fn (`execution_date_fn`) that accepts
the current execution_date and can return any desired date for
querying. This is much more flexible. For example, it could
supply the last date of the previous month.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efdbbb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efdbbb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efdbbb5d
Branch: refs/heads/master
Commit: efdbbb5d3beba49f9b633f0a25ce768f896c0a6a
Parents: 0965648
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 16:55:16 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400
----------------------------------------------------------------------
airflow/operators/sensors.py | 17 ++++++++++++++++-
airflow/utils/tests.py | 5 ++++-
tests/core.py | 39 +++++++++++++++++++++++++++++++++++++++
3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index e9b8885..9f7f380 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -179,8 +179,14 @@ class ExternalTaskSensor(BaseSensorOperator):
:type allowed_states: list
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
- For yesterday, use [positive!] datetime.timedelta(days=1)
+ For yesterday, use [positive!] datetime.timedelta(days=1). Either
+ execution_delta or execution_date_fn can be passed to
+ ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
+ :param execution_date_fn: function that receives the current execution date
+ and returns the desired execution date to query. Either execution_delta
+ or execution_date_fn can be passed to ExternalTaskSensor, but not both.
+ :type execution_date_fn: callable
"""
@apply_defaults
@@ -190,16 +196,25 @@ class ExternalTaskSensor(BaseSensorOperator):
external_task_id,
allowed_states=None,
execution_delta=None,
+ execution_date_fn=None,
*args, **kwargs):
super(ExternalTaskSensor, self).__init__(*args, **kwargs)
self.allowed_states = allowed_states or [State.SUCCESS]
+ if execution_delta is not None and execution_date_fn is not None:
+ raise ValueError(
+ 'Only one of `execution_date` or `execution_date_fn` may'
+ 'be provided to ExternalTaskSensor; not both.')
+
self.execution_delta = execution_delta
+ self.execution_date_fn = execution_date_fn
self.external_dag_id = external_dag_id
self.external_task_id = external_task_id
def poke(self, context):
if self.execution_delta:
dttm = context['execution_date'] - self.execution_delta
+ elif self.execution_date_fn:
+ dttm = self.execution_date_fn(context['execution_date'])
else:
dttm = context['execution_date']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 50490d3..83db6e8 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -16,7 +16,10 @@ import unittest
def skipUnlessImported(module, obj):
import importlib
- m = importlib.import_module(module)
+ try:
+ m = importlib.import_module(module)
+ except ImportError:
+ m = None
return unittest.skipUnless(
obj in dir(m),
"Skipping test because {} could not be imported from {}".format(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24d2938..4f3197d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -437,6 +437,45 @@ class CoreTest(unittest.TestCase):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+ def test_external_task_sensor_fn(self):
+ self.test_time_sensor()
+ # check that the execution_fn works
+ t = sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_date_fn=lambda dt: dt + timedelta(0),
+ allowed_states=['success'],
+ dag=self.dag)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+ # double check that the execution is being called by failing the test
+ t2 = sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_date_fn=lambda dt: dt + timedelta(days=1),
+ allowed_states=['success'],
+ timeout=1,
+ poke_interval=1,
+ dag=self.dag)
+ with self.assertRaises(exceptions.AirflowSensorTimeout):
+ t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+ def test_external_task_sensor_error_delta_and_fn(self):
+ """
+ Test that providing execution_delta and a function raises an error
+ """
+ with self.assertRaises(ValueError):
+ t = sensors.ExternalTaskSensor(
+ task_id='test_external_task_sensor_check_delta',
+ external_dag_id=TEST_DAG_ID,
+ external_task_id='time_sensor_check',
+ execution_delta=timedelta(0),
+ execution_date_fn=lambda dt: dt,
+ allowed_states=['success'],
+ dag=self.dag)
+
def test_timeout(self):
t = PythonOperator(
task_id='test_timeout',
[4/4] incubator-airflow git commit: Merge pull request #1641 from
jlowin/external-task
Posted by jl...@apache.org.
Merge pull request #1641 from jlowin/external-task
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a94cae4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a94cae4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a94cae4
Branch: refs/heads/master
Commit: 0a94cae4b7f3824278702fddb8bac672fc01c894
Parents: 9f49f12 ddbcd88
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 19:42:38 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 19:42:38 2016 -0400
----------------------------------------------------------------------
.../versions/211e584da130_add_ti_state_index.py | 14 +++++++
airflow/operators/sensors.py | 17 ++++++++-
airflow/utils/tests.py | 5 ++-
tests/core.py | 39 ++++++++++++++++++++
tests/models.py | 2 +-
5 files changed, 74 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-airflow git commit: Add license to migration file
Posted by jl...@apache.org.
Add license to migration file
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ddbcd88b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ddbcd88b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ddbcd88b
Branch: refs/heads/master
Commit: ddbcd88bb086d1978c9196833d126ded18db97f8
Parents: f49e238
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 18:35:06 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:35:06 2016 -0400
----------------------------------------------------------------------
.../versions/211e584da130_add_ti_state_index.py | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddbcd88b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/211e584da130_add_ti_state_index.py b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
index 5991683..05aa321 100644
--- a/airflow/migrations/versions/211e584da130_add_ti_state_index.py
+++ b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
"""add TI state index
Revision ID: 211e584da130