Posted to commits@airflow.apache.org by "Li Xuanji (JIRA)" <ji...@apache.org> on 2016/09/11 00:52:20 UTC
[jira] [Comment Edited] (AIRFLOW-434) max_dag_run_reached blocks dag state change and new task scheduling
[ https://issues.apache.org/jira/browse/AIRFLOW-434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15480738#comment-15480738 ]
Li Xuanji edited comment on AIRFLOW-434 at 9/11/16 12:51 AM:
-------------------------------------------------------------
should be fixed by https://github.com/apache/incubator-airflow/commit/3a1be4aacf31ee33d6128e5d5fa563a7625c7c62, needs testing
was (Author: xuanji):
should be fixed by https://github.com/apache/incubator-airflow/commit/3a1be4aacf31ee33d6128e5d5fa563a7625c7c62
> max_dag_run_reached blocks dag state change and new task scheduling
> -------------------------------------------------------------------
>
> Key: AIRFLOW-434
> URL: https://issues.apache.org/jira/browse/AIRFLOW-434
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Li Xuanji
> Assignee: Siddharth Anand
> Priority: Blocker
>
> Using the following DAG:
> ```python
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
> from datetime import datetime, timedelta
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 1, 1, 1, 0),
>     'email': ['xuanji@gmail.com'],
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 3,
>     'retry_delay': timedelta(minutes=1),
> }
> dag = DAG('bash_bash_bash', default_args=default_args, schedule_interval=timedelta(seconds=10))
> # t1, t2 and t3 are examples of tasks created by instantiating operators
> t1 = BashOperator(
>     task_id='print_date',
>     bash_command='date',
>     dag=dag)
> t2 = BashOperator(
>     task_id='sleep',
>     bash_command='sleep 120',
>     retries=3,
>     dag=dag)
> templated_command = """
> {% for i in range(5) %}
> echo "{{ ds }}"
> echo "{{ macros.ds_add(ds, 7)}}"
> echo "{{ params.my_param }}"
> {% endfor %}
> """
> t3 = BashOperator(
>     task_id='templated',
>     bash_command=templated_command,
>     params={'my_param': 'Parameter I passed in'},
>     dag=dag)
> t2.set_upstream(t1)
> t3.set_upstream(t1)
> ```
> and an `airflow.cfg` that contains this:
> ```
> min_file_process_interval = 1
> ```
> The state eventually becomes this:
> http://imgur.com/a/5bRTe
> The scheduler should mark the 14 leftmost dagruns as success, but does not. It should also schedule tasks for the last two dagruns.
> A look at the logs explains the probable cause:
> ```
> [2016-08-16 15:12:10,257] {jobs.py:1446} DagFileProcessor174 INFO - Processing file /Users/xuanji_li/airflow/dags/bash_bash_bash.py for tasks to queue
> [2016-08-16 15:12:10,258] {models.py:162} DagFileProcessor174 INFO - Filling up the DagBag from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
> [2016-08-16 15:12:10,267] {jobs.py:1460} DagFileProcessor174 INFO - DAG(s) ['bash_bash_bash'] retrieved from /Users/xuanji_li/airflow/dags/bash_bash_bash.py
> [2016-08-16 15:12:10,289] {jobs.py:1062} DagFileProcessor174 INFO - Not processing DAG bash_bash_bash since its max runs has been reached
> [2016-08-16 15:12:10,290] {models.py:313} DagFileProcessor174 INFO - Finding 'running' jobs without a recent heartbeat
> [2016-08-16 15:12:10,290] {models.py:319} DagFileProcessor174 INFO - Failing jobs without heartbeat after 2016-08-16 15:09:55.290479
> ```
> It seems that processing of the DAG, including its existing dagruns, is skipped entirely because 16 dagruns are already running and the max runs limit has been reached.
> Binary search traced the bug to the change in https://github.com/apache/incubator-airflow/pull/1716. The logic it added looks wrong to me.
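The control-flow problem the log suggests can be illustrated with a minimal sketch. Everything here is hypothetical and heavily simplified: `Dag`, `DagRun`, `update_run_states`, `process_dag_buggy` and `process_dag_fixed` are not Airflow's scheduler code, the sketch models only run-state updates (not task queueing), and the actual logic in the linked PR may differ. It assumes a limit of 16 active runs and 14 finished runs, mirroring the numbers in the report. The point is that a max-runs check placed before all other work starves existing runs, whereas the limit should arguably gate only the creation of new runs.

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical, simplified model of one scheduler pass over a DAG.
# These names are illustrative only and are not Airflow's classes.


@dataclass
class DagRun:
    run_id: str
    state: str = "running"
    tasks_done: bool = False  # True once every task in the run has finished


@dataclass
class Dag:
    dag_id: str
    max_active_runs: int
    runs: List[DagRun] = field(default_factory=list)

    def active_runs(self) -> int:
        return sum(1 for r in self.runs if r.state == "running")


def update_run_states(dag: Dag) -> None:
    # Mark running dagruns whose tasks have all finished as successful.
    for run in dag.runs:
        if run.state == "running" and run.tasks_done:
            run.state = "success"


def process_dag_buggy(dag: Dag, next_run_id: str) -> None:
    # Behaviour as reported: returning early when the limit is hit means
    # existing runs are never updated, so the DAG stays at the limit forever.
    if dag.active_runs() >= dag.max_active_runs:
        return
    update_run_states(dag)
    dag.runs.append(DagRun(next_run_id))


def process_dag_fixed(dag: Dag, next_run_id: str) -> None:
    # Expected behaviour: always process existing runs; only the creation
    # of a new run is gated by max_active_runs.
    update_run_states(dag)
    if dag.active_runs() < dag.max_active_runs:
        dag.runs.append(DagRun(next_run_id))


def make_dag() -> Dag:
    # 16 running dagruns, of which the 14 oldest have finished all tasks.
    runs = [DagRun(f"run_{i}", tasks_done=(i < 14)) for i in range(16)]
    return Dag("bash_bash_bash", max_active_runs=16, runs=runs)


buggy = make_dag()
process_dag_buggy(buggy, "run_16")
print(buggy.active_runs())  # 16: nothing was marked success

fixed = make_dag()
process_dag_fixed(fixed, "run_16")
print(fixed.active_runs())  # 3: 14 marked success, 2 old runs + 1 new run
```

In the buggy variant the DAG can never drop below the limit, because the only code that would free a slot sits behind the early return.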