You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/13 19:49:13 UTC
[airflow] 04/08: Add 'queued' state to DagRun (#16401)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2bab3d462018efb3186f94440d7bfde3e6747901
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jul 6 15:03:27 2021 +0100
Add 'queued' state to DagRun (#16401)
This change adds a 'queued' state to DagRun. Newly created DagRuns
start in the queued state and are moved to the running state once the
DAG's max_active_runs limit allows it. If the DAG doesn't have
max_active_runs set, the DagRuns are moved to the running state immediately.
Clearing a DagRun sets its state to queued.
Closes: #9975, #16366
(cherry picked from commit 6611ffd399dce0474d8329720de7e83f568df598)
---
airflow/api_connexion/openapi/v1.yaml | 4 +-
airflow/jobs/scheduler_job.py | 171 +++++++----------
...93827b8_add_queued_at_column_to_dagrun_table.py | 49 +++++
airflow/models/dag.py | 4 +-
airflow/models/dagrun.py | 17 +-
airflow/models/taskinstance.py | 6 +-
airflow/www/static/js/tree.js | 4 +-
airflow/www/views.py | 5 +-
docs/apache-airflow/migrations-ref.rst | 2 +-
tests/api/common/experimental/test_mark_tasks.py | 4 +-
.../endpoints/test_dag_run_endpoint.py | 14 +-
tests/api_connexion/schemas/test_dag_run_schema.py | 3 +
tests/dag_processing/test_manager.py | 10 +-
tests/dag_processing/test_processor.py | 65 ++++---
tests/jobs/test_scheduler_job.py | 206 +++++++++------------
tests/models/test_cleartasks.py | 37 ++++
tests/models/test_dagrun.py | 25 ++-
tests/sensors/test_external_task_sensor.py | 8 +-
18 files changed, 338 insertions(+), 296 deletions(-)
diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml
index 182f356..47fa3dc 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -1853,6 +1853,7 @@ components:
description: |
The start time. The time when DAG run was actually created.
readOnly: true
+ nullable: true
end_date:
type: string
format: date-time
@@ -3025,8 +3026,9 @@ components:
description: DAG State.
type: string
enum:
- - success
+ - queued
- running
+ - success
- failed
TriggerRule:
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 5b24e00..b564717 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -26,7 +26,7 @@ import sys
import time
from collections import defaultdict
from datetime import timedelta
-from typing import DefaultDict, Dict, Iterable, List, Optional, Set, Tuple
+from typing import DefaultDict, Dict, Iterable, List, Optional, Tuple
from sqlalchemy import and_, func, not_, or_, tuple_
from sqlalchemy.exc import OperationalError
@@ -197,7 +197,7 @@ class SchedulerJob(BaseJob):
"""
For all DAG IDs in the DagBag, look for task instances in the
old_states and set them to new_state if the corresponding DagRun
- does not exist or exists but is not in the running state. This
+ does not exist or exists but is not in the running or queued state. This
normally should not happen, but it can if the state of DagRuns are
changed manually.
@@ -214,7 +214,7 @@ class SchedulerJob(BaseJob):
.filter(models.TaskInstance.state.in_(old_states))
.filter(
or_(
- models.DagRun.state != State.RUNNING,
+ models.DagRun.state.notin_([State.RUNNING, State.QUEUED]),
models.DagRun.state.is_(None),
)
)
@@ -882,39 +882,12 @@ class SchedulerJob(BaseJob):
if settings.USE_JOB_SCHEDULE:
self._create_dagruns_for_dags(guard, session)
- dag_runs = self._get_next_dagruns_to_examine(session)
+ self._start_queued_dagruns(session)
+ guard.commit()
+ dag_runs = self._get_next_dagruns_to_examine(State.RUNNING, session)
# Bulk fetch the currently active dag runs for the dags we are
# examining, rather than making one query per DagRun
- # TODO: This query is probably horribly inefficient (though there is an
- # index on (dag_id,state)). It is to deal with the case when a user
- # clears more than max_active_runs older tasks -- we don't want the
- # scheduler to suddenly go and start running tasks from all of the
- # runs. (AIRFLOW-137/GH #1442)
- #
- # The longer term fix would be to have `clear` do this, and put DagRuns
- # in to the queued state, then take DRs out of queued before creating
- # any new ones
-
- # Build up a set of execution_dates that are "active" for a given
- # dag_id -- only tasks from those runs will be scheduled.
- active_runs_by_dag_id = defaultdict(set)
-
- query = (
- session.query(
- TI.dag_id,
- TI.execution_date,
- )
- .filter(
- TI.dag_id.in_(list({dag_run.dag_id for dag_run in dag_runs})),
- TI.state.notin_(list(State.finished) + [State.REMOVED]),
- )
- .group_by(TI.dag_id, TI.execution_date)
- )
-
- for dag_id, execution_date in query:
- active_runs_by_dag_id[dag_id].add(execution_date)
-
for dag_run in dag_runs:
# Use try_except to not stop the Scheduler when a Serialized DAG is not found
# This takes care of Dynamic DAGs especially
@@ -923,7 +896,7 @@ class SchedulerJob(BaseJob):
# But this would take care of the scenario when the Scheduler is restarted after DagRun is
# created and the DAG is deleted / renamed
try:
- self._schedule_dag_run(dag_run, active_runs_by_dag_id.get(dag_run.dag_id, set()), session)
+ self._schedule_dag_run(dag_run, session)
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
@@ -963,9 +936,9 @@ class SchedulerJob(BaseJob):
return num_queued_tis
@retry_db_transaction
- def _get_next_dagruns_to_examine(self, session):
+ def _get_next_dagruns_to_examine(self, state, session):
"""Get Next DagRuns to Examine with retries"""
- return DagRun.next_dagruns_to_examine(session)
+ return DagRun.next_dagruns_to_examine(state, session)
@retry_db_transaction
def _create_dagruns_for_dags(self, guard, session):
@@ -986,14 +959,24 @@ class SchedulerJob(BaseJob):
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we don't attempt to create
# duplicate dag runs
- active_dagruns = (
- session.query(DagRun.dag_id, DagRun.execution_date)
- .filter(
- tuple_(DagRun.dag_id, DagRun.execution_date).in_(
- [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+
+ if session.bind.dialect.name == 'mssql':
+ existing_dagruns_filter = or_(
+ *(
+ and_(
+ DagRun.dag_id == dm.dag_id,
+ DagRun.execution_date == dm.next_dagrun,
+ )
+ for dm in dag_models
)
)
- .all()
+ else:
+ existing_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
+ [(dm.dag_id, dm.next_dagrun) for dm in dag_models]
+ )
+
+ existing_dagruns = (
+ session.query(DagRun.dag_id, DagRun.execution_date).filter(existing_dagruns_filter).all()
)
for dag_model in dag_models:
@@ -1009,89 +992,83 @@ class SchedulerJob(BaseJob):
# are not updated.
# We opted to check DagRun existence instead
# of catching an Integrity error and rolling back the session i.e
- # we need to run self._update_dag_next_dagruns if the Dag Run already exists or if we
+ # we need to set dag.next_dagrun_info if the Dag Run already exists or if we
# create a new one. This is so that in the next Scheduling loop we try to create new runs
# instead of falling in a loop of Integrity Error.
- if (dag.dag_id, dag_model.next_dagrun) not in active_dagruns:
- run = dag.create_dagrun(
+ if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
+ dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
- start_date=timezone.utcnow(),
- state=State.RUNNING,
+ state=State.QUEUED,
external_trigger=False,
session=session,
dag_hash=dag_hash,
creating_job_id=self.id,
)
-
- expected_start_date = dag.following_schedule(run.execution_date)
- if expected_start_date:
- schedule_delay = run.start_date - expected_start_date
- Stats.timing(
- f'dagrun.schedule_delay.{dag.dag_id}',
- schedule_delay,
- )
-
- self._update_dag_next_dagruns(dag_models, session)
+ dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
+ dag_model.next_dagrun
+ )
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()
- def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Session) -> None:
- """
- Bulk update the next_dagrun and next_dagrun_create_after for all the dags.
+ def _start_queued_dagruns(
+ self,
+ session: Session,
+ ) -> int:
+ """Find DagRuns in queued state and decide whether to move them to running state"""
+ dag_runs = self._get_next_dagruns_to_examine(State.QUEUED, session)
- We batch the select queries to get info about all the dags at once
- """
- # Check max_active_runs, to see if we are _now_ at the limit for any of
- # these dag? (we've just created a DagRun for them after all)
- active_runs_of_dags = dict(
+ active_runs_of_dags = defaultdict(
+ lambda: 0,
session.query(DagRun.dag_id, func.count('*'))
- .filter(
- DagRun.dag_id.in_([o.dag_id for o in dag_models]),
+ .filter( # We use `list` here because SQLA doesn't accept a set
+ # We use set to avoid duplicate dag_ids
+ DagRun.dag_id.in_(list({dr.dag_id for dr in dag_runs})),
DagRun.state == State.RUNNING,
- DagRun.external_trigger.is_(False),
)
.group_by(DagRun.dag_id)
- .all()
+ .all(),
)
- for dag_model in dag_models:
- # Get the DAG in a try_except to not stop the Scheduler when a Serialized DAG is not found
- # This takes care of Dynamic DAGs especially
+ def _update_state(dag_run):
+ dag_run.state = State.RUNNING
+ dag_run.start_date = timezone.utcnow()
+ expected_start_date = dag.following_schedule(dag_run.execution_date)
+ if expected_start_date:
+ schedule_delay = dag_run.start_date - expected_start_date
+ Stats.timing(
+ f'dagrun.schedule_delay.{dag.dag_id}',
+ schedule_delay,
+ )
+
+ for dag_run in dag_runs:
try:
- dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
+ dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
except SerializedDagNotFound:
- self.log.exception("DAG '%s' not found in serialized_dag table", dag_model.dag_id)
+ self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
- active_runs_of_dag = active_runs_of_dags.get(dag.dag_id, 0)
- if dag.max_active_runs and active_runs_of_dag >= dag.max_active_runs:
- self.log.info(
- "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
+ active_runs = active_runs_of_dags[dag_run.dag_id]
+ if dag.max_active_runs and active_runs >= dag.max_active_runs:
+ self.log.debug(
+ "DAG %s already has %d active runs, not moving any more runs to RUNNING state %s",
dag.dag_id,
- active_runs_of_dag,
- dag.max_active_runs,
+ active_runs,
+ dag_run.execution_date,
)
- dag_model.next_dagrun_create_after = None
else:
- dag_model.next_dagrun, dag_model.next_dagrun_create_after = dag.next_dagrun_info(
- dag_model.next_dagrun
- )
+ active_runs_of_dags[dag_run.dag_id] += 1
+ _update_state(dag_run)
def _schedule_dag_run(
self,
dag_run: DagRun,
- currently_active_runs: Set[datetime.datetime],
session: Session,
) -> int:
"""
Make scheduling decisions about an individual dag run
- ``currently_active_runs`` is passed in so that a batch query can be
- used to ask this for all dag runs in the batch, to avoid an n+1 query.
-
:param dag_run: The DagRun to schedule
- :param currently_active_runs: Number of currently active runs of this DAG
:return: Number of tasks scheduled
"""
dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
@@ -1118,9 +1095,6 @@ class SchedulerJob(BaseJob):
session.flush()
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
- # Work out if we should allow creating a new DagRun now?
- self._update_dag_next_dagruns([session.query(DagModel).get(dag_run.dag_id)], session)
-
callback_to_execute = DagCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dag.dag_id,
@@ -1138,19 +1112,6 @@ class SchedulerJob(BaseJob):
self.log.error("Execution date is in future: %s", dag_run.execution_date)
return 0
- if dag.max_active_runs:
- if (
- len(currently_active_runs) >= dag.max_active_runs
- and dag_run.execution_date not in currently_active_runs
- ):
- self.log.info(
- "DAG %s already has %d active runs, not queuing any tasks for run %s",
- dag.dag_id,
- len(currently_active_runs),
- dag_run.execution_date,
- )
- return 0
-
self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
diff --git a/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
new file mode 100644
index 0000000..db08d8b
--- /dev/null
+++ b/airflow/migrations/versions/97cdd93827b8_add_queued_at_column_to_dagrun_table.py
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add queued_at column to dagrun table
+
+Revision ID: 97cdd93827b8
+Revises: 83f031fd9f1c
+Create Date: 2021-06-29 21:53:48.059438
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '97cdd93827b8'
+down_revision = '83f031fd9f1c'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply Add queued_at column to dagrun table"""
+ conn = op.get_bind()
+ if conn.dialect.name == "mssql":
+ op.add_column('dag_run', sa.Column('queued_at', mssql.DATETIME2(precision=6), nullable=True))
+ else:
+ op.add_column('dag_run', sa.Column('queued_at', sa.DateTime(), nullable=True))
+
+
+def downgrade():
+ """Unapply Add queued_at column to dagrun table"""
+ op.drop_column('dag_run', "queued_at")
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 13d69c2..a3d06db 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1153,7 +1153,7 @@ class DAG(LoggingMixin):
confirm_prompt=False,
include_subdags=True,
include_parentdag=True,
- dag_run_state: str = State.RUNNING,
+ dag_run_state: str = State.QUEUED,
dry_run=False,
session=None,
get_tis=False,
@@ -1369,7 +1369,7 @@ class DAG(LoggingMixin):
confirm_prompt=False,
include_subdags=True,
include_parentdag=False,
- dag_run_state=State.RUNNING,
+ dag_run_state=State.QUEUED,
dry_run=False,
):
all_tis = []
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 07f309d..a061dcc 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -61,12 +61,15 @@ class DagRun(Base, LoggingMixin):
__tablename__ = "dag_run"
+ __NO_VALUE = object()
+
id = Column(Integer, primary_key=True)
dag_id = Column(String(ID_LEN))
+ queued_at = Column(UtcDateTime)
execution_date = Column(UtcDateTime, default=timezone.utcnow)
- start_date = Column(UtcDateTime, default=timezone.utcnow)
+ start_date = Column(UtcDateTime)
end_date = Column(UtcDateTime)
- _state = Column('state', String(50), default=State.RUNNING)
+ _state = Column('state', String(50), default=State.QUEUED)
run_id = Column(String(ID_LEN))
creating_job_id = Column(Integer)
external_trigger = Column(Boolean, default=True)
@@ -102,6 +105,7 @@ class DagRun(Base, LoggingMixin):
self,
dag_id: Optional[str] = None,
run_id: Optional[str] = None,
+ queued_at: Optional[datetime] = __NO_VALUE,
execution_date: Optional[datetime] = None,
start_date: Optional[datetime] = None,
external_trigger: Optional[bool] = None,
@@ -118,6 +122,10 @@ class DagRun(Base, LoggingMixin):
self.external_trigger = external_trigger
self.conf = conf or {}
self.state = state
+ if queued_at is self.__NO_VALUE:
+ self.queued_at = timezone.utcnow() if state == State.QUEUED else None
+ else:
+ self.queued_at = queued_at
self.run_type = run_type
self.dag_hash = dag_hash
self.creating_job_id = creating_job_id
@@ -140,6 +148,8 @@ class DagRun(Base, LoggingMixin):
if self._state != state:
self._state = state
self.end_date = timezone.utcnow() if self._state in State.finished else None
+ if state == State.QUEUED:
+ self.queued_at = timezone.utcnow()
@declared_attr
def state(self):
@@ -160,6 +170,7 @@ class DagRun(Base, LoggingMixin):
@classmethod
def next_dagruns_to_examine(
cls,
+ state: str,
session: Session,
max_number: Optional[int] = None,
):
@@ -180,7 +191,7 @@ class DagRun(Base, LoggingMixin):
# TODO: Bake this query, it is run _A lot_
query = (
session.query(cls)
- .filter(cls.state == State.RUNNING, cls.run_type != DagRunType.BACKFILL_JOB)
+ .filter(cls.state == state, cls.run_type != DagRunType.BACKFILL_JOB)
.join(
DagModel,
DagModel.dag_id == cls.dag_id,
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 5fb8155..0e10567 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -137,7 +137,7 @@ def clear_task_instances(
session,
activate_dag_runs=None,
dag=None,
- dag_run_state: Union[str, Literal[False]] = State.RUNNING,
+ dag_run_state: Union[str, Literal[False]] = State.QUEUED,
):
"""
Clears a set of task instances, but makes sure the running ones
@@ -239,7 +239,9 @@ def clear_task_instances(
)
for dr in drs:
dr.state = dag_run_state
- dr.start_date = timezone.utcnow()
+ dr.start_date = None
+ if dag_run_state == State.QUEUED:
+ dr.last_scheduling_decision = None
class TaskInstanceKey(NamedTuple):
diff --git a/airflow/www/static/js/tree.js b/airflow/www/static/js/tree.js
index 4bf366a..d45c880 100644
--- a/airflow/www/static/js/tree.js
+++ b/airflow/www/static/js/tree.js
@@ -58,7 +58,9 @@ document.addEventListener('DOMContentLoaded', () => {
const tree = d3.layout.tree().nodeSize([0, 25]);
let nodes = tree.nodes(data);
const nodeobj = {};
- const getActiveRuns = () => data.instances.filter((run) => run.state === 'running').length > 0;
+ const runActiveStates = ['queued', 'running'];
+ const getActiveRuns = () => data.instances
+ .filter((run) => runActiveStates.includes(run.state)).length > 0;
const now = Date.now() / 1000;
const devicePixelRatio = window.devicePixelRatio || 1;
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 150ecc9..830047b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1526,7 +1526,7 @@ class Airflow(AirflowBaseView):
dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=execution_date,
- state=State.RUNNING,
+ state=State.QUEUED,
conf=run_conf,
external_trigger=True,
dag_hash=current_app.dag_bag.dags_hash.get(dag_id),
@@ -3454,6 +3454,7 @@ class DagRunModelView(AirflowModelView):
'execution_date',
'run_id',
'run_type',
+ 'queued_at',
'start_date',
'end_date',
'external_trigger',
@@ -3789,7 +3790,7 @@ class TaskInstanceModelView(AirflowModelView):
lazy_gettext('Clear'),
lazy_gettext(
'Are you sure you want to clear the state of the selected task'
- ' instance(s) and set their dagruns to the running state?'
+ ' instance(s) and set their dagruns to the QUEUED state?'
),
single=False,
)
diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst
index 0c663da..c119bcc 100644
--- a/docs/apache-airflow/migrations-ref.rst
+++ b/docs/apache-airflow/migrations-ref.rst
@@ -23,7 +23,7 @@ Here's the list of all the Database Migrations that are executed via when you ru
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``e9304a3141f0`` (head) | ``83f031fd9f1c`` | | Make XCom primary key columns non-nullable |
+| ``97cdd93827b8`` (head) | ``83f031fd9f1c`` | | Add ``queued_at`` column in ``dag_run`` table |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
| ``83f031fd9f1c`` | ``a13f7613ad25`` | | Improve MSSQL compatibility |
+--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
diff --git a/tests/api/common/experimental/test_mark_tasks.py b/tests/api/common/experimental/test_mark_tasks.py
index 4dab57e..49008d3 100644
--- a/tests/api/common/experimental/test_mark_tasks.py
+++ b/tests/api/common/experimental/test_mark_tasks.py
@@ -414,7 +414,9 @@ class TestMarkDAGRun(unittest.TestCase):
assert ti.state == state
def _create_test_dag_run(self, state, date):
- return self.dag1.create_dagrun(run_type=DagRunType.MANUAL, state=state, execution_date=date)
+ return self.dag1.create_dagrun(
+ run_type=DagRunType.MANUAL, state=state, start_date=date, execution_date=date
+ )
def _verify_dag_run_state(self, dag, date, state):
drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index e51eca8..0aa13b2 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -90,7 +90,7 @@ class TestDagRunEndpoint:
def teardown_method(self) -> None:
clear_db_runs()
- # clear_db_dags()
+ clear_db_dags()
def _create_dag(self, dag_id):
dag_instance = DagModel(dag_id=dag_id)
@@ -118,6 +118,7 @@ class TestDagRunEndpoint:
execution_date=timezone.parse(self.default_time_2),
start_date=timezone.parse(self.default_time),
external_trigger=True,
+ state=state,
)
dag_runs.append(dagrun_model_2)
if extra_dag:
@@ -131,6 +132,7 @@ class TestDagRunEndpoint:
execution_date=timezone.parse(self.default_time_2),
start_date=timezone.parse(self.default_time),
external_trigger=True,
+ state=state,
)
)
if commit:
@@ -193,6 +195,7 @@ class TestGetDagRun(TestDagRunEndpoint):
execution_date=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
external_trigger=True,
+ state='running',
)
session.add(dagrun_model)
session.commit()
@@ -532,7 +535,7 @@ class TestGetDagRunsEndDateFilters(TestDagRunEndpoint):
(
f"api/v1/dags/TEST_DAG_ID/dagRuns?end_date_lte="
f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}",
- ["TEST_DAG_RUN_ID_1"],
+ ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
),
]
)
@@ -750,6 +753,7 @@ class TestGetDagRunBatchPagination(TestDagRunEndpoint):
DagRun(
dag_id="TEST_DAG_ID",
run_id="TEST_DAG_RUN_ID" + str(i),
+ state='running',
run_type=DagRunType.MANUAL,
execution_date=timezone.parse(self.default_time) + timedelta(minutes=i),
start_date=timezone.parse(self.default_time),
@@ -884,7 +888,7 @@ class TestGetDagRunBatchDateFilters(TestDagRunEndpoint):
),
(
{"end_date_lte": f"{(timezone.utcnow() + timedelta(days=1)).isoformat()}"},
- ["TEST_DAG_RUN_ID_1"],
+ ["TEST_DAG_RUN_ID_1", "TEST_DAG_RUN_ID_2"],
),
]
)
@@ -927,8 +931,8 @@ class TestPostDagRun(TestDagRunEndpoint):
"end_date": None,
"execution_date": response.json["execution_date"],
"external_trigger": True,
- "start_date": response.json["start_date"],
- "state": "running",
+ "start_date": None,
+ "state": "queued",
} == response.json
@parameterized.expand(
diff --git a/tests/api_connexion/schemas/test_dag_run_schema.py b/tests/api_connexion/schemas/test_dag_run_schema.py
index 3e6bf2e..9e4a9e8 100644
--- a/tests/api_connexion/schemas/test_dag_run_schema.py
+++ b/tests/api_connexion/schemas/test_dag_run_schema.py
@@ -49,6 +49,7 @@ class TestDAGRunSchema(TestDAGRunBase):
def test_serialize(self, session):
dagrun_model = DagRun(
run_id="my-dag-run",
+ state='running',
run_type=DagRunType.MANUAL.value,
execution_date=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
@@ -124,6 +125,7 @@ class TestDagRunCollection(TestDAGRunBase):
def test_serialize(self, session):
dagrun_model_1 = DagRun(
run_id="my-dag-run",
+ state='running',
execution_date=timezone.parse(self.default_time),
run_type=DagRunType.MANUAL.value,
start_date=timezone.parse(self.default_time),
@@ -131,6 +133,7 @@ class TestDagRunCollection(TestDAGRunBase):
)
dagrun_model_2 = DagRun(
run_id="my-dag-run-2",
+ state='running',
execution_date=timezone.parse(self.default_time),
start_date=timezone.parse(self.default_time),
run_type=DagRunType.MANUAL.value,
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 0ab7f2b..02613ec 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -43,6 +43,7 @@ from airflow.dag_processing.manager import (
)
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.jobs.local_task_job import LocalTaskJob as LJ
+from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DagBag, DagModel, TaskInstance as TI
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance
@@ -508,8 +509,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
child_pipe.close()
parent_pipe.close()
- @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
- @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.kill")
+ @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+ @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
mock_pid.return_value = 1234
manager = DagFileProcessorManager(
@@ -529,8 +530,8 @@ class TestDagFileProcessorManager(unittest.TestCase):
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with()
- @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess.pid", new_callable=PropertyMock)
- @mock.patch("airflow.jobs.scheduler_job.DagFileProcessorProcess")
+ @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
+ @mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_pid):
mock_pid.return_value = 1234
manager = DagFileProcessorManager(
@@ -560,7 +561,6 @@ class TestDagFileProcessorManager(unittest.TestCase):
# We need to _actually_ parse the files here to test the behaviour.
# Right now the parsing code lives in SchedulerJob, even though it's
# called via utils.dag_processing.
- from airflow.jobs.scheduler_job import SchedulerJob
dag_id = 'exit_test_dag'
dag_directory = TEST_DAG_FOLDER.parent / 'dags_with_system_exit'
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 5953517..feb3497 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -277,7 +277,7 @@ class TestDagFileProcessor(unittest.TestCase):
assert email1 in send_email_to
assert email2 not in send_email_to
- @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+ @mock.patch('airflow.dag_processing.processor.Stats.incr')
@mock.patch("airflow.utils.email.send_email")
def test_dag_file_processor_sla_miss_email_exception(self, mock_send_email, mock_stats_incr):
"""
@@ -387,7 +387,7 @@ class TestDagFileProcessor(unittest.TestCase):
ti.start_date = start_date
ti.end_date = end_date
- count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+ count = self.scheduler_job._schedule_dag_run(dr, session)
assert count == 1
session.refresh(ti)
@@ -444,7 +444,7 @@ class TestDagFileProcessor(unittest.TestCase):
ti.start_date = start_date
ti.end_date = end_date
- count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+ count = self.scheduler_job._schedule_dag_run(dr, session)
assert count == 1
session.refresh(ti)
@@ -504,7 +504,7 @@ class TestDagFileProcessor(unittest.TestCase):
ti.start_date = start_date
ti.end_date = end_date
- count = self.scheduler_job._schedule_dag_run(dr, set(), session)
+ count = self.scheduler_job._schedule_dag_run(dr, session)
assert count == 2
session.refresh(tis[0])
@@ -547,7 +547,7 @@ class TestDagFileProcessor(unittest.TestCase):
BashOperator(task_id='dummy2', dag=dag, owner='airflow', bash_command='echo test')
SerializedDagModel.write_dag(dag=dag)
- scheduled_tis = self.scheduler_job._schedule_dag_run(dr, set(), session)
+ scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
assert scheduled_tis == 2
@@ -560,11 +560,10 @@ class TestDagFileProcessor(unittest.TestCase):
def test_runs_respected_after_clear(self):
"""
- Test if _process_task_instances only schedules ti's up to max_active_runs
- (related to issue AIRFLOW-137)
+ Test that max_active_runs is respected after dag.clear
"""
dag = DAG(dag_id='test_scheduler_max_active_runs_respected_after_clear', start_date=DEFAULT_DATE)
- dag.max_active_runs = 3
+ dag.max_active_runs = 1
BashOperator(task_id='dummy', dag=dag, owner='airflow', bash_command='echo Hi')
@@ -575,48 +574,46 @@ class TestDagFileProcessor(unittest.TestCase):
session.close()
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+ # Write Dag to DB
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=False, read_dags_from_db=False)
+ dagbag.bag_dag(dag, root_dag=dag)
+ dagbag.sync_to_db()
+
+ dag = DagBag(read_dags_from_db=True, include_examples=False).get_dag(dag.dag_id)
+
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.processor_agent = mock.MagicMock()
self.scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
- dag.clear()
date = DEFAULT_DATE
- dr1 = dag.create_dagrun(
+ dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=date,
- state=State.RUNNING,
+ state=State.QUEUED,
)
date = dag.following_schedule(date)
- dr2 = dag.create_dagrun(
+ dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=date,
- state=State.RUNNING,
+ state=State.QUEUED,
)
date = dag.following_schedule(date)
- dr3 = dag.create_dagrun(
+ dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=date,
- state=State.RUNNING,
+ state=State.QUEUED,
)
+ dag.clear()
- # First create up to 3 dagruns in RUNNING state.
- assert dr1 is not None
- assert dr2 is not None
- assert dr3 is not None
- assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 3
-
- # Reduce max_active_runs to 1
- dag.max_active_runs = 1
+ assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
- # and schedule them in, so we can check how many
- # tasks are put on the task_instances_list (should be one, not 3)
- with create_session() as session:
- num_scheduled = self.scheduler_job._schedule_dag_run(dr1, set(), session)
- assert num_scheduled == 1
- num_scheduled = self.scheduler_job._schedule_dag_run(dr2, {dr1.execution_date}, session)
- assert num_scheduled == 0
- num_scheduled = self.scheduler_job._schedule_dag_run(dr3, {dr1.execution_date}, session)
- assert num_scheduled == 0
+ session = settings.Session()
+ self.scheduler_job._start_queued_dagruns(session)
+ session.commit()
+ # Assert that only 1 dagrun is active
+ assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+ # Assert that the other two are queued
+ assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
@patch.object(TaskInstance, 'handle_failure_with_callback')
def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
@@ -698,7 +695,7 @@ class TestDagFileProcessor(unittest.TestCase):
dr = drs[0]
# Schedule TaskInstances
- self.scheduler_job_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job_job._schedule_dag_run(dr, session)
with create_session() as session:
tis = session.query(TaskInstance).all()
@@ -724,7 +721,7 @@ class TestDagFileProcessor(unittest.TestCase):
assert end_date is None
assert duration is None
- self.scheduler_job_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job_job._schedule_dag_run(dr, session)
with create_session() as session:
tis = session.query(TaskInstance).all()
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 9fe8517..0ee6f5f 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -23,7 +23,6 @@ import shutil
import unittest
from datetime import timedelta
from tempfile import mkdtemp
-from time import sleep
from unittest import mock
from unittest.mock import MagicMock, patch
from zipfile import ZipFile
@@ -430,7 +429,6 @@ class TestSchedulerJob(unittest.TestCase):
session.flush()
res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
-
assert 2 == len(res)
res_keys = map(lambda x: x.key, res)
assert ti_no_dagrun.key in res_keys
@@ -1575,15 +1573,16 @@ class TestSchedulerJob(unittest.TestCase):
dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
- self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job._create_dag_runs([orm_dag], session)
+ self.scheduler_job._start_queued_dagruns(session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
- # Should not be able to create a new dag run, as we are at max active runs
- assert orm_dag.next_dagrun_create_after is None
+ # This should have a value since we control max_active_runs
+ # by DagRun State.
+ assert orm_dag.next_dagrun_create_after
# But we should record the date of _what run_ it would be
assert isinstance(orm_dag.next_dagrun, datetime.datetime)
@@ -1595,7 +1594,7 @@ class TestSchedulerJob(unittest.TestCase):
self.scheduler_job.processor_agent = mock.Mock()
self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
@@ -1652,7 +1651,7 @@ class TestSchedulerJob(unittest.TestCase):
self.scheduler_job.processor_agent = mock.Mock()
self.scheduler_job.processor_agent.send_callback_to_execute = mock.Mock()
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
@@ -1711,7 +1710,7 @@ class TestSchedulerJob(unittest.TestCase):
ti = dr.get_task_instance('dummy')
ti.set_state(state, session)
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
expected_callback = DagCallbackRequest(
full_filepath=dr.dag.fileloc,
@@ -1766,7 +1765,7 @@ class TestSchedulerJob(unittest.TestCase):
ti = dr.get_task_instance('test_task')
ti.set_state(state, session)
- self.scheduler_job._schedule_dag_run(dr, set(), session)
+ self.scheduler_job._schedule_dag_run(dr, session)
# Verify Callback is not set (i.e is None) when no callbacks are set on DAG
self.scheduler_job._send_dag_callbacks_to_processor.assert_called_once_with(dr, None)
@@ -2146,13 +2145,13 @@ class TestSchedulerJob(unittest.TestCase):
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag.following_schedule(dr.execution_date),
state=State.RUNNING,
)
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
max_tis=32, session=session
)
@@ -2203,7 +2202,7 @@ class TestSchedulerJob(unittest.TestCase):
execution_date=date,
state=State.RUNNING,
)
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
date = dag.following_schedule(date)
task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
@@ -2266,7 +2265,7 @@ class TestSchedulerJob(unittest.TestCase):
execution_date=date,
state=State.RUNNING,
)
- scheduler._schedule_dag_run(dr, {}, session)
+ scheduler._schedule_dag_run(dr, session)
date = dag_d1.following_schedule(date)
date = DEFAULT_DATE
@@ -2276,7 +2275,7 @@ class TestSchedulerJob(unittest.TestCase):
execution_date=date,
state=State.RUNNING,
)
- scheduler._schedule_dag_run(dr, {}, session)
+ scheduler._schedule_dag_run(dr, session)
date = dag_d2.following_schedule(date)
scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
@@ -2353,7 +2352,7 @@ class TestSchedulerJob(unittest.TestCase):
execution_date=DEFAULT_DATE,
state=State.RUNNING,
)
- self.scheduler_job._schedule_dag_run(dr, {}, session)
+ self.scheduler_job._schedule_dag_run(dr, session)
task_instances_list = self.scheduler_job._executable_task_instances_to_queued(
max_tis=32, session=session
@@ -2412,7 +2411,7 @@ class TestSchedulerJob(unittest.TestCase):
# Verify that DagRun.verify_integrity is not called
with mock.patch('airflow.jobs.scheduler_job.DagRun.verify_integrity') as mock_verify_integrity:
- scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+ scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
mock_verify_integrity.assert_not_called()
session.flush()
@@ -2475,7 +2474,7 @@ class TestSchedulerJob(unittest.TestCase):
dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dag_version_2 != dag_version_1
- scheduled_tis = self.scheduler_job._schedule_dag_run(dr, {}, session)
+ scheduled_tis = self.scheduler_job._schedule_dag_run(dr, session)
session.flush()
assert scheduled_tis == 2
@@ -3187,14 +3186,13 @@ class TestSchedulerJob(unittest.TestCase):
full_filepath=dag.fileloc, dag_id=dag_id
)
- @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
- @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
- def test_create_dag_runs(self, stats_timing):
+ def test_create_dag_runs(self):
"""
Test various invariants of _create_dag_runs.
- That the run created has the creating_job_id set
- - That we emit the right DagRun metrics
+ - That the run created is in the QUEUED state
+ - That dag_model has next_dagrun
"""
dag = DAG(dag_id='test_create_dag_runs', start_date=DEFAULT_DATE)
@@ -3218,8 +3216,51 @@ class TestSchedulerJob(unittest.TestCase):
with create_session() as session:
self.scheduler_job._create_dag_runs([dag_model], session)
+ dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+ # Assert dr state is queued
+ assert dr.state == State.QUEUED
+ assert dr.start_date is None
+
+ assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
+
+ @freeze_time(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9))
+ @mock.patch('airflow.jobs.scheduler_job.Stats.timing')
+ def test_start_dagruns(self, stats_timing):
+ """
+ Test that _start_queued_dagruns:
+
+ - moves runs to RUNNING State
+ - emits the right DagRun metrics
+ """
+ dag = DAG(dag_id='test_start_dag_runs', start_date=DEFAULT_DATE)
+
+ DummyOperator(
+ task_id='dummy',
+ dag=dag,
+ )
+
+ dagbag = DagBag(
+ dag_folder=os.devnull,
+ include_examples=False,
+ read_dags_from_db=True,
+ )
+ dagbag.bag_dag(dag=dag, root_dag=dag)
+ dagbag.sync_to_db()
+ dag_model = DagModel.get_dagmodel(dag.dag_id)
+
+ self.scheduler_job = SchedulerJob(executor=self.null_exec)
+ self.scheduler_job.processor_agent = mock.MagicMock()
+
+ with create_session() as session:
+ self.scheduler_job._create_dag_runs([dag_model], session)
+ self.scheduler_job._start_queued_dagruns(session)
+
+ dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
+ # Assert dr state is running
+ assert dr.state == State.RUNNING
+
stats_timing.assert_called_once_with(
- "dagrun.schedule_delay.test_create_dag_runs", datetime.timedelta(seconds=9)
+ "dagrun.schedule_delay.test_start_dag_runs", datetime.timedelta(seconds=9)
)
assert dag.get_last_dagrun().creating_job_id == self.scheduler_job.id
@@ -3418,61 +3459,7 @@ class TestSchedulerJob(unittest.TestCase):
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
session.rollback()
- def test_do_schedule_max_active_runs_upstream_failed(self):
- """
- Test that tasks in upstream failed don't count as actively running.
-
- This test can be removed when adding a queued state to DagRuns.
- """
-
- with DAG(
- dag_id='test_max_active_run_with_upstream_failed',
- start_date=DEFAULT_DATE,
- schedule_interval='@once',
- max_active_runs=1,
- ) as dag:
- # Can't use DummyOperator as that goes straight to success
- task1 = BashOperator(task_id='dummy1', bash_command='true')
-
- session = settings.Session()
- dagbag = DagBag(
- dag_folder=os.devnull,
- include_examples=False,
- read_dags_from_db=True,
- )
-
- dagbag.bag_dag(dag=dag, root_dag=dag)
- dagbag.sync_to_db(session=session)
-
- run1 = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE,
- state=State.RUNNING,
- session=session,
- )
-
- ti = run1.get_task_instance(task1.task_id, session)
- ti.state = State.UPSTREAM_FAILED
-
- run2 = dag.create_dagrun(
- run_type=DagRunType.SCHEDULED,
- execution_date=DEFAULT_DATE + timedelta(hours=1),
- state=State.RUNNING,
- session=session,
- )
-
- dag.sync_to_db(session=session) # Update the date fields
-
- self.scheduler_job = SchedulerJob(subdir=os.devnull)
- self.scheduler_job.executor = MockExecutor(do_update=False)
- self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
-
- num_queued = self.scheduler_job._do_scheduling(session)
-
- assert num_queued == 1
- ti = run2.get_task_instance(task1.task_id, session)
- assert ti.state == State.QUEUED
-
+ @conf_vars({('scheduler', 'use_job_schedule'): "false"})
def test_do_schedule_max_active_runs_dag_timed_out(self):
"""Test that tasks are set to a finished state when their DAG times out"""
@@ -3505,33 +3492,36 @@ class TestSchedulerJob(unittest.TestCase):
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
+ start_date=timezone.utcnow() - timedelta(seconds=2),
session=session,
)
+
run1_ti = run1.get_task_instance(task1.task_id, session)
run1_ti.state = State.RUNNING
- sleep(1)
-
run2 = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(seconds=10),
- state=State.RUNNING,
+ state=State.QUEUED,
session=session,
)
dag.sync_to_db(session=session)
-
self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor()
self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
- _ = self.scheduler_job._do_scheduling(session)
-
+ self.scheduler_job._do_scheduling(session)
+ session.add(run1)
+ session.refresh(run1)
assert run1.state == State.FAILED
assert run1_ti.state == State.SKIPPED
- assert run2.state == State.RUNNING
- _ = self.scheduler_job._do_scheduling(session)
+ # Run scheduling again to assert run2 has started
+ self.scheduler_job._do_scheduling(session)
+ session.add(run2)
+ session.refresh(run2)
+ assert run2.state == State.RUNNING
run2_ti = run2.get_task_instance(task1.task_id, session)
assert run2_ti.state == State.QUEUED
@@ -3581,8 +3571,8 @@ class TestSchedulerJob(unittest.TestCase):
def test_do_schedule_max_active_runs_and_manual_trigger(self):
"""
- Make sure that when a DAG is already at max_active_runs, that manually triggering a run doesn't cause
- the dag to "stall".
+ Make sure that when a DAG is already at max_active_runs, that manually triggered
+ dagruns don't start running.
"""
with DAG(
@@ -3597,7 +3587,7 @@ class TestSchedulerJob(unittest.TestCase):
task1 >> task2
- task3 = BashOperator(task_id='dummy3', bash_command='true')
+ BashOperator(task_id='dummy3', bash_command='true')
session = settings.Session()
dagbag = DagBag(
@@ -3612,7 +3602,7 @@ class TestSchedulerJob(unittest.TestCase):
dag_run = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
- state=State.RUNNING,
+ state=State.QUEUED,
session=session,
)
@@ -3630,47 +3620,23 @@ class TestSchedulerJob(unittest.TestCase):
assert num_queued == 2
assert dag_run.state == State.RUNNING
- ti1 = dag_run.get_task_instance(task1.task_id, session)
- assert ti1.state == State.QUEUED
-
- # Set task1 to success (so task2 can run) but keep task3 as "running"
- ti1.state = State.SUCCESS
-
- ti3 = dag_run.get_task_instance(task3.task_id, session)
- ti3.state = State.RUNNING
-
- session.flush()
-
- # At this point, ti2 and ti3 of the scheduled dag run should be running
- num_queued = self.scheduler_job._do_scheduling(session)
-
- assert num_queued == 1
- # Should have queued task2
- ti2 = dag_run.get_task_instance(task2.task_id, session)
- assert ti2.state == State.QUEUED
-
- ti2.state = None
- session.flush()
# Now that this one is running, manually trigger a dag.
- manual_run = dag.create_dagrun(
+ dag.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE + timedelta(hours=1),
- state=State.RUNNING,
+ state=State.QUEUED,
session=session,
)
session.flush()
- num_queued = self.scheduler_job._do_scheduling(session)
+ self.scheduler_job._do_scheduling(session)
- assert num_queued == 1
- # Should have queued task2 again.
- ti2 = dag_run.get_task_instance(task2.task_id, session)
- assert ti2.state == State.QUEUED
- # Manual run shouldn't have been started, because we're at max_active_runs with DR1
- ti1 = manual_run.get_task_instance(task1.task_id, session)
- assert ti1.state is None
+ # Assert that only 1 dagrun is active
+ assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
+ # Assert that the other one is queued
+ assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
@pytest.mark.xfail(reason="Work out where this goes")
diff --git a/tests/models/test_cleartasks.py b/tests/models/test_cleartasks.py
index 9b8fbd0..4f64347 100644
--- a/tests/models/test_cleartasks.py
+++ b/tests/models/test_cleartasks.py
@@ -19,6 +19,8 @@
import datetime
import unittest
+from parameterized import parameterized
+
from airflow import settings
from airflow.models import DAG, TaskInstance as TI, TaskReschedule, clear_task_instances
from airflow.operators.dummy import DummyOperator
@@ -92,6 +94,41 @@ class TestClearTasks(unittest.TestCase):
assert ti0.state is None
assert ti0.external_executor_id is None
+ @parameterized.expand([(State.QUEUED, None), (State.RUNNING, DEFAULT_DATE)])
+ def test_clear_task_instances_dr_state(self, state, last_scheduling):
+ """Test that DR state is set to the given dag_run_state after clear.
+ And that DR.last_scheduling_decision is handled OK.
+ start_date is also set to None
+ """
+ dag = DAG(
+ 'test_clear_task_instances',
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ )
+ task0 = DummyOperator(task_id='0', owner='test', dag=dag)
+ task1 = DummyOperator(task_id='1', owner='test', dag=dag, retries=2)
+ ti0 = TI(task=task0, execution_date=DEFAULT_DATE)
+ ti1 = TI(task=task1, execution_date=DEFAULT_DATE)
+ session = settings.Session()
+ dr = dag.create_dagrun(
+ execution_date=ti0.execution_date,
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+ dr.last_scheduling_decision = DEFAULT_DATE
+ session.add(dr)
+ session.commit()
+
+ ti0.run()
+ ti1.run()
+ qry = session.query(TI).filter(TI.dag_id == dag.dag_id).all()
+ clear_task_instances(qry, session, dag_run_state=state, dag=dag)
+
+ dr = ti0.get_dagrun()
+ assert dr.state == state
+ assert dr.start_date is None
+ assert dr.last_scheduling_decision == last_scheduling
+
def test_clear_task_instances_without_task(self):
dag = DAG(
'test_clear_task_instances_without_task',
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 7899199..fac38bf 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -39,7 +39,7 @@ from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
from tests.models import DEFAULT_DATE
-from tests.test_utils.db import clear_db_jobs, clear_db_pools, clear_db_runs
+from tests.test_utils.db import clear_db_dags, clear_db_jobs, clear_db_pools, clear_db_runs
class TestDagRun(unittest.TestCase):
@@ -50,6 +50,12 @@ class TestDagRun(unittest.TestCase):
def setUp(self):
clear_db_runs()
clear_db_pools()
+ clear_db_dags()
+
+ def tearDown(self) -> None:
+ clear_db_runs()
+ clear_db_pools()
+ clear_db_dags()
def create_dag_run(
self,
@@ -102,7 +108,7 @@ class TestDagRun(unittest.TestCase):
session.commit()
ti0.refresh_from_db()
dr0 = session.query(DagRun).filter(DagRun.dag_id == dag_id, DagRun.execution_date == now).first()
- assert dr0.state == State.RUNNING
+ assert dr0.state == State.QUEUED
def test_dagrun_find(self):
session = settings.Session()
@@ -692,9 +698,11 @@ class TestDagRun(unittest.TestCase):
ti.run()
assert (ti.state == State.SUCCESS) == is_ti_success
- def test_next_dagruns_to_examine_only_unpaused(self):
+ @parameterized.expand([(State.QUEUED,), (State.RUNNING,)])
+ def test_next_dagruns_to_examine_only_unpaused(self, state):
"""
Check that "next_dagruns_to_examine" ignores runs from paused/inactive DAGs
+ and gets running/queued dagruns
"""
dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
@@ -712,25 +720,22 @@ class TestDagRun(unittest.TestCase):
session.flush()
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
- state=State.RUNNING,
+ state=state,
execution_date=DEFAULT_DATE,
- start_date=DEFAULT_DATE,
+ start_date=DEFAULT_DATE if state == State.RUNNING else None,
session=session,
)
- runs = DagRun.next_dagruns_to_examine(session).all()
+ runs = DagRun.next_dagruns_to_examine(state, session).all()
assert runs == [dr]
orm_dag.is_paused = True
session.flush()
- runs = DagRun.next_dagruns_to_examine(session).all()
+ runs = DagRun.next_dagruns_to_examine(state, session).all()
assert runs == []
- session.rollback()
- session.close()
-
@mock.patch.object(Stats, 'timing')
def test_no_scheduling_delay_for_nonscheduled_runs(self, stats_mock):
"""
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 187fdb2..274766c 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -545,16 +545,16 @@ def test_external_task_marker_clear_activate(dag_bag_parent_child):
task_0 = dag_0.get_task("task_0")
clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)
- # Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
+ # Assert that dagruns of all the affected dags are set to QUEUED after tasks are cleared.
# Unaffected dagruns should be left as SUCCESS.
dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)
- assert dagrun_0_1.state == State.RUNNING
- assert dagrun_0_2.state == State.RUNNING
- assert dagrun_1_1.state == State.RUNNING
+ assert dagrun_0_1.state == State.QUEUED
+ assert dagrun_0_2.state == State.QUEUED
+ assert dagrun_1_1.state == State.QUEUED
assert dagrun_1_2.state == State.SUCCESS