Posted to commits@airflow.apache.org by "Kevin Gao (JIRA)" <ji...@apache.org> on 2016/11/23 01:12:59 UTC

[jira] [Created] (AIRFLOW-648) Using the LatestOnlyOperator results in tasks being reenqueued many times over

Kevin Gao created AIRFLOW-648:
---------------------------------

             Summary: Using the LatestOnlyOperator results in tasks being reenqueued many times over
                 Key: AIRFLOW-648
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-648
             Project: Apache Airflow
          Issue Type: Bug
          Components: operators, scheduler
    Affects Versions: Airflow 1.8
         Environment: Linux 3.2.0-109-virtual #150-Ubuntu SMP x86_64
Python Version: 2.7.3
Airflow Version: 1.7.1.3 with plugin to bring in LatestOnlyOperator
CeleryExecutor with Redis as a backend
Airflow Config (subset):
{code}
[core]
executor = CeleryExecutor
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = False
max_active_runs_per_dag = 16

[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
{code}
            Reporter: Kevin Gao


We ported over a number of our cronjobs to run using Airflow. To achieve the desired behavior, we use the {{LatestOnlyOperator}} that was merged to master by pull request 1752: https://github.com/apache/incubator-airflow/pull/1752.

When we moved over our cronjobs, we migrated many at a time (using Ansible). These DAGs had a start date going back a few days.

The first thing I noticed is that it seemed to take a long time to process the backfilled DAGs. They were being processed correctly, in the sense that the {{'latest_only'}} task was completing successfully and the downstream {{BashOperator}} was marked as skipped. The DAG runs also appeared to complete successfully in the tree view. However, when I searched the DAG runs for {{state contains running}}, those runs were still listed as running.
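
(For reference, the equivalent check against the metadata database looks roughly like the sketch below; it uses the same {{settings.Session}} and models that the plugin further down already imports, and assumes Airflow 1.7.x model names.)
{code}
from airflow import settings
from airflow.models import DagRun
from airflow.utils.state import State

# Sketch: list DAG runs that the metadata DB still considers running.
session = settings.Session()
stuck_runs = (session.query(DagRun)
                     .filter(DagRun.state == State.RUNNING)
                     .order_by(DagRun.dag_id, DagRun.execution_date)
                     .all())
for run in stuck_runs:
    print('%s %s %s' % (run.dag_id, run.execution_date, run.state))
session.close()
{code}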

One thing I noticed was that in the logs for one of the "stuck" DAG runs, it appeared that the {{'latest_only'}} task was processed multiple times.
{code}
[2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:58,400] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
[2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
[2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
[2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
[2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
[2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
{code}

We use CeleryExecutor backed by Redis. When inspecting the Redis key that held the queue, I verified that duplicate tasks were in there; there were thousands of tasks in the queue. Here are the tasks that were at the head of the task list (the DAG names have been changed for readability):
{code}
[
 u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
 u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
 u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
 u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
 u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
 u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
 u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']",
 u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
 u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
 u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
 u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
 u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
 u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
 u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']"
]
{code}
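
(The listing above was obtained by peeking at the broker list directly, roughly as in the snippet below; the queue key name and connection details are assumptions that have to match the Celery/broker settings in airflow.cfg.)
{code}
import redis

# Sketch only: peek at the raw Celery messages sitting in the Redis broker.
# 'default' is the assumed queue key; adjust host/port/db to match broker_url.
r = redis.StrictRedis(host='localhost', port=6379, db=0)
print(r.llen('default'))                 # total number of queued messages
for raw in r.lrange('default', 0, 9):    # head of the task list
    print(raw)
{code}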

Grepping the scheduler's logs, I can see the scheduler enqueuing the same task instance more than once. Here is one such instance:
{code}
$ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
[2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
[2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
--
[2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
{code}

Eventually, we ended up creating new DAG definitions with future start dates and manually clearing the Redis queue.
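
(Clearing the queue amounted to deleting the broker's list key, roughly as sketched below; the key name is again an assumption that depends on the configured Celery queue, and this discards every pending message, not just the duplicates.)
{code}
import redis

# Sketch only: drop all pending messages for the assumed 'default' queue key.
r = redis.StrictRedis(host='localhost', port=6379, db=0)
r.delete('default')
{code}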

*Additional Context*:

Our scheduler is daemonized by upstart and runs with {{-n 5}} ({{--num_runs 5}}), so the scheduler process exits after a fixed number of scheduling runs and upstart respawns it.

Here is the template we use for our cron DAGs (note that it is a Jinja2 template; the {{item.*}} variables are filled in by Ansible):
{code}
# {{ansible_managed}}

from dateutil import parser

from airflow.operators import LatestOnlyOperator
from airflow.operators import BashOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': parser.parse('{{item.start_date}}'),
    'retries': 0,
}

dag = DAG(
  dag_id='{{item.name}}',
  default_args=args,
  schedule_interval='{{item.schedule}}',
  max_active_runs=1,
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

script = BashOperator(
  task_id='{{item.name}}',
  bash_command='{{item.command}}',
  default_args=args,
  dag=dag,
)

script.set_upstream(latest_only)
{code}

One thing to note: we are on Airflow 1.7.1.3; however, we brought the operator in through a plugin:
{code}
import datetime
import logging

from airflow.models import BaseOperator, TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State
from airflow import settings


class LatestOnlyOperator(BaseOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.
    If the task is run outside of the latest schedule interval, all
    directly downstream tasks will be skipped.
    """

    ui_color = '#e9ffdb'  # nyanza

    def execute(self, context):
        now = datetime.datetime.now()
        left_window = context['dag'].following_schedule(
            context['execution_date'])
        right_window = context['dag'].following_schedule(left_window)
        logging.info(
            'Checking latest only with left_window: %s right_window: %s '
            'now: %s', left_window, right_window, now)
        if not left_window < now <= right_window:
            logging.info('Not latest execution, skipping downstream.')
            session = settings.Session()
            for task in context['task'].downstream_list:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                logging.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = now
                ti.end_date = now
                session.merge(ti)
            session.commit()
            session.close()
            logging.info('Done.')
        else:
            logging.info('Latest, allowing execution to proceed.')


class AirflowNextPlugin(AirflowPlugin):
    name = "airflow_next"
    operators = [LatestOnlyOperator]
{code}


