You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by "Siddharth Anand (JIRA)" <ji...@apache.org> on 2017/03/30 07:03:41 UTC

[jira] [Updated] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

     [ https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Siddharth Anand updated AIRFLOW-1055:
-------------------------------------
    Description: 
Getting the following exception:
{noformat}
[2017-03-29 23:27:31,006] {dag_processing.py:628} INFO - Started a process (PID: 51215) to generate tasks for /Users/siddharth/Projects/r39132_airflow/my_dags/manage_slas_once_dag.py - logging into /Users/siddharth/Projects/r39132_airflow/logs/scheduler/2017-03-29/manage_slas_once_dag.py.log
Process DagFileProcessor25-Process:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/utils/db.py", line 48, in wrapper
    result = func(*args, **kwargs)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 1585, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 1175, in _process_dags
    dag_run = self.create_dag_run(dag)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/utils/db.py", line 48, in wrapper
    result = func(*args, **kwargs)
  File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 780, in create_dag_run
    if next_start <= now:
TypeError: can't compare datetime.datetime to NoneType
{noformat}

The exception is raised in airflow/jobs.py:create_dag_run():

https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L776

{code}
            # don't do scheduler catchup for dag's that don't have dag.catchup = True
            if not dag.catchup:
                # The logic is that we move start_date up until
                # one period before, so that datetime.now() is AFTER
                # the period end, and the job can be created...
                now = datetime.now()
                next_start = dag.following_schedule(now)
                last_start = dag.previous_schedule(now)
  >>>        if next_start <= now:                                  <<< here
                    new_start = last_start
                else:
                    new_start = dag.previous_schedule(last_start)

                if dag.start_date:
                    if new_start >= dag.start_date:
                        dag.start_date = new_start
                else:
                    dag.start_date = new_start
{code}

It seems that dag.following_schedule() returns None for an @once DAG?

Here's how the DAG is defined:

{code}
import datetime as dt

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
    print('Executing SLA miss callback')

now = datetime.now()
now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, microsecond=0)
START_DATE = now_to_the_hour + timedelta(hours=-3)
DAG_NAME = 'manage_sla_once_dag'

default_args = {
    'owner': 'sanand',
    'depends_on_past': False,
    'start_date': START_DATE,
    'sla': timedelta(hours=2)
}
dag = DAG(
    dag_id = 'manage_sla_once_dag',
    default_args = default_args,
    catchup = False,                  
    schedule_interval = '@once',                
    sla_miss_callback = sla_alert_func
)

task1 = DummyOperator(task_id='task1', dag=dag)

{code}

When catchup == True, there is no issue!

  was:
Getting the following exception:
{noformat}
[2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
    pickle_dags)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
    self.manage_slas(dag)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
    result = func(*args, **kwargs)
  File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
    while dttm < datetime.now():
TypeError: can't compare datetime.datetime to NoneType
{noformat}

The exception is raised in airflow/jobs.py:manage_slas():

https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595

{code}
        ts = datetime.now()
        SlaMiss = models.SlaMiss
        for ti in max_tis:
            task = dag.get_task(ti.task_id)
            dttm = ti.execution_date
            if task.sla:
                dttm = dag.following_schedule(dttm)
  >>>           while dttm < datetime.now():          <<< here
                    following_schedule = dag.following_schedule(dttm)
                    if following_schedule + task.sla < datetime.now():
                        session.merge(models.SlaMiss(
                            task_id=ti.task_id,
{code}

It seems that dag.following_schedule() returns None for an @once DAG?

Here's how the DAG is defined:

{code}
main_dag = DAG(
    dag_id                         = 'DISCOVER-Oracle-Load',
    default_args                   = default_args,           
    user_defined_macros            = dag_macros,       
    start_date                     = datetime.now(),         
    catchup                        = False,                  
    schedule_interval              = '@once',                
    concurrency                    = 2,                      
    max_active_runs                = 1,                      
    dagrun_timeout                 = timedelta(days=4),      
)
{code}


> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -----------------------------------------------------------------------------
>
>                 Key: AIRFLOW-1055
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: Airflow 1.8
>            Reporter: Ruslan Dautkhanov
>            Assignee: Siddharth Anand
>            Priority: Blocker
>              Labels: dagrun, once, scheduler, sla
>             Fix For: 1.8.1
>
>
> Getting the following exception:
> {noformat}
> [2017-03-29 23:27:31,006] {dag_processing.py:628} INFO - Started a process (PID: 51215) to generate tasks for /Users/siddharth/Projects/r39132_airflow/my_dags/manage_slas_once_dag.py - logging into /Users/siddharth/Projects/r39132_airflow/logs/scheduler/2017-03-29/manage_slas_once_dag.py.log
> Process DagFileProcessor25-Process:
> Traceback (most recent call last):
>   File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/utils/db.py", line 48, in wrapper
>     result = func(*args, **kwargs)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 1585, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 1175, in _process_dags
>     dag_run = self.create_dag_run(dag)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/utils/db.py", line 48, in wrapper
>     result = func(*args, **kwargs)
>   File "/Users/siddharth/Projects/r39132_airflow/airflow/jobs.py", line 780, in create_dag_run
>     if next_start <= now:
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:create_dag_run():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L776
> {code}
>             # don't do scheduler catchup for dag's that don't have dag.catchup = True
>             if not dag.catchup:
>                 # The logic is that we move start_date up until
>                 # one period before, so that datetime.now() is AFTER
>                 # the period end, and the job can be created...
>                 now = datetime.now()
>                 next_start = dag.following_schedule(now)
>                 last_start = dag.previous_schedule(now)
>   >>>        if next_start <= now:                                  <<< here
>                     new_start = last_start
>                 else:
>                     new_start = dag.previous_schedule(last_start)
>                 if dag.start_date:
>                     if new_start >= dag.start_date:
>                         dag.start_date = new_start
>                 else:
>                     dag.start_date = new_start
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> import datetime as dt
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from datetime import datetime, timedelta
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
>     print('Executing SLA miss callback')
> now = datetime.now()
> now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': False,
>     'start_date': START_DATE,
>     'sla': timedelta(hours=2)
> }
> dag = DAG(
>     dag_id = 'manage_sla_once_dag',
>     default_args = default_args,
>     catchup = False,                  
>     schedule_interval = '@once',                
>     sla_miss_callback = sla_alert_func
> )
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> When catchup == True, there is no issue!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)