Posted to commits@airflow.apache.org by "Russell Pierce (JIRA)" <ji...@apache.org> on 2017/07/29 03:03:00 UTC

[jira] [Commented] (AIRFLOW-648) Using the LatestOnlyOperator results in tasks being reenqueued many times over

    [ https://issues.apache.org/jira/browse/AIRFLOW-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105997#comment-16105997 ] 

Russell Pierce commented on AIRFLOW-648:
----------------------------------------

I wonder if the LatestOnlyOperator is really to blame here. I've noticed similar behavior (although in my case it was in the context of a SubDAG) when...

> Our scheduler is daemonized by upstart and runs with -n 5

Specifically, at the end of every 5th cycle the scheduler restarts. When it restarts, it appears not to respect tasks that are already waiting on the Redis queue: if a task instance looks like it hasn't run yet, the scheduler enqueues it again. I'd be curious whether your multiple-enqueuing issue would resolve if you eliminated the scheduler restart (which, IIRC, is no longer recommended practice under 1.8.x).
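
To illustrate, a hypothetical upstart job for a continuously running scheduler would simply drop that flag (file path and stanza are illustrative, not your actual config):

{code}
# /etc/init/airflow-scheduler.conf (hypothetical)
description "airflow scheduler"
start on runlevel [2345]
stop on runlevel [!2345]
respawn
# No "-n 5": the scheduler loops indefinitely instead of exiting
# every five runs and being respawned by upstart.
exec airflow scheduler
{code}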

> Using the LatestOnlyOperator results in tasks being reenqueued many times over
> ------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-648
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-648
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators, scheduler
>    Affects Versions: Airflow 1.8
>         Environment: Linux 3.2.0-109-virtual #150-Ubuntu SMP x86_64
> Python Version: 2.7.3
> Airflow Version: 1.7.1.3 with plugin to bring in LatestOnlyOperator
> CeleryExecutor with Redis as a backend
> Airflow Config (subset):
> {code}
> [core]
> executor = CeleryExecutor
> parallelism = 32
> dag_concurrency = 16
> dags_are_paused_at_creation = False
> max_active_runs_per_dag = 16
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> {code}
>            Reporter: Kevin Gao
>
> We ported over a number of our cronjobs to run using Airflow. To achieve the desired behavior, we use the {{LatestOnlyOperator}} that was merged to master by pull request 1752: https://github.com/apache/incubator-airflow/pull/1752.
> When we moved over our cronjobs, we migrated many at a time (using Ansible). These DAGs had start dates going back a few days.
> The first thing I noticed is that it seemed to take a long time to process the backfilled DAG runs. They were being processed correctly, in the sense that the {{'latest_only'}} task was completing successfully and the downstream {{BashOperator}} was being marked as skipped. The DAG runs also appeared to complete successfully in the tree view. However, when I filtered the DAG runs for {{state contains running}}, I saw that they were still present.
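> A minimal sketch of listing such stuck runs through Airflow's own models (illustrative; it assumes the metadata DB configured in {{airflow.cfg}}):
> {code}
> from airflow import settings
> from airflow.models import DagRun
> from airflow.utils.state import State
>
> session = settings.Session()
> # DAG runs whose state is still RUNNING in the metadata database
> stuck = session.query(DagRun).filter(DagRun.state == State.RUNNING).all()
> for run in stuck:
>     print('%s %s' % (run.dag_id, run.execution_date))
> session.close()
> {code}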
> One thing I noticed was that in the logs for one of the "stuck" DAG runs, it appeared that the {{'latest_only'}} task was processed multiple times.
> {code}
> [2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:28:58,400] {models.py:1196} INFO -
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
> [2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
> [2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
> [2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
> [2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
> [2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> [2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
> [2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
> {code}
> We use CeleryExecutor backed by Redis. When I inspected the Redis key that holds the queue, I verified that duplicate tasks were in there; there were thousands of tasks in the queue. Here are the tasks that were at the head of the list (DAG names changed for readability):
> {code}
> [
>  u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
>  u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
>  u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
>  u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
>  u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
>  u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
>  u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']",
>  u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
>  u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
>  u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
>  u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
>  u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
>  u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
>  u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']"
> ]
> {code}
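> A minimal sketch of that inspection (an illustrative reconstruction, not the exact code we ran): it assumes Celery's default {{celery}} queue key, a local Redis broker, and Celery's JSON task serializer, and counts how often each enqueued command appears:
> {code}
> import base64
> import collections
> import json
>
> import redis
>
> r = redis.StrictRedis(host='localhost', port=6379, db=0)
> counter = collections.Counter()
> for raw in r.lrange('celery', 0, -1):
>     envelope = json.loads(raw)
>     # The Celery message body is base64-encoded JSON; for Airflow's
>     # CeleryExecutor its args hold the "airflow run ..." command.
>     body = json.loads(base64.b64decode(envelope['body']))
>     counter[str(body['args'])] += 1
>
> # Any count above 1 is a duplicate enqueue of the same task instance.
> for args, n in counter.most_common():
>     if n > 1:
>         print('%dx %s' % (n, args))
> {code}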
> Grepping the scheduler's logs, here is one of the instances I see of the scheduler enqueuing a duplicate task:
> {code}
> $ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
> [2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> [2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
> [2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> --
> [2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
> [2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
> --
> [2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
> [2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
> [2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
> {code}
> Eventually, we ended up creating new DAG definitions with future start dates and manually clearing the Redis queue.
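> A minimal sketch of such a manual flush (same assumptions as the inspection sketch above); note that it drops every pending Celery message, not just the duplicates:
> {code}
> import redis
>
> r = redis.StrictRedis(host='localhost', port=6379, db=0)
> r.delete('celery')  # removes the whole pending-task list
> {code}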
> *Additional Context*:
> Our scheduler is daemonized by upstart and runs with {{-n 5}}
> Here is the template we use for our cron DAGs (note it's a jinja2 template):
> {code}
> # {{ansible_managed}}
> from dateutil import parser
> from airflow.operators import LatestOnlyOperator
> from airflow.operators import BashOperator
> from airflow.models import DAG
> args = {
>     'owner': 'airflow',
>     'start_date': parser.parse('{{item.start_date}}'),
>     'retries': 0,
> }
> dag = DAG(
>   dag_id='{{item.name}}',
>   default_args=args,
>   schedule_interval='{{item.schedule}}',
>   max_active_runs=1,
> )
> latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
> script = BashOperator(
>   task_id='{{item.name}}',
>   bash_command='{{item.command}}',
>   default_args=args,
>   dag=dag,
> )
> script.set_upstream(latest_only)
> {code}
> One thing to note: we are on Airflow 1.7.1.3; however, we brought in the operator through a plugin:
> {code}
> import datetime
> import logging
> from airflow.models import BaseOperator, TaskInstance
> from airflow.plugins_manager import AirflowPlugin
> from airflow.utils.state import State
> from airflow import settings
> class LatestOnlyOperator(BaseOperator):
>     """
>     Allows a workflow to skip tasks that are not running during the most
>     recent schedule interval.
>     If the task is run outside of the latest schedule interval, all
>     directly downstream tasks will be skipped.
>     """
>     ui_color = '#e9ffdb'  # nyanza
>     def execute(self, context):
>         now = datetime.datetime.now()
>         left_window = context['dag'].following_schedule(
>             context['execution_date'])
>         right_window = context['dag'].following_schedule(left_window)
>         logging.info(
>             'Checking latest only with left_window: %s right_window: %s '
>             'now: %s', left_window, right_window, now)
>         if not left_window < now <= right_window:
>             logging.info('Not latest execution, skipping downstream.')
>             session = settings.Session()
>             for task in context['task'].downstream_list:
>                 ti = TaskInstance(
>                     task, execution_date=context['ti'].execution_date)
>                 logging.info('Skipping task: %s', ti.task_id)
>                 ti.state = State.SKIPPED
>                 ti.start_date = now
>                 ti.end_date = now
>                 session.merge(ti)
>             session.commit()
>             session.close()
>             logging.info('Done.')
>         else:
>             logging.info('Latest, allowing execution to proceed.')
> class AirflowNextPlugin(AirflowPlugin):
>     name = "airflow_next"
>     operators = [LatestOnlyOperator]
> {code}
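> For reference, the window check in {{execute}} works out as follows for the stuck run in the log above (a worked example using the values from the 12:29:00 log line):
> {code}
> import datetime
>
> # Values from the log: execution_date 2016-11-20 04:00 on a daily schedule.
> left_window = datetime.datetime(2016, 11, 21, 4, 0)   # following_schedule(execution_date)
> right_window = datetime.datetime(2016, 11, 22, 4, 0)  # one interval later
> now = datetime.datetime(2016, 11, 22, 12, 29, 0)
>
> # now is past right_window, so this is not the latest interval:
> # the operator correctly skips downstream tasks.
> print(left_window < now <= right_window)  # False
> {code}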


