You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ar...@apache.org on 2016/06/20 23:02:28 UTC
[1/2] incubator-airflow git commit: [AIRFLOW-180] Fix timeout
behavior for sensors
Repository: incubator-airflow
Updated Branches:
refs/heads/master 45b735bae -> 26c31d9bc
[AIRFLOW-180] Fix timeout behavior for sensors
In the previous state of the code, datetime.now was compared to
started_at and seconds was pulled out. It turns out that the seconds
attribute of a timedelta has a maximum of 86400 and the rolls up to 1 day.
The unintended consequence is that timeout larger than 86400 are
ignored, with sensors running forever.
To fix this we use the total_seconds method to get at the real
timedelta in seconds.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c38a5c2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c38a5c2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c38a5c2a
Branch: refs/heads/master
Commit: c38a5c2a8b227194ec52d81e8a5a85c97751ecd9
Parents: 45b735b
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Thu May 26 10:27:55 2016 -0700
Committer: Arthur Wiedmer <ar...@gmail.com>
Committed: Mon Jun 20 15:45:44 2016 -0700
----------------------------------------------------------------------
airflow/operators/sensors.py | 4 +-
tests/operators/sensors.py | 77 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c38a5c2a/airflow/operators/sensors.py
----------------------------------------------------------------------
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 5276f6e..4e4cb3b 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -69,12 +69,12 @@ class BaseSensorOperator(BaseOperator):
def execute(self, context):
started_at = datetime.now()
while not self.poke(context):
- sleep(self.poke_interval)
- if (datetime.now() - started_at).seconds > self.timeout:
+ if (datetime.now() - started_at).total_seconds() > self.timeout:
if self.soft_fail:
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
+ sleep(self.poke_interval)
logging.info("Success criteria met. Exiting.")
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c38a5c2a/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
index 025790e..325ee8d 100644
--- a/tests/operators/sensors.py
+++ b/tests/operators/sensors.py
@@ -12,11 +12,84 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+
+import logging
import os
+import time
import unittest
-from airflow.operators.sensors import HttpSensor
-from airflow.exceptions import AirflowException
+from datetime import datetime, timedelta
+
+from airflow import DAG, configuration
+from airflow.operators.sensors import HttpSensor, BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import (AirflowException,
+ AirflowSensorTimeout,
+ AirflowSkipException)
+configuration.test_mode()
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+
+
+class TimeoutTestSensor(BaseSensorOperator):
+ """
+ Sensor that always returns the return_value provided
+
+ :param return_value: Set to true to mark the task as SKIPPED on failure
+ :type return_value: any
+ """
+
+ @apply_defaults
+ def __init__(
+ self,
+ return_value=False,
+ *args,
+ **kwargs):
+ self.return_value = return_value
+ super(TimeoutTestSensor, self).__init__(*args, **kwargs)
+
+ def poke(self, context):
+ return self.return_value
+
+ def execute(self, context):
+ started_at = datetime.now()
+ time_jump = self.params.get('time_jump')
+ while not self.poke(context):
+ if time_jump:
+ started_at -= time_jump
+ if (datetime.now() - started_at).total_seconds() > self.timeout:
+ if self.soft_fail:
+ raise AirflowSkipException('Snap. Time is OUT.')
+ else:
+ raise AirflowSensorTimeout('Snap. Time is OUT.')
+ time.sleep(self.poke_interval)
+ logging.info("Success criteria met. Exiting.")
+
+
+class SensorTimeoutTest(unittest.TestCase):
+ def setUp(self):
+ configuration.test_mode()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+ dag = DAG(TEST_DAG_ID, default_args=args)
+ self.dag = dag
+
+ def test_timeout(self):
+ t = TimeoutTestSensor(
+ task_id='test_timeout',
+ execution_timeout=timedelta(days=2),
+ return_value=False,
+ poke_interval=5,
+ params={'time_jump': timedelta(days=2, seconds=1)},
+ dag=self.dag
+ )
+ self.assertRaises(
+ AirflowSensorTimeout,
+ t.run,
+ start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
class HttpSensorTests(unittest.TestCase):
[2/2] incubator-airflow git commit: Merge pull request #1547 from
artwr/artwr_fix_sensor_timeout
Posted by ar...@apache.org.
Merge pull request #1547 from artwr/artwr_fix_sensor_timeout
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/26c31d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/26c31d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/26c31d9b
Branch: refs/heads/master
Commit: 26c31d9bca16a88c915d0d501aaa58915056a2a9
Parents: 45b735b c38a5c2
Author: Arthur Wiedmer <ar...@gmail.com>
Authored: Mon Jun 20 16:02:14 2016 -0700
Committer: Arthur Wiedmer <ar...@gmail.com>
Committed: Mon Jun 20 16:02:14 2016 -0700
----------------------------------------------------------------------
airflow/operators/sensors.py | 4 +-
tests/operators/sensors.py | 77 ++++++++++++++++++++++++++++++++++++++-
2 files changed, 77 insertions(+), 4 deletions(-)
----------------------------------------------------------------------