Posted to commits@airflow.apache.org by "Kevin Gao (JIRA)" <ji...@apache.org> on 2016/11/23 01:12:59 UTC
[jira] [Created] (AIRFLOW-648) Using the LatestOnlyOperator results in tasks being reenqueued many times over
Kevin Gao created AIRFLOW-648:
---------------------------------
Summary: Using the LatestOnlyOperator results in tasks being reenqueued many times over
Key: AIRFLOW-648
URL: https://issues.apache.org/jira/browse/AIRFLOW-648
Project: Apache Airflow
Issue Type: Bug
Components: operators, scheduler
Affects Versions: Airflow 1.8
Environment: Linux 3.2.0-109-virtual #150-Ubuntu SMP x86_64
Python Version: 2.7.3
Airflow Version: 1.7.1.3 with plugin to bring in LatestOnlyOperator
CeleryExecutor with Redis as a backend
Airflow Config (subset):
{code}
[core]
executor = CeleryExecutor
parallelism = 32
dag_concurrency = 16
dags_are_paused_at_creation = False
max_active_runs_per_dag = 16
[scheduler]
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
{code}
Reporter: Kevin Gao
We ported a number of our cron jobs over to Airflow. To get the desired behavior (only the most recent schedule interval does any work), we use the {{LatestOnlyOperator}}, which was merged to master in pull request 1752: https://github.com/apache/incubator-airflow/pull/1752.
When we moved our cron jobs over, we migrated many at a time (using Ansible). These DAGs had start dates going back a few days.
The first thing I noticed was that the backfilled DAGs seemed to take a long time to process. They were being processed correctly, in the sense that the {{'latest_only'}} task completed successfully and the downstream {{BashOperator}} task was marked as skipped, and the tree view showed the DAG runs as successfully completed. However, when I searched the DAG runs for {{state contains running}}, those same runs were still listed.
In the logs for one of these "stuck" DAG runs, the {{'latest_only'}} task appeared to be processed multiple times:
{code}
[2016-11-22 12:26:27,701] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:50,335] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:53,288] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:28:58,400] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2016-11-22 12:28:59,334] {models.py:1219} INFO - Executing <Task(LatestOnlyOperator): latest_only> on 2016-11-20 04:00:00
[2016-11-22 12:29:00,671] {airflow_next.py:27} INFO - Checking latest only with left_window: 2016-11-21 04:00:00 right_window: 2016-11-22 04:00:00 now: 2016-11-22 12:29:00.670321
[2016-11-22 12:29:00,671] {airflow_next.py:29} INFO - Not latest execution, skipping downstream.
[2016-11-22 12:29:00,672] {airflow_next.py:34} INFO - Skipping task: my_dag
[2016-11-22 12:29:01,397] {airflow_next.py:41} INFO - Done.
[2016-11-22 12:31:13,055] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:32:31,907] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:35:36,323] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:00,140] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:38:50,014] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:07,609] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:41:12,232] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:41:45,857] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:05,354] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:44:09,635] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:44:30,851] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:46:58,977] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:47:02,836] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:48:27,571] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:54,034] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:50:57,951] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:51:21,442] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:44,461] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:53:48,392] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:54:28,745] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:50,740] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:56:54,382] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:57:59,881] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:04,245] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:59:05,666] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 13:02:18,434] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
{code}
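You can quantify the repetition by pulling the timestamps of the "previously succeeded" lines out of the task log and comparing consecutive ones. The sketch below is a hypothetical diagnostic helper, not part of Airflow, and it assumes the log-line format shown above. Run against the full excerpt, it finds ten such lines, spaced roughly two to four minutes apart, all after the single real execution at 12:29.

```python
import re
from datetime import datetime

# Leading "[2016-11-22 12:31:17,899]" timestamp of a log line in the format above.
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]")


def redelivery_gaps(log_text, marker="previously succeeded"):
    """Return (timestamps, gaps in seconds) for log lines that contain `marker`."""
    stamps = []
    for line in log_text.splitlines():
        match = TIMESTAMP.match(line)
        if match and marker in line:
            stamps.append(datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f"))
    # Seconds between each pair of consecutive matching lines.
    gaps = [(later - earlier).total_seconds()
            for earlier, later in zip(stamps, stamps[1:])]
    return stamps, gaps


# Four lines copied from the task log above; three of them are re-deliveries.
SAMPLE_LOG = """\
[2016-11-22 12:31:17,899] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:34:56,522] {models.py:154} INFO - Filling up the DagBag from /DAGDIR/my_dag.py
[2016-11-22 12:35:00,975] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
[2016-11-22 12:38:05,057] {models.py:1150} INFO - Task <TaskInstance: my_dag.latest_only 2016-11-20 04:00:00 [success]> previously succeeded on 2016-11-22 12:29:01.397870
"""

stamps, gaps = redelivery_gaps(SAMPLE_LOG)
print(len(stamps), gaps)
```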
We use CeleryExecutor backed by Redis. When I inspected the Redis key holding the queue, I verified that it contained duplicate tasks; there were thousands of tasks in the queue. Here are the tasks at the head of the list (the DAG names are changed for readability):
{code}
[
u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']",
u"['airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ']",
u"['airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ']",
u"['airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ']",
u"['airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ']",
u"['airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ']",
u"['airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ']",
u"['airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ']"
]
{code}
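You can confirm the duplicates in this excerpt mechanically by counting identical commands. This sketch assumes the queue entries have already been decoded into plain command strings like the ones above; decoding Celery's message format out of the Redis list is not covered here. {{duplicated_commands}} is a hypothetical helper.

```python
from collections import Counter

# The seven commands at the head of the queue, in the order quoted above.
HEAD = [
    "airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py ",
    "airflow run DAG_02 latest_only 2016-11-18T03:45:00 --local -sd DAGS_FOLDER/DAG_02.py ",
    "airflow run DAG_03 latest_only 2016-11-18T00:08:00 --local -sd DAGS_FOLDER/DAG_03.py ",
    "airflow run DAG_04 latest_only 2016-11-22T10:40:00 --local -sd DAGS_FOLDER/DAG_04.py ",
    "airflow run DAG_05 latest_only 2016-11-18T05:00:00 --local -sd DAGS_FOLDER/DAG_05.py ",
    "airflow run DAG_06 latest_only 2016-11-22T11:20:00 --local -sd DAGS_FOLDER/DAG_06.py ",
    "airflow run DAG_07 latest_only 2016-11-18T10:14:00 --local -sd DAGS_FOLDER/DAG_07.py ",
]
# The excerpt lists the same seven commands a second time, in the same order.
QUEUE_HEAD = HEAD * 2


def duplicated_commands(queue):
    """Map every command that occurs more than once in `queue` to its copy count."""
    # Strip whitespace so "cmd " and "cmd" are counted as the same command.
    counts = Counter(command.strip() for command in queue)
    return {command: n for command, n in counts.items() if n > 1}


dups = duplicated_commands(QUEUE_HEAD)
print(len(dups), "commands are queued more than once")
```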
Grepping the scheduler's log turned up instances of the scheduler enqueuing a duplicate task. Here is one of them:
{code}
$ grep -A2 "DAG_01" /var/log/airflow/airflow-scheduler.log | grep -A2 "09:23"
[2016-11-22 13:24:26,660] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:26,672] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,678] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
[2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:26,769] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:26,769] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:31,832] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:31,832] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:37,238] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:37,240] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:37,252] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
--
[2016-11-22 13:24:45,736] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:45,744] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:45,756] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,613] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>
[2016-11-22 13:24:56,624] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,638] {jobs.py:514} INFO - Checking dependencies on 2 tasks instances, minus 0 skippable ones
--
[2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py
[2016-11-22 13:24:56,823] {jobs.py:498} INFO - Getting list of tasks to skip for active runs.
[2016-11-22 13:24:56,824] {jobs.py:514} INFO - Checking dependencies on 0 tasks instances, minus 0 skippable ones
{code}
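You can run the same check against the scheduler log by grouping the "Adding to queue" lines by command and noting when each was emitted. In the excerpt above, the DAG_01 {{latest_only}} command is enqueued twice, about 30 seconds apart, with several "Checking state" passes over the same DAG run in between. {{enqueue_times}} is a hypothetical helper, and its regular expression assumes the {{base_executor.py}} log format shown above.

```python
import re
from collections import defaultdict
from datetime import datetime

# "[timestamp] {base_executor.py:NN} INFO - Adding to queue: <command>"
ENQUEUE = re.compile(
    r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] "
    r"\{base_executor\.py:\d+\} INFO - Adding to queue: (?P<cmd>.+?)\s*$"
)


def enqueue_times(log_lines):
    """Group 'Adding to queue' events by command: {command: [datetime, ...]}."""
    events = defaultdict(list)
    for line in log_lines:
        match = ENQUEUE.match(line)
        if match:
            ts = datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
            events[match.group("cmd")].append(ts)
    return dict(events)


# Three lines copied from the scheduler log above.
LOG_LINES = [
    "[2016-11-22 13:24:26,726] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py",
    "[2016-11-22 13:24:31,830] {models.py:2660} INFO - Checking state for <DagRun DAG_01 @ 2016-11-18 09:23:00: scheduled__2016-11-18T09:23:00, externally triggered: False>",
    "[2016-11-22 13:24:56,680] {base_executor.py:36} INFO - Adding to queue: airflow run DAG_01 latest_only 2016-11-18T09:23:00 --local -sd DAGS_FOLDER/DAG_01.py",
]

for command, times in enqueue_times(LOG_LINES).items():
    if len(times) > 1:
        span = (times[-1] - times[0]).total_seconds()
        print("enqueued %d times over %.3f s: %s" % (len(times), span, command))
```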
Eventually, we ended up creating new DAG definitions with future start dates and manually clearing the Redis queue.
**Additional Context**:
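Our scheduler is daemonized by Upstart and runs with {{-n 5}} ({{--num_runs}}: the scheduler process exits after five scheduling loops).
Here is the template we use for our cron DAGs (note that it is a Jinja2 template, rendered by Ansible):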
{code}
# {{ansible_managed}}
from dateutil import parser
from airflow.operators import LatestOnlyOperator
from airflow.operators import BashOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': parser.parse('{{item.start_date}}'),
    'retries': 0,
}

dag = DAG(
    dag_id='{{item.name}}',
    default_args=args,
    schedule_interval='{{item.schedule}}',
    max_active_runs=1,
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

script = BashOperator(
    task_id='{{item.name}}',
    bash_command='{{item.command}}',
    default_args=args,
    dag=dag,
)

script.set_upstream(latest_only)
{code}
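Each templated DAG starts a few days in the past, so the scheduler creates a DAG run for every schedule interval that has already closed since {{start_date}}. With {{max_active_runs=1}}, those runs are worked through one at a time. The sketch below counts them under simplifying assumptions. The schedule is modeled as a fixed {{timedelta}}, because the real {{schedule_interval}} values come from {{item.schedule}} and are not shown here. The start date and current time are illustrative values based on DAG_01's queued execution date and the timestamp of the scheduler log excerpt. {{elapsed_execution_dates}} is a hypothetical helper.

```python
from datetime import datetime, timedelta


def elapsed_execution_dates(start_date, interval, now):
    """Execution dates whose interval [d, d + interval] has closed by `now`.

    Simplified model (assumption): a fixed timedelta schedule, where the run
    for execution date d becomes due once d + interval <= now.
    """
    dates = []
    d = start_date
    while d + interval <= now:
        dates.append(d)
        d += interval
    return dates


# Illustrative values: a daily schedule starting at DAG_01's queued execution
# date, evaluated at the time of the scheduler log excerpt.
runs = elapsed_execution_dates(
    start_date=datetime(2016, 11, 18, 9, 23),
    interval=timedelta(days=1),
    now=datetime(2016, 11, 22, 13, 24),
)
for run in runs:
    print(run.isoformat())
```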
One thing to note: we are on Airflow 1.7.1.3, so we brought in the operator through a plugin:
{code}
import datetime
import logging
from airflow.models import BaseOperator, TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State
from airflow import settings


class LatestOnlyOperator(BaseOperator):
    """
    Allows a workflow to skip tasks that are not running during the most
    recent schedule interval.

    If the task is run outside of the latest schedule interval, all
    directly downstream tasks will be skipped.
    """

    ui_color = '#e9ffdb'  # nyanza

    def execute(self, context):
        now = datetime.datetime.now()
        left_window = context['dag'].following_schedule(
            context['execution_date'])
        right_window = context['dag'].following_schedule(left_window)
        logging.info(
            'Checking latest only with left_window: %s right_window: %s '
            'now: %s', left_window, right_window, now)
        if not left_window < now <= right_window:
            logging.info('Not latest execution, skipping downstream.')
            session = settings.Session()
            for task in context['task'].downstream_list:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                logging.info('Skipping task: %s', ti.task_id)
                ti.state = State.SKIPPED
                ti.start_date = now
                ti.end_date = now
                session.merge(ti)
            session.commit()
            session.close()
            logging.info('Done.')
        else:
            logging.info('Latest, allowing execution to proceed.')


class AirflowNextPlugin(AirflowPlugin):
    name = "airflow_next"
    operators = [LatestOnlyOperator]
{code}
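The operator's window check can be reproduced with plain {{datetime}} arithmetic, using the values from the {{airflow_next.py:27}} log line earlier in this report. The sketch assumes a fixed daily schedule and models {{DAG.following_schedule}} as "add one interval"; the real method also handles cron expressions. {{latest_only_window}} is a hypothetical name.

```python
from datetime import datetime, timedelta


def latest_only_window(execution_date, interval, now):
    """Return (left_window, right_window, is_latest) as the plugin computes them.

    Assumption: following_schedule(d) == d + interval (fixed-interval schedule).
    """
    left_window = execution_date + interval   # following_schedule(execution_date)
    right_window = left_window + interval     # following_schedule(left_window)
    return left_window, right_window, left_window < now <= right_window


now = datetime(2016, 11, 22, 12, 29, 0, 670321)  # "now" from the task log
daily = timedelta(days=1)

# The run from the log: now is past right_window (2016-11-22 04:00),
# so the operator skips the downstream tasks.
print(latest_only_window(datetime(2016, 11, 20, 4, 0), daily, now))
# The next day's run is the one whose window contains `now`.
print(latest_only_window(datetime(2016, 11, 21, 4, 0), daily, now))
```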