You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jl...@apache.org on 2016/06/30 23:42:47 UTC

[1/4] incubator-airflow git commit: [AIRFLOW-301] Fix broken unit test

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9f49f1285 -> 0a94cae4b


[AIRFLOW-301] Fix broken unit test

This unit test always fails on the last day of the month, since it
tries to access a nonexistent day (like June 31st).


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f49e238b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f49e238b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f49e238b

Branch: refs/heads/master
Commit: f49e238b72c1701df8ad88a0be232b6619f03edb
Parents: efdbbb5
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 17:43:20 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400

----------------------------------------------------------------------
 tests/models.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f49e238b/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index e4f5aa8..3549e8f 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -557,7 +557,7 @@ class TaskInstanceTest(unittest.TestCase):
         ti.xcom_push(key=key, value=value)
         self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
         ti.run()
-        exec_date = exec_date.replace(day=exec_date.day + 1)
+        exec_date += datetime.timedelta(days=1)
         ti = TI(
             task=task, execution_date=exec_date)
         ti.run()


[2/4] incubator-airflow git commit: [AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor

Posted by jl...@apache.org.
[AIRFLOW-100] Add execution_date_fn to ExternalTaskSensor

Currently, ExternalTaskSensor only supports querying execution_dates
that are either the same as the ExternalTaskSensor's execution_date
or a fixed interval from that date (using `execution_delta`). This
adds the ability to provide a fn (`execution_date_fn`) that accepts
the current execution_date and can return any desired date for
querying. This is much more flexible. For example, it could
supply the last date of the previous month.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/efdbbb5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/efdbbb5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/efdbbb5d

Branch: refs/heads/master
Commit: efdbbb5d3beba49f9b633f0a25ce768f896c0a6a
Parents: 0965648
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 16:55:16 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:34:16 2016 -0400

----------------------------------------------------------------------
 airflow/operators/sensors.py | 17 ++++++++++++++++-
 airflow/utils/tests.py       |  5 ++++-
 tests/core.py                | 39 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index e9b8885..9f7f380 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -179,8 +179,14 @@ class ExternalTaskSensor(BaseSensorOperator):
     :type allowed_states: list
     :param execution_delta: time difference with the previous execution to
         look at, the default is the same execution_date as the current task.
-        For yesterday, use [positive!] datetime.timedelta(days=1)
+        For yesterday, use [positive!] datetime.timedelta(days=1). Either
+        execution_delta or execution_date_fn can be passed to
+        ExternalTaskSensor, but not both.
     :type execution_delta: datetime.timedelta
+    :param execution_date_fn: function that receives the current execution date
+        and returns the desired execution date to query. Either execution_delta
+        or execution_date_fn can be passed to ExternalTaskSensor, but not both.
+    :type execution_date_fn: callable
     """
 
     @apply_defaults
@@ -190,16 +196,25 @@ class ExternalTaskSensor(BaseSensorOperator):
             external_task_id,
             allowed_states=None,
             execution_delta=None,
+            execution_date_fn=None,
             *args, **kwargs):
         super(ExternalTaskSensor, self).__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_date` or `execution_date_fn` may'
+                'be provided to ExternalTaskSensor; not both.')
+
         self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
         self.external_dag_id = external_dag_id
         self.external_task_id = external_task_id
 
     def poke(self, context):
         if self.execution_delta:
             dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
         else:
             dttm = context['execution_date']
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/airflow/utils/tests.py
----------------------------------------------------------------------
diff --git a/airflow/utils/tests.py b/airflow/utils/tests.py
index 50490d3..83db6e8 100644
--- a/airflow/utils/tests.py
+++ b/airflow/utils/tests.py
@@ -16,7 +16,10 @@ import unittest
 
 def skipUnlessImported(module, obj):
     import importlib
-    m = importlib.import_module(module)
+    try:
+        m = importlib.import_module(module)
+    except ImportError:
+        m = None
     return unittest.skipUnless(
         obj in dir(m),
         "Skipping test because {} could not be imported from {}".format(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/efdbbb5d/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 24d2938..4f3197d 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -437,6 +437,45 @@ class CoreTest(unittest.TestCase):
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
+    def test_external_task_sensor_fn(self):
+        self.test_time_sensor()
+        # check that execution_date_fn works
+        t = sensors.ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_delta',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id='time_sensor_check',
+            execution_date_fn=lambda dt: dt + timedelta(0),
+            allowed_states=['success'],
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        # double check that execution_date_fn is actually called: shifting the
+        # date by one day should make the sensor time out
+        t2 = sensors.ExternalTaskSensor(
+            task_id='test_external_task_sensor_check_delta',
+            external_dag_id=TEST_DAG_ID,
+            external_task_id='time_sensor_check',
+            execution_date_fn=lambda dt: dt + timedelta(days=1),
+            allowed_states=['success'],
+            timeout=1,
+            poke_interval=1,
+            dag=self.dag)
+        with self.assertRaises(exceptions.AirflowSensorTimeout):
+            t2.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_external_task_sensor_error_delta_and_fn(self):
+        """
+        Test that providing execution_delta and a function raises an error
+        """
+        with self.assertRaises(ValueError):
+            t = sensors.ExternalTaskSensor(
+                task_id='test_external_task_sensor_check_delta',
+                external_dag_id=TEST_DAG_ID,
+                external_task_id='time_sensor_check',
+                execution_delta=timedelta(0),
+                execution_date_fn=lambda dt: dt,
+                allowed_states=['success'],
+                dag=self.dag)
+
     def test_timeout(self):
         t = PythonOperator(
             task_id='test_timeout',


[4/4] incubator-airflow git commit: Merge pull request #1641 from jlowin/external-task

Posted by jl...@apache.org.
Merge pull request #1641 from jlowin/external-task


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a94cae4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a94cae4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a94cae4

Branch: refs/heads/master
Commit: 0a94cae4b7f3824278702fddb8bac672fc01c894
Parents: 9f49f12 ddbcd88
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 19:42:38 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 19:42:38 2016 -0400

----------------------------------------------------------------------
 .../versions/211e584da130_add_ti_state_index.py | 14 +++++++
 airflow/operators/sensors.py                    | 17 ++++++++-
 airflow/utils/tests.py                          |  5 ++-
 tests/core.py                                   | 39 ++++++++++++++++++++
 tests/models.py                                 |  2 +-
 5 files changed, 74 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-airflow git commit: Add license to migration file

Posted by jl...@apache.org.
Add license to migration file


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ddbcd88b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ddbcd88b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ddbcd88b

Branch: refs/heads/master
Commit: ddbcd88bb086d1978c9196833d126ded18db97f8
Parents: f49e238
Author: jlowin <jl...@users.noreply.github.com>
Authored: Thu Jun 30 18:35:06 2016 -0400
Committer: jlowin <jl...@users.noreply.github.com>
Committed: Thu Jun 30 18:35:06 2016 -0400

----------------------------------------------------------------------
 .../versions/211e584da130_add_ti_state_index.py       | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddbcd88b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
----------------------------------------------------------------------
diff --git a/airflow/migrations/versions/211e584da130_add_ti_state_index.py b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
index 5991683..05aa321 100644
--- a/airflow/migrations/versions/211e584da130_add_ti_state_index.py
+++ b/airflow/migrations/versions/211e584da130_add_ti_state_index.py
@@ -1,3 +1,17 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 """add TI state index
 
 Revision ID: 211e584da130