Posted to commits@airflow.apache.org by "Muhammad Ahmmad (JIRA)" <ji...@apache.org> on 2018/01/15 15:08:00 UTC

[jira] [Commented] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

    [ https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326320#comment-16326320 ] 

Muhammad Ahmmad commented on AIRFLOW-1055:
------------------------------------------

[~bolke], I was wondering where and when the fix for this issue was submitted.

I'm working on a fix for this issue; indeed, it happens for @once DAGs when catchup is enabled. The fix is simple, but it affects AIRFLOW-1013, which needs more time to fix.
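The comment above describes the fix as simple. One possible shape for it stops the SLA loop as soon as there is no following schedule, instead of comparing None to a datetime. The sketch below works under stated assumptions. `following_schedule` and `find_sla_misses` are hypothetical stand-ins, not Airflow's API and not the fix that was eventually merged. The sketch also assumes, as the quoted report suspects, that an @once DAG has no following schedule.

```python
from datetime import datetime, timedelta


def following_schedule(schedule_interval, dttm):
    """Hypothetical stand-in for DAG.following_schedule().

    Assumption: an '@once' DAG has no next schedule, so None is returned
    (the behaviour suspected in the report). A timedelta interval is
    simply added to the given date.
    """
    if schedule_interval == '@once':
        return None
    return dttm + schedule_interval


def find_sla_misses(schedule_interval, execution_date, sla, now):
    """Hypothetical SLA scan mirroring the loop in manage_slas().

    Returns the schedule dates whose SLA window has already passed.
    """
    misses = []
    dttm = following_schedule(schedule_interval, execution_date)
    # Guard: stop when there is no following schedule instead of
    # evaluating `None < now`, which raises TypeError.
    while dttm is not None and dttm < now:
        nxt = following_schedule(schedule_interval, dttm)
        if nxt is not None and nxt + sla < now:
            misses.append(dttm)
        dttm = nxt
    return misses
```

With the guard in place, `find_sla_misses('@once', ...)` returns an empty list instead of raising, while interval-based schedules are scanned as before.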

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -----------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1055
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Siddharth Anand
>            Assignee: Muhammad Ahmmad
>            Priority: Critical
>              Labels: dagrun, once, scheduler, sla
>             Fix For: 1.9.0
>
>
> Getting the following exception:
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
>     self.manage_slas(dag)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
>     while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:manage_slas():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
>         ts = datetime.now()
>         SlaMiss = models.SlaMiss
>         for ti in max_tis:
>             task = dag.get_task(ti.task_id)
>             dttm = ti.execution_date
>             if task.sla:
>                 dttm = dag.following_schedule(dttm)
>                 while dttm < datetime.now():  # <<< TypeError raised here when dttm is None
>                     following_schedule = dag.following_schedule(dttm)
>                     if following_schedule + task.sla < datetime.now():
>                         session.merge(models.SlaMiss(
>                             task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
>     print('Executing SLA miss callback')
>
> # Start three hours before the top of the current hour.
> now = datetime.now()
> now_to_the_hour = now.replace(minute=0, second=0, microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': False,
>     'start_date': START_DATE,
>     'sla': timedelta(hours=2)  # tasks with an SLA enter the failing loop in manage_slas()
> }
> dag = DAG(
>     dag_id=DAG_NAME,
>     default_args=default_args,
>     catchup=False,
>     schedule_interval='@once',
>     sla_miss_callback=sla_alert_func
> )
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> The DAG works (no exception) if catchup = True.
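The TypeError at the bottom of the quoted traceback can be reproduced without Airflow. The loop condition compares `dttm` to `datetime.now()`, and that comparison is undefined when `dttm` is None. `compare_to_now` and `raises_type_error` are hypothetical helpers that mirror that single condition. The exact error message differs between Python versions, and the traceback above comes from Python 2.7.

```python
from datetime import datetime


def compare_to_now(dttm):
    """Evaluate the loop condition from manage_slas() for a single value."""
    return dttm < datetime.now()


def raises_type_error(value):
    """Return True if the loop condition raises TypeError for `value`."""
    try:
        compare_to_now(value)
    except TypeError:
        return True
    return False


if __name__ == "__main__":
    # None is what following_schedule() is suspected to return for @once.
    print(raises_type_error(None))                   # True
    print(raises_type_error(datetime(2017, 3, 19)))  # False
```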



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)