Posted to users@airflow.apache.org by Gabor Hermann <ma...@gaborhermann.com> on 2020/05/07 11:11:32 UTC
DagRun unexpectedly stops without error
Hello fellow Airflowers,
I'm relatively new to Airflow and I'm really grateful as it already
saved us some pain in production. So thanks for all the work! 🙏
Now I'm trying to set up a DAG with around 20-30 tasks (BigQuery queries,
PySpark Dataproc jobs) and I've seen a weird behavior where a DAG run
stops running and the DAG is marked as success, but some tasks are still
cleared. The annoying part is that there's not even a sign of failure.
Do you know why this might be happening? I couldn't find a related issue
on GitHub. One thing I suspect is the DAG import timing out. Could
that cause such behavior?
(I'm using version 1.10.3.)
Thanks in advance for any pointers.
Cheers,
Gabor
Re: DagRun unexpectedly stops without error
Posted by Gabor Hermann <ma...@gaborhermann.com>.
Hi Ash,
Thanks for the quick reply.
While trying to copy-paste the DAG here I found the problem. I
accidentally set the dagrun timeout too low (the timeout was 4 hours
while a normal run took around 6 hours). So it was a silly mistake on
my side, and the problem is solved by increasing the dagrun timeout to 16
hours.
Normally I would have seen the DagRuns as failed, but the older DagRuns were
marked as success because I had a special operator that was backfilling
tasks (not the full DAG) for previous days to make sure that older
intermediate data is available.
For future reference, here's a more minimal example DAG:
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils import timezone
from airflow.models import DAG
from datetime import timedelta
import time
operator_default_args = dict(
    owner="Gabor Hermann",
    start_date=timezone.datetime(2020, 5, 3),
)
with DAG(
    dag_id="test_timeout",
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=4),
    start_date=timezone.datetime(2020, 5, 3),
    end_date=timezone.datetime(2020, 5, 5),
) as dag:
    daily_incremental = DummyOperator(task_id="daily_incremental")
    # Backfill only the `daily_incremental` task for the last 3 days.
    # The command is built as a single line so that bash runs it as one command.
    check_daily_incrementals = BashOperator(
        task_id="check_daily_incrementals",
        bash_command=(
            "airflow backfill test_timeout "
            "-s {{ macros.ds_add(ds, -3) }} -e {{ ds }} "
            "--ignore_dependencies --rerun_failed_tasks "
            "-t ^daily_incremental$"
        ),
    )
    # Each aggregate sleeps 90 seconds to simulate a long-running task.
    aggregate1 = PythonOperator(task_id="aggregate1",
                                python_callable=lambda: time.sleep(90))
    aggregate2 = PythonOperator(task_id="aggregate2",
                                python_callable=lambda: time.sleep(90))
    aggregate3 = PythonOperator(task_id="aggregate3",
                                python_callable=lambda: time.sleep(90))
    daily_incremental >> aggregate1
    check_daily_incrementals >> aggregate1 >> aggregate2 >> aggregate3
In this example we would like to make sure that the last 3 days of
`daily_incremental` are completed before we start aggregating them
(`aggregate1`). For this purpose I used a BashOperator that executes an
Airflow backfill command specifically backfilling the task
`daily_incremental`.
The DAG always times out, so we never get to the `aggregate3` task. But only
the latest DagRun is in state `failed`; previous ones are in `success`
even though they did not get to run `aggregate3` because of the timeout.
(See visually here: https://i.imgur.com/ll0Rg19.png).
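To see why every run of this example hits the timeout, it helps to add up the durations on the chain of aggregates. Below is a rough sketch in plain Python (no Airflow needed). It assumes the other tasks take negligible time and it ignores scheduling overhead; the helper name `fits_in_timeout` is made up for illustration.

```python
from datetime import timedelta

# Values taken from the example DAG above.
dagrun_timeout = timedelta(minutes=4)
sleep_per_aggregate = timedelta(seconds=90)

# aggregate1 >> aggregate2 >> aggregate3 run one after another,
# so their durations add up.
critical_path = 3 * sleep_per_aggregate


def fits_in_timeout(expected_runtime, timeout):
    """Hypothetical helper: True if the expected runtime is within the timeout."""
    return expected_runtime <= timeout


print(critical_path)                                   # 0:04:30
print(fits_in_timeout(critical_path, dagrun_timeout))  # False
```

The same check applies to the real DAG: a 6 hour run does not fit into a 4 hour timeout, while it does fit into the new 16 hour one.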
A separate question: is there a typical way to handle this kind of "daily
incremental" processing?
I can think of other ways, e.g. using `depends_on_past=True` and
handling catch-up with a sensor that checks whether it's the latest dagrun,
so that the aggregates only run for the latest dagrun.
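To make the "is it the latest dagrun" idea concrete, here is a minimal plain-Python sketch of the check such a sensor could perform. It is only an illustration, not Airflow's API: the function name `is_latest_run` is hypothetical, and it assumes a fixed schedule interval. It relies on the fact that a run with a given execution date is started once its interval has ended. Airflow also ships a `LatestOnlyOperator`, which might cover this case without a custom sensor.

```python
from datetime import datetime, timedelta


def is_latest_run(execution_date, interval, now):
    """Hypothetical helper: True if the run for `execution_date` is the
    most recently scheduled one at time `now`."""
    window_start = execution_date + interval  # when this run gets scheduled
    window_end = window_start + interval      # when the next run gets scheduled
    return window_start < now <= window_end


now = datetime(2020, 5, 7, 11, 0)
one_day = timedelta(days=1)

print(is_latest_run(datetime(2020, 5, 6), one_day, now))  # True
print(is_latest_run(datetime(2020, 5, 3), one_day, now))  # False
```

With a check like this, older runs created during catch-up would skip the aggregates, and only the run for the most recent day would execute them.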
Cheers,
Gabor
On 5/7/20 1:13 PM, Ash Berlin-Taylor wrote:
> That does sound very odd, I've not heard of that happening before.
>
> Are you able to share your DAG file (you can remove any queries etc) -
> it may help us debug it.
>
> Thanks,
> Ash
Re: DagRun unexpectedly stops without error
Posted by Ash Berlin-Taylor <as...@apache.org>.
That does sound very odd, I've not heard of that happening before.
Are you able to share your DAG file (you can remove any queries etc) - it may help us debug it.
Thanks,
Ash