Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/07 16:08:30 UTC

[GitHub] [airflow] grayver opened a new issue #11331: Long-running task blocks other tasks with LocalExecutor

grayver opened a new issue #11331:
URL: https://github.com/apache/airflow/issues/11331


   **Apache Airflow version**: 1.10.12
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: Amazon EC2 instance, 4 CPU cores, 8GB RAM
   - **OS** (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS (Bionic Beaver)
   - **Kernel** (e.g. `uname -a`): Linux ip-XX-XX-XX-XX.ec2.internal 5.4.0-1025-aws #25~18.04.1-Ubuntu SMP Fri Sep 11 12:03:04 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
   - **Install tools**: Ansible Airflow role (https://github.com/idealista/airflow-role)
   - **Some Airflow configuration parameters**:
   ```
   executor = LocalExecutor
   sql_alchemy_conn = postgresql+psycopg2://user:pwd@aws-rds-server:5432/airflow_dev2
   sql_alchemy_pool_size = 20
   parallelism = 32
   dag_concurrency = 8
   task_concurrency = 4
   non_pooled_task_slot_count = 128
   task_runner = StandardTaskRunner
   max_threads = 4
   ```
   
   **What happened**:
   
   We have 13 DAGs in our Airflow installation. Under certain circumstances some of them process a large amount of data - typically parsing a large file, transforming the parsed data and loading it into a database. There are also database-processing tasks that involve long-running queries, so some tasks can run for several hours. The problem is that these long-running tasks block all other tasks from being started: tasks scheduled to run hourly are not started until the long-running task completes. We also see a yellow bar in the Airflow Web UI:
   ```
   The scheduler does not appear to be running. Last heartbeat was received XX minutes ago.
   The DAGs list may not update, and new tasks will not be scheduled.
   ```
   
   We examined the Airflow scheduler logs and found that the scheduler simply doesn't try to pick up new tasks while a long-running task is running. When no long-running task is active, we can see the scheduler checking whether any task could run and verifying the parallelism/concurrency limits for it. While a long-running task is active, there are no such log messages.
   
   Manual triggering doesn't help either - manually triggered tasks are not started until the long-running task finishes.
   
   **What you expected to happen**:
   
   We expect all other DAGs to start according to their schedule while a long-running task is running. This is how LocalExecutor should work according to the documentation.
   
   We also checked server resources in those cases - there is plenty of free RAM and CPU at that time, so resource exhaustion shouldn't be the cause.
   
   **Anything else we need to know**:
   
   This problem occurs every time.



[GitHub] [airflow] turbaszek commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

turbaszek commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705086746


   Have you tried increasing `parallelism` or setting it to 0? 
   
   On another note, have you considered using CeleryExecutor? Then you can have separate queues for long- and short-running tasks.
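   
   For illustration, a minimal sketch of what that routing could look like (the DAG and queue names here are made up; a task only needs a `queue` argument, and a dedicated worker is started with `airflow worker -q long_running`):
   
   ```python
   from airflow.models import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.utils.dates import days_ago
   
   # Hypothetical DAG used only to illustrate per-task queue routing.
   example_dag = DAG(
       dag_id='queue_routing_example',
       schedule_interval='@hourly',
       start_date=days_ago(1),
       catchup=False
   )
   
   # Long-running work goes to a dedicated queue so it cannot starve short tasks;
   # a separate worker consumes it: airflow worker -q long_running
   heavy_task = BashOperator(
       task_id='parse_large_file', dag=example_dag,
       bash_command='sleep 20m',
       queue='long_running'
   )
   
   # Short tasks stay on the default queue served by the regular worker pool.
   light_task = BashOperator(
       task_id='query_service', dag=example_dag,
       bash_command='sleep 30s',
       queue='default'
   )
   ```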



[GitHub] [airflow] grayver commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

grayver commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705166448


   I've just tried setting `parallelism` to 0 - it doesn't help.
   No, I haven't considered CeleryExecutor. We don't have many DAGs, and I thought the resources of a single machine should be enough for this workload.
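   
   For reference, a minimal sketch of the change I tried in `airflow.cfg` (assuming the standard `[core]` section; as I understand it, 0 removes the parallelism cap for LocalExecutor):
   ```
   [core]
   # 0 = no limit on concurrently running task instances
   parallelism = 0
   ```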



[GitHub] [airflow] grayver commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

grayver commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705532853


   We have 13 DAGs, each with ~5 tasks (so ~70 tasks total). Most of those DAGs run hourly (with some time offset). In most cases a DAG just checks for new files to grab and does nothing if none are found. When a DAG discovers a new unprocessed file, it grabs it, parses it, loads the parsed data into the database and calls a processing function there. In that case it can take some time (up to a few hours).
   
   I've prepared reproduction code with 2 DAGs: one long-running and one short-running. While the long-running DAG is running, no new runs of the short-running DAG are scheduled or started (and the UI reports that the scheduler doesn't appear to be running).
   
   <details><summary>blocking_reproduce_dag.py</summary>
   
   ```python
   from airflow.models import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.bash_operator import BashOperator
   from airflow.operators.python_operator import BranchPythonOperator
   from airflow.utils.dates import days_ago
   from airflow.utils.trigger_rule import TriggerRule
   
   long_dag = DAG(
       dag_id='long_dag',
       schedule_interval='@hourly',
       start_date=days_ago(1),
       catchup=False
   )
   
   check_files_task = BranchPythonOperator(
       task_id='check_files', dag=long_dag,
       python_callable=lambda: 'parse_files'
   )
   
   parse_files_task = BashOperator(
       task_id='parse_files', dag=long_dag,
       bash_command='sleep 20m'
   )
   
   process_files_task = BashOperator(
       task_id='process_files', dag=long_dag,
       bash_command='sleep 15m'
   )
   
   slack_report_task = DummyOperator(
       task_id='slack_report', dag=long_dag,
       trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
   )
   
   check_files_task >> parse_files_task >> process_files_task >> slack_report_task
   check_files_task >> slack_report_task
   
   
   short_dag = DAG(
       dag_id='short_dag',
       schedule_interval='*/5 * * * *',
       start_date=days_ago(1),
       catchup=False,
       max_active_runs=1
   )
   
   query_service_task = BashOperator(
       task_id='query_service', dag=short_dag,
       bash_command='sleep 30s'
   )
   
   do_something_task = DummyOperator(
       task_id='do_something', dag=short_dag
   )
   
   query_service_task >> do_something_task
   ```
   </details>



[GitHub] [airflow] ashb closed issue #11331: Long-running task blocks other tasks with LocalExecutor

ashb closed issue #11331:
URL: https://github.com/apache/airflow/issues/11331


   



[GitHub] [airflow] turbaszek commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

turbaszek commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705170033


   How many DAGs/tasks do you have? Would you be able to provide a replicable setup using `BashOperators` that sleep for as long as your long-running tasks?



[GitHub] [airflow] grayver commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

grayver commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-707878281


   It seems I've figured out what the problem is. We have the Airflow scheduler installed as a daemon service on the Ubuntu machine. The service command is the following:
   ```
   /bin/bash -c 'source /home/airflow/airflow_venv/bin/activate && /home/airflow/airflow scheduler -n ${SCHEDULER_RUNS} --pid /run/airflow/scheduler.pid'
   ```
   SCHEDULER_RUNS is set to 5 in the environment variables, so the scheduler starts, performs 5 loops of scanning the DAG files and then exits. The daemon restart policy automatically restarts it. However, if a long-running task is still running after the service restart (the task itself is not stopped), the newly started scheduler doesn't pick up new tasks until that task finishes.
   
   Setting SCHEDULER_RUNS to -1 solved the issue.
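   
   For anyone hitting the same thing, this is roughly what the fix looks like when the scheduler runs under systemd (a sketch only - the unit name, `Restart` settings and paths are illustrative; the `ExecStart` line is the command shown above):
   ```
   # /etc/systemd/system/airflow-scheduler.service (relevant part only)
   [Service]
   # -1 keeps the scheduler loop running indefinitely instead of exiting
   # after N runs and relying on the restart policy to bring it back
   Environment="SCHEDULER_RUNS=-1"
   ExecStart=/bin/bash -c 'source /home/airflow/airflow_venv/bin/activate && /home/airflow/airflow scheduler -n ${SCHEDULER_RUNS} --pid /run/airflow/scheduler.pid'
   Restart=always
   RestartSec=5s
   ```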



[GitHub] [airflow] boring-cyborg[bot] commented on issue #11331: Long-running task blocks other tasks with LocalExecutor

boring-cyborg[bot] commented on issue #11331:
URL: https://github.com/apache/airflow/issues/11331#issuecomment-705039130


   Thanks for opening your first issue here! Be sure to follow the issue template!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org