Posted to commits@airflow.apache.org by "Ruslan Dautkhanov (JIRA)" <ji...@apache.org> on 2017/03/20 02:22:41 UTC

[jira] [Created] (AIRFLOW-1013) airflow/jobs.py:manage_slas() exception for @once dag

Ruslan Dautkhanov created AIRFLOW-1013:
------------------------------------------

             Summary: airflow/jobs.py:manage_slas() exception for @once dag
                 Key: AIRFLOW-1013
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1013
             Project: Apache Airflow
          Issue Type: Bug
    Affects Versions: Airflow 1.8
            Reporter: Ruslan Dautkhanov


Getting the following exception:
{noformat}
[2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
    self.manage_slas(dag)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
    while dttm < datetime.now():
TypeError: can't compare datetime.datetime to NoneType
{noformat}

The exception is raised in airflow/jobs.py:manage_slas():

https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595

{code}
        ts = datetime.now()
        SlaMiss = models.SlaMiss
        for ti in max_tis:
            task = dag.get_task(ti.task_id)
            dttm = ti.execution_date
            if task.sla:
                dttm = dag.following_schedule(dttm)
  >>>           while dttm < datetime.now():          <<< here
                    following_schedule = dag.following_schedule(dttm)
                    if following_schedule + task.sla < datetime.now():
                        session.merge(models.SlaMiss(
                            task_id=ti.task_id,
{code}

It seems that dag.following_schedule() returns None for an @once DAG, so dttm is None when it is compared with datetime.now()?
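
To illustrate the suspected cause without Airflow, here is a minimal sketch. The functions `following_schedule` and `count_sla_periods` are hypothetical stand-ins written for this example, not Airflow code, and the assumption that an @once schedule yields None is only the guess stated above. It shows that an unguarded comparison raises TypeError, while a loop that checks for None first simply skips the SLA walk.

```python
from datetime import datetime, timedelta


def following_schedule(dttm, schedule_interval):
    """Hypothetical stand-in for dag.following_schedule(); not Airflow code.

    Assumption: an '@once' schedule has no next run, so None is returned.
    """
    if schedule_interval == '@once':
        return None
    # Any other value is treated as a fixed daily interval in this sketch.
    return dttm + timedelta(days=1)


def count_sla_periods(execution_date, schedule_interval, now):
    """Walk schedule periods up to `now`, stopping when there is no next one."""
    periods = 0
    dttm = following_schedule(execution_date, schedule_interval)
    # Check for None before comparing, so '@once' does not raise.
    while dttm is not None and dttm < now:
        periods += 1
        dttm = following_schedule(dttm, schedule_interval)
    return periods


now = datetime(2017, 3, 19, 20, 16)
start = datetime(2017, 3, 16, 0, 0)

# The unguarded comparison fails when the next schedule is None.
try:
    following_schedule(start, '@once') < now
    raised = False
except TypeError:
    raised = True

print(raised)
print(count_sla_periods(start, '@once', now))
print(count_sla_periods(start, '@daily', now))
```

Whether a guard like this belongs in manage_slas() itself, or whether SLA checks should be skipped entirely for @once DAGs, is a design question this sketch does not settle.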

Here's how the DAG is defined:

{code}
main_dag = DAG(
    dag_id                         = 'DISCOVER-Oracle-Load',
    default_args                   = default_args,           
    user_defined_macros            = dag_macros,       
    start_date                     = datetime.now(),         
    catchup                        = False,                  
    schedule_interval              = '@once',                
    concurrency                    = 2,                      
    max_active_runs                = 1,                      
    dagrun_timeout                 = timedelta(days=4),      
)
{code}


