Posted to commits@airflow.apache.org by "Daniel Imberman (Jira)" <ji...@apache.org> on 2020/03/29 15:38:00 UTC

[jira] [Commented] (AIRFLOW-648) Using the LatestOnlyOperator results in tasks being reenqueued many times over

    [ https://issues.apache.org/jira/browse/AIRFLOW-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17070402#comment-17070402 ] 

Daniel Imberman commented on AIRFLOW-648:
-----------------------------------------

This issue has been moved to https://github.com/apache/airflow/issues/7969

> Using the LatestOnlyOperator results in tasks being reenqueued many times over
> ------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-648
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-648
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: 1.8.0
>         Environment: Linux 3.2.0-109-virtual #150-Ubuntu SMP x86_64
> Python Version: 2.7.3
> Airflow Version: 1.7.1.3 with plugin to bring in LatestOnlyOperator
> CeleryExecutor with Redis as a backend
> Airflow Config (subset):
> {code}
> [core]
> executor = CeleryExecutor
> parallelism = 32
> dag_concurrency = 16
> dags_are_paused_at_creation = False
> max_active_runs_per_dag = 16
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> {code}
>            Reporter: Kevin Gao
>            Priority: Major
>
> We ported a number of our cronjobs over to Airflow. To get cron-like behavior (only the most recent schedule interval should actually run), we use the {{LatestOnlyOperator}} that was merged to master in pull request 1752: https://github.com/apache/incubator-airflow/pull/1752.
> We migrated many cronjobs at a time (using Ansible), and these DAGs had start dates going back a few days.
> The first thing I noticed was that it seemed to take a long time to process the backfilled DAG runs. They were being processed correctly, in the sense that the {{'latest_only'}} task was completing successfully and the downstream {{BashOperator}} was marked as skipped. The DAG runs also appeared to complete successfully in the tree view. However, when I filtered the DAG runs on {{state contains running}}, those runs were still listed as running.
> One thing I noticed was that in the logs for one of the "stuck" DAG runs, it appeared that the {{'latest_only'}} task was processed multiple times.
> {code}
> [2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:58,400] {models.py:1196} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
> [2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
> [2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
> [2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
> [2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
> [2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> {code}
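> A minimal sketch of how the stuck runs can be confirmed straight from the metadata database, assuming the stock {{DagRun}} model and the same {{settings.Session}} factory used in the plugin below:
> {code}
> # List DAG runs the scheduler still considers running (illustrative sketch).
> from airflow import settings
> from airflow.models import DagRun
> from airflow.utils.state import State
>
> session = settings.Session()
> for run in session.query(DagRun).filter(DagRun.state == State.RUNNING):
>     print('%s %s %s' % (run.dag_id, run.execution_date, run.state))
> session.close()
> {code}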
> We use CeleryExecutor backed by Redis. When I inspected the Redis key that holds the queue, I verified that duplicate tasks were present; there were thousands of tasks in the queue. Here are the tasks that were at the head of the list (the DAG names are changed for readability):
> {code}
> [
>  u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
>  u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
>  u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
>  u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
>  u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
>  u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
>  u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']",
>  u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
>  u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
>  u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
>  u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
>  u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
>  u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
>  u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']"
> ]
> {code}
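> A minimal sketch of how such duplicates can be counted with redis-py, assuming the default Celery queue key {{celery}} on a local Redis; the exact message framing depends on the Celery serializer, so the command text is pulled out with a regex:
> {code}
> # Count duplicate 'airflow run ...' commands sitting in the broker queue.
> import re
> from collections import Counter
>
> import redis
>
> r = redis.Redis(host='localhost', port=6379, db=0)
> commands = []
> for raw in r.lrange('celery', 0, -1):  # full contents of the queue
>     text = raw.decode('utf-8', 'replace') if isinstance(raw, bytes) else raw
>     match = re.search(r'airflow run \S+ \S+ \S+', text)
>     if match:
>         commands.append(match.group(0))
> for cmd, n in Counter(commands).most_common():
>     if n > 1:
>         print('%4d  %s' % (n, cmd))
> {code}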
> Grepping the scheduler's logs, I can see the scheduler enqueuing the same task more than once; here is one such instance:
> {code}
> $ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
> [2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> [2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
> [2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> --
> [2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
> [2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> {code}
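> A minimal sketch for tallying how often the scheduler enqueued each command, assuming the same log path as the grep above:
> {code}
> # Count 'Adding to queue:' occurrences per command in the scheduler log.
> from collections import Counter
>
> counts = Counter()
> with open('/var/log/airflow/airflow-scheduler.log') as log:
>     for line in log:
>         if 'Adding to queue: ' in line:
>             counts[line.split('Adding to queue: ', 1)[1].strip()] += 1
> for cmd, n in counts.most_common(20):
>     print('%4d  %s' % (n, cmd))
> {code}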
> Eventually, we ended up creating new DAG definitions with future start dates and manually clearing the Redis queue.
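> A minimal sketch of that manual cleanup, assuming the default Celery queue key {{celery}} on a local Redis and that the scheduler and workers are stopped first so nothing is re-enqueued:
> {code}
> # Drop every queued Celery message from the broker.
> import redis
>
> r = redis.Redis(host='localhost', port=6379, db=0)
> print('dropping %d queued messages' % r.llen('celery'))
> r.delete('celery')
> {code}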
> **Additional Context**:
> Our scheduler is daemonized by upstart and runs with {{-n 5}} (the scheduler's {{--num_runs}} option).
> Here is the template we use for our cron DAGs (note that it is a Jinja2 template rendered by Ansible):
> {code}
> # {{ansible_managed}}
> from dateutil import parser
> from airflow.operators import LatestOnlyOperator
> from airflow.operators import BashOperator
> from airflow.models import DAG
> args = {
>     'owner': 'airflow',
>     'start_date': parser.parse('{{item.start_date}}'),
>     'retries': 0,
> }
> dag = DAG(
>   dag_id='{{item.name}}',
>   default_args=args,
>   schedule_interval='{{item.schedule}}',
>   max_active_runs=1,
> )
> latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> script = BashOperator(
>   task_id='{{item.name}}',
>   bash_command='{{item.command}}',
>   default_args=args,
>   dag=dag,
> )
> script.set_upstream(latest_only)
> {code}
> One thing to note: we are on Airflow 1.7.1.3, so we brought the operator in through the following plugin:
> {code}
> import datetime
> import logging
> from airflow.models import BaseOperator, TaskInstance
> from airflow.plugins_manager import AirflowPlugin
> from airflow.utils.state import State
> from airflow import settings
> class LatestOnlyOperator(BaseOperator):
>     """
>     Allows a workflow to skip tasks that are not running during the most
>     recent schedule interval.
>     If the task is run outside of the latest schedule interval, all
>     directly downstream tasks will be skipped.
>     """
>     ui_color = '#e9ffdb'  # nyanza
>     def execute(self, context):
>         now = datetime.datetime.now()
>         left_window = context['dag'].following_schedule(
>             context['execution_date'])
>         right_window = context['dag'].following_schedule(left_window)
>         logging.info(
>             'Checking latest only with left_window: %s right_window: %s '
>             'now: %s', left_window, right_window, now)
>         if not left_window < now <= right_window:
>             logging.info('Not latest execution, skipping downstream.')
>             session = settings.Session()
>             for task in context['task'].downstream_list:
>                 ti = TaskInstance(
>                     task, execution_date=context['ti'].execution_date)
>                 logging.info('Skipping task: %s', ti.task_id)
>                 ti.state = State.SKIPPED
>                 ti.start_date = now
>                 ti.end_date = now
>                 session.merge(ti)
>             session.commit()
>             session.close()
>             logging.info('Done.')
>         else:
>             logging.info('Latest, allowing execution to proceed.')
> class AirflowNextPlugin(AirflowPlugin):
>     name = "airflow_next"
>     operators = [LatestOnlyOperator]
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)