Posted to users@airflow.apache.org by Gabor Hermann <ma...@gaborhermann.com> on 2020/05/07 11:11:32 UTC

DagRun unexpectedly stops without error

Hello fellow Airflowers,

I'm relatively new to Airflow and I'm really grateful as it already 
saved us some pain in production. So thanks for all the work! 🙏

Now I'm trying to set up a DAG with around 20-30 tasks (BigQuery queries, 
PySpark Dataproc jobs) and I've seen weird behavior where a DAG run 
stops running: the DAG run is marked as success, but some tasks are left 
without a state. The annoying part is that there's not even a sign of 
failure.

Do you know why this might be happening? I couldn't find a related issue 
on GitHub. One thing I suspect is the DAG import timing out. Could 
that cause such behavior?

(I'm using version 1.10.3.)

Thanks in advance for any pointers.

Cheers,
Gabor


Re: DagRun unexpectedly stops without error

Posted by Gabor Hermann <ma...@gaborhermann.com>.
Hi Ash,

Thanks for the quick reply.

While trying to copy-paste the DAG here I found the problem. I 
accidentally set the dagrun timeout too low (the timeout was 4 hours, 
while a normal run takes around 6 hours). So it was a silly mistake on 
my side, and the problem is solved by increasing the dagrun timeout to 16 
hours.
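
A quick way to catch this kind of mismatch is to compare the configured timeout against an observed run duration. The sketch below uses only the standard library; `has_headroom` and its `safety_factor` parameter are hypothetical names I made up for illustration and are not part of Airflow.

```python
from datetime import timedelta


def has_headroom(dagrun_timeout, typical_runtime, safety_factor=1.5):
    """Return True if the timeout leaves room for a typical run.

    `safety_factor` is an arbitrary illustrative margin, not an Airflow setting.
    """
    return dagrun_timeout >= typical_runtime * safety_factor


typical_runtime = timedelta(hours=6)  # a normal run, as described above

# The original setting: 4 hours is shorter than the run itself.
print(has_headroom(timedelta(hours=4), typical_runtime))
# The new setting: 16 hours covers a 6-hour run with the assumed margin.
print(has_headroom(timedelta(hours=16), typical_runtime))
```

With the numbers from my case, the first check fails and the second passes.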

Normally I would have seen the DagRuns as failed, but the older DagRuns 
were marked as success because I had a special operator that was 
backfilling tasks (not the full DAG) for previous days to make sure that 
older intermediate data is available.

For future reference, here's a more minimal example DAG:

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.models import DAG

from datetime import timedelta
import time

operator_default_args = dict(
    owner="Gabor Hermann",
    start_date=timezone.datetime(2020, 5, 3),
)

with DAG(
    dag_id="test_timeout",
    schedule_interval="@daily",
    # Deliberately too short: the three aggregates alone need 3 x 90 seconds.
    dagrun_timeout=timedelta(minutes=4),
    start_date=timezone.datetime(2020, 5, 3),
    end_date=timezone.datetime(2020, 5, 5),
    # Pass the task defaults defined above to every operator in the DAG.
    default_args=operator_default_args,
) as dag:
    daily_incremental = DummyOperator(task_id="daily_incremental")
    check_daily_incrementals = BashOperator(
        task_id="check_daily_incrementals",
        # Backfill only the `daily_incremental` task for the previous days.
        bash_command=(
            "airflow backfill test_timeout "
            "-s {{ macros.ds_add(ds, -3) }} -e {{ ds }} "
            "--ignore_dependencies --rerun_failed_tasks "
            "-t ^daily_incremental$"
        ),
    )
    # Each aggregate just sleeps to simulate a long-running job.
    aggregate1 = PythonOperator(
        task_id="aggregate1", python_callable=lambda: time.sleep(90)
    )
    aggregate2 = PythonOperator(
        task_id="aggregate2", python_callable=lambda: time.sleep(90)
    )
    aggregate3 = PythonOperator(
        task_id="aggregate3", python_callable=lambda: time.sleep(90)
    )

    daily_incremental >> aggregate1
    check_daily_incrementals >> aggregate1 >> aggregate2 >> aggregate3


In this example we would like to make sure that the last 3 days of 
`daily_incremental` are completed before we start aggregating them 
(`aggregate1`). For this purpose I used a BashOperator that executes an 
Airflow backfill command that backfills only the task 
`daily_incremental`.
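
To make the backfill window concrete, here is a small standard-library sketch of what the templated command renders to for a given `ds`. The local `ds_add` function is a stand-in I wrote to mimic what `macros.ds_add` does (shift a `YYYY-MM-DD` string by a number of days), and `render_backfill_command` is a hypothetical helper, not an Airflow API.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Stand-in for Airflow's `macros.ds_add`: shift a YYYY-MM-DD string by `days`."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def render_backfill_command(ds, lookback_days=3):
    """Hypothetical helper: build the command the BashOperator would execute."""
    return (
        "airflow backfill test_timeout "
        f"-s {ds_add(ds, -lookback_days)} -e {ds} "
        "--ignore_dependencies --rerun_failed_tasks "
        "-t ^daily_incremental$"
    )


# For the DagRun with ds = 2020-05-05 the window starts at 2020-05-02.
print(render_backfill_command("2020-05-05"))
```

If both bounds are inclusive, as I believe they are, a lookback of 3 covers four execution dates including `ds` itself.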

The DAG always times out; we never get to the `aggregate3` task. But only 
the latest DagRun is in state `failed`; previous ones are in `success` 
even though they did not get to run `aggregate3` because of the timeout. 
(See visually here: https://i.imgur.com/ll0Rg19.png).
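
The reason the run cannot finish can be checked with a bit of arithmetic over the task chain. The sketch below is plain Python, not Airflow code: it assumes the DummyOperator and the backfill task take no time at all and ignores scheduler overhead, which is optimistic. `earliest_finish` is a hypothetical helper name.

```python
from datetime import timedelta

# Task durations from the example DAG. The zero durations are an
# optimistic assumption; real runs add scheduling and backfill time.
durations = {
    "daily_incremental": timedelta(0),
    "check_daily_incrementals": timedelta(0),
    "aggregate1": timedelta(seconds=90),
    "aggregate2": timedelta(seconds=90),
    "aggregate3": timedelta(seconds=90),
}

# Upstream tasks, mirroring the `>>` dependencies in the DAG.
upstream = {
    "daily_incremental": [],
    "check_daily_incrementals": [],
    "aggregate1": ["daily_incremental", "check_daily_incrementals"],
    "aggregate2": ["aggregate1"],
    "aggregate3": ["aggregate2"],
}


def earliest_finish(task):
    """Earliest finish time of `task`, measured from the start of the DagRun."""
    start = max((earliest_finish(u) for u in upstream[task]), default=timedelta(0))
    return start + durations[task]


dagrun_timeout = timedelta(minutes=4)
for task in ("aggregate1", "aggregate2", "aggregate3"):
    finish = earliest_finish(task)
    verdict = "within timeout" if finish <= dagrun_timeout else "exceeds timeout"
    print(task, finish, verdict)
```

Even under these assumptions `aggregate3` could finish no earlier than 270 seconds into the run, which is past the 240-second timeout.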


A separate question: is there a typical way to handle this kind of 
"daily incremental" processing?

I can think of other ways, e.g. using `depends_on_past=True` and 
handling catch-up with a sensor that checks whether it's the latest 
dagrun, so that aggregates only run for the latest dagrun.
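
To sketch what such an "is it the latest dagrun" check could look like: a run is the latest one if the current time falls inside the schedule interval right after the one it covers. The function below is a standard-library illustration only. `is_latest_run` is a hypothetical name, and it assumes a fixed-length schedule interval such as `@daily`.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, now, interval=timedelta(days=1)):
    """Return True if `execution_date` belongs to the most recent scheduled run.

    A run with execution date D is triggered at D + interval, and the next
    run is triggered at D + 2 * interval. Assumes a fixed-length interval.
    """
    left_window = execution_date + interval
    right_window = left_window + interval
    return left_window < now <= right_window


now = datetime(2020, 5, 7, 11, 0)
# The run covering 2020-05-06 is the most recent daily run at this point.
print(is_latest_run(datetime(2020, 5, 6), now))
# The run covering 2020-05-03 is a catch-up run.
print(is_latest_run(datetime(2020, 5, 3), now))
```

A sensor or a short-circuit step built on a check like this would let catch-up runs complete `daily_incremental` while leaving the aggregates to the latest run.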

Cheers,
Gabor


Re: DagRun unexpectedly stops without error

Posted by Ash Berlin-Taylor <as...@apache.org>.
That does sound very odd, I've not heard of that happening before.

Are you able to share your DAG file (you can remove any queries etc) - it may help us debug it.
Thanks,
Ash
