You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "dud (JIRA)" <ji...@apache.org> on 2016/06/16 16:55:05 UTC
[jira] [Created] (AIRFLOW-249) Refactor the SLA mecanism
dud created AIRFLOW-249:
---------------------------
Summary: Refactor the SLA mecanism
Key: AIRFLOW-249
URL: https://issues.apache.org/jira/browse/AIRFLOW-249
Project: Apache Airflow
Issue Type: Improvement
Reporter: dud
Hello
I've noticed the SLA feature is currently behaving as follow :
- it doesn't work on DAG scheduled @once or None because they have no dag.followwing_schedule property
- it keeps endlessly checking for SLA misses without ever worrying about any end_date. Worse I noticed that emails are still being sent for runs that are never happening because of end_date
- it keeps checking for recent TIs even if SLA notification has been already been sent for them
- the SLA logic is only being fired after following_schedule + sla has elapsed, in other words one has to wait for the next TI before having a chance of getting any email. Also the email reports dag.following_schedule time (I guess because it is close of TI.start_date), but unfortunately that doesn't match what the task instances shows nor the log filename
- the SLA logic is based on max(TI.execution_date) for the starting point of its checks, that means that for a DAG whose SLA is longer than its schedule period if half of the TIs are running longer than expected it will go unnoticed. This could be demonstrated with a DAG like this one :
{code}
from airflow import DAG
from airflow.operators import *
from datetime import datetime, timedelta
from time import sleep
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 6, 16, 12, 20),
'email': my_email
'sla': timedelta(minutes=2),
}
dag = DAG('unnoticed_sla', default_args=default_args, schedule_interval=timedelta(minutes=1))
def alternating_sleep(**kwargs):
minute = kwargs['execution_date'].strftime("%M")
is_odd = int(minute) % 2
if is_odd:
sleep(300)
else:
sleep(10)
return True
PythonOperator(
task_id='sla_miss',
python_callable=alternating_sleep,
provide_context=True,
dag=dag)
{code}
I've tried to rework the SLA triggering mechanism by addressing the above points., please [have a look on it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
I made some tests with this patch :
- the fluctuent DAG shown above no longer make Airflow skip any SLA event :
{code}
task_id | dag_id | execution_date | email_sent | timestamp | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t | 2016-06-16 15:08:26.058631 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t | 2016-06-16 15:10:06.093253 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t | 2016-06-16 15:12:06.241773 | | t
{code}
- on a normal DAG, the SLA is being triggred more quickly :
{code}
// start_date = 2016-06-16 15:55:00
// end_date = 2016-06-16 16:00:00
// schedule_interval = timedelta(minutes=1)
// sla = timedelta(minutes=2)
task_id | dag_id | execution_date | email_sent | timestamp | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t | 2016-06-16 15:58:11.832299 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t | 2016-06-16 15:59:09.663778 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t | 2016-06-16 16:00:13.651422 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t | 2016-06-16 16:01:08.576399 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t | 2016-06-16 16:02:08.523486 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t | 2016-06-16 16:03:08.538593 | | t
(6 rows)
{code}
than before (current master branch) :
{code}
// start_date = 2016-06-16 15:40:00
// end_date = 2016-06-16 15:45:00
// schedule_interval = timedelta(minutes=1)
// sla = timedelta(minutes=2)
task_id | dag_id | execution_date | email_sent | timestamp | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
sla_miss | dag_sla_miss1 | 2016-06-16 15:41:00 | t | 2016-06-16 15:44:30.305287 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:42:00 | t | 2016-06-16 15:45:35.372118 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:43:00 | t | 2016-06-16 15:46:30.415744 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:44:00 | t | 2016-06-16 15:47:30.507345 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:45:00 | t | 2016-06-16 15:48:30.487742 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:46:00 | t | 2016-06-16 15:50:40.647373 | | t
sla_miss | dag_sla_miss1 | 2016-06-16 15:47:00 | t | 2016-06-16 15:50:40.647373 | | t
{code}
Please note that in this last case (current master) execution_date is equal to dag.following_schedule, so SLA is being fired after one extra schedule_interval. Also note that SLA are still being triggered after end_date. Also note the timestamp column being updated seveal time.
Please tell me what do you think about my patch.
dud
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)