You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/07/04 12:58:36 UTC
[airflow] branch main updated: Revert "Refactor Sqlalchemy queries to 2.0 style (Part 3) (#32177)" (#32343)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3c3c337edd Revert "Refactor Sqlalchemy queries to 2.0 style (Part 3) (#32177)" (#32343)
3c3c337edd is described below
commit 3c3c337edd6a6905c958206dc8f9fe4303c856eb
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Tue Jul 4 14:58:29 2023 +0200
Revert "Refactor Sqlalchemy queries to 2.0 style (Part 3) (#32177)" (#32343)
This reverts commit 1065687ec6df2b9b3557e38a67e71f835796427f.
---
airflow/utils/db.py | 38 ++---
airflow/www/utils.py | 11 +-
airflow/www/views.py | 460 +++++++++++++++++++++++++--------------------------
3 files changed, 244 insertions(+), 265 deletions(-)
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 46ddcbfb34..a76f0d4f67 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -92,7 +92,7 @@ def _format_airflow_moved_table_name(source_table, version, category):
@provide_session
def merge_conn(conn, session: Session = NEW_SESSION):
"""Add new Connection."""
- if not session.scalar(select(conn.__class__).filter_by(conn_id=conn.conn_id).limit(1)):
+ if not session.query(conn.__class__).filter_by(conn_id=conn.conn_id).first():
session.add(conn)
session.commit()
@@ -959,9 +959,7 @@ def check_conn_id_duplicates(session: Session) -> Iterable[str]:
dups = []
try:
- dups = session.execute(
- select(Connection.conn_id).group_by(Connection.conn_id).having(func.count() > 1)
- ).all()
+ dups = session.query(Connection.conn_id).group_by(Connection.conn_id).having(func.count() > 1).all()
except (exc.OperationalError, exc.ProgrammingError):
# fallback if tables hasn't been created yet
session.rollback()
@@ -986,11 +984,12 @@ def check_username_duplicates(session: Session) -> Iterable[str]:
for model in [User, RegisterUser]:
dups = []
try:
- dups = session.execute(
- select(model.username) # type: ignore[attr-defined]
+ dups = (
+ session.query(model.username) # type: ignore[attr-defined]
.group_by(model.username) # type: ignore[attr-defined]
.having(func.count() > 1)
- ).all()
+ .all()
+ )
except (exc.OperationalError, exc.ProgrammingError):
# fallback if tables hasn't been created yet
session.rollback()
@@ -1059,13 +1058,13 @@ def check_table_for_duplicates(
"""
minimal_table_obj = table(table_name, *[column(x) for x in uniqueness])
try:
- subquery = session.execute(
- select(minimal_table_obj, func.count().label("dupe_count"))
+ subquery = (
+ session.query(minimal_table_obj, func.count().label("dupe_count"))
.group_by(*[text(x) for x in uniqueness])
.having(func.count() > text("1"))
.subquery()
)
- dupe_count = session.scalar(select(func.sum(subquery.c.dupe_count)))
+ dupe_count = session.query(func.sum(subquery.c.dupe_count)).scalar()
if not dupe_count:
# there are no duplicates; nothing to do.
return
@@ -1102,7 +1101,7 @@ def check_conn_type_null(session: Session) -> Iterable[str]:
n_nulls = []
try:
- n_nulls = session.scalars(select(Connection.conn_id).where(Connection.conn_type.is_(None))).all()
+ n_nulls = session.query(Connection.conn_id).filter(Connection.conn_type.is_(None)).all()
except (exc.OperationalError, exc.ProgrammingError, exc.InternalError):
# fallback if tables hasn't been created yet
session.rollback()
@@ -1144,7 +1143,7 @@ def check_run_id_null(session: Session) -> Iterable[str]:
dagrun_table.c.run_id.is_(None),
dagrun_table.c.execution_date.is_(None),
)
- invalid_dagrun_count = session.scalar(select(func.count(dagrun_table.c.id)).where(invalid_dagrun_filter))
+ invalid_dagrun_count = session.query(func.count(dagrun_table.c.id)).filter(invalid_dagrun_filter).scalar()
if invalid_dagrun_count > 0:
dagrun_dangling_table_name = _format_airflow_moved_table_name(dagrun_table.name, "2.2", "dangling")
if dagrun_dangling_table_name in inspect(session.get_bind()).get_table_names():
@@ -1241,7 +1240,7 @@ def _move_dangling_data_to_new_table(
pk_cols = source_table.primary_key.columns
delete = source_table.delete().where(
- tuple_(*pk_cols).in_(session.select(*target_table.primary_key.columns).subquery())
+ tuple_(*pk_cols).in_(session.query(*target_table.primary_key.columns).subquery())
)
else:
delete = source_table.delete().where(
@@ -1263,11 +1262,10 @@ def _dangling_against_dag_run(session, source_table, dag_run):
source_table.c.dag_id == dag_run.c.dag_id,
source_table.c.execution_date == dag_run.c.execution_date,
)
-
return (
- select(*[c.label(c.name) for c in source_table.c])
+ session.query(*[c.label(c.name) for c in source_table.c])
.join(dag_run, source_to_dag_run_join_cond, isouter=True)
- .where(dag_run.c.dag_id.is_(None))
+ .filter(dag_run.c.dag_id.is_(None))
)
@@ -1306,10 +1304,10 @@ def _dangling_against_task_instance(session, source_table, dag_run, task_instanc
)
return (
- select(*[c.label(c.name) for c in source_table.c])
+ session.query(*[c.label(c.name) for c in source_table.c])
.join(dag_run, dr_join_cond, isouter=True)
.join(task_instance, ti_join_cond, isouter=True)
- .where(or_(task_instance.c.dag_id.is_(None), dag_run.c.dag_id.is_(None)))
+ .filter(or_(task_instance.c.dag_id.is_(None), dag_run.c.dag_id.is_(None)))
)
@@ -1333,9 +1331,9 @@ def _move_duplicate_data_to_new_table(
"""
bind = session.get_bind()
dialect_name = bind.dialect.name
-
query = (
- select(*[getattr(source_table.c, x.name).label(str(x.name)) for x in source_table.columns])
+ session.query(source_table)
+ .with_entities(*[getattr(source_table.c, x.name).label(str(x.name)) for x in source_table.columns])
.select_from(source_table)
.join(subquery, and_(*[getattr(source_table.c, x) == getattr(subquery.c, x) for x in uniqueness]))
)
diff --git a/airflow/www/utils.py b/airflow/www/utils.py
index b31f9326d9..76914dd9cd 100644
--- a/airflow/www/utils.py
+++ b/airflow/www/utils.py
@@ -39,7 +39,6 @@ from pygments.formatters import HtmlFormatter
from pygments.lexer import Lexer
from sqlalchemy import delete, func, types
from sqlalchemy.ext.associationproxy import AssociationProxy
-from sqlalchemy.sql import Select
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.models import errors
@@ -54,6 +53,7 @@ from airflow.www.forms import DateTimeWithTimezoneField
from airflow.www.widgets import AirflowDateTimePickerWidget
if TYPE_CHECKING:
+ from sqlalchemy.orm.query import Query
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.operators import ColumnOperators
@@ -518,21 +518,18 @@ def _get_run_ordering_expr(name: str) -> ColumnOperators:
return expr.desc()
-def sorted_dag_runs(
- query: Select, *, ordering: Sequence[str], limit: int, session: Session
-) -> Sequence[DagRun]:
+def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
"""Produce DAG runs sorted by specified columns.
- :param query: An ORM select object against *DagRun*.
+ :param query: An ORM query object against *DagRun*.
:param ordering: Column names to sort the runs. should generally come from a
timetable's ``run_ordering``.
:param limit: Number of runs to limit to.
- :param session: SQLAlchemy ORM session object
:return: A list of DagRun objects ordered by the specified columns. The list
contains only the *last* objects, but in *ascending* order.
"""
ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
- runs = session.scalars(query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit)).all()
+ runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
runs.reverse()
return runs
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 90d98a424c..3f0965da38 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -69,7 +69,7 @@ from pendulum.datetime import DateTime
from pendulum.parsing.exceptions import ParserError
from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
-from sqlalchemy import Date, and_, case, desc, func, inspect, or_, select, union_all
+from sqlalchemy import Date, and_, case, desc, func, inspect, or_, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, joinedload
from wtforms import SelectField, validators
@@ -95,7 +95,7 @@ from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
-from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, Trigger, XCom, errors
+from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, TaskFail, XCom, errors
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dag import DAG, get_dataset_triggered_next_run_info
from airflow.models.dagcode import DagCode
@@ -231,15 +231,16 @@ def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
# loaded and the actual requested run would be excluded by the limit(). Once
# the user has changed base date to be anything else we want to use that instead.
query_date = base_date
- if date_time < base_date <= date_time + datetime.timedelta(seconds=1):
+ if date_time < base_date and date_time + datetime.timedelta(seconds=1) >= base_date:
query_date = date_time
- drs = session.scalars(
- select(DagRun)
- .where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= query_date)
+ drs = (
+ session.query(DagRun)
+ .filter(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= query_date)
.order_by(desc(DagRun.execution_date))
.limit(num_runs)
- ).all()
+ .all()
+ )
dr_choices = []
dr_state = None
for dr in drs:
@@ -290,8 +291,8 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session):
Create a nested dict representation of the DAG's TaskGroup and its children
used to construct the Graph and Grid views.
"""
- query = session.execute(
- select(
+ query = (
+ session.query(
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.state,
@@ -302,7 +303,7 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session):
func.max(TaskInstance.end_date).label("end_date"),
)
.join(TaskInstance.task_instance_note, isouter=True)
- .where(
+ .filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
)
@@ -425,9 +426,11 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session):
}
def get_mapped_group_summaries():
- mapped_ti_query = session.execute(
- select(TaskInstance.task_id, TaskInstance.state, TaskInstance.run_id, TaskInstance.map_index)
- .where(
+ mapped_ti_query = (
+ session.query(
+ TaskInstance.task_id, TaskInstance.state, TaskInstance.run_id, TaskInstance.map_index
+ )
+ .filter(
TaskInstance.dag_id == dag.dag_id,
TaskInstance.task_id.in_(child["id"] for child in children),
TaskInstance.run_id.in_(r.run_id for r in dag_runs),
@@ -735,20 +738,21 @@ class Airflow(AirflowBaseView):
with create_session() as session:
# read orm_dags from the db
- dags_query = select(DagModel).where(~DagModel.is_subdag, DagModel.is_active)
+ dags_query = session.query(DagModel).filter(~DagModel.is_subdag, DagModel.is_active)
if arg_search_query:
escaped_arg_search_query = arg_search_query.replace("_", r"\_")
- dags_query = dags_query.where(
+ dags_query = dags_query.filter(
DagModel.dag_id.ilike("%" + escaped_arg_search_query + "%", escape="\\")
| DagModel.owners.ilike("%" + escaped_arg_search_query + "%", escape="\\")
)
if arg_tags_filter:
- dags_query = dags_query.where(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
+ dags_query = dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
- dags_query = dags_query.where(DagModel.dag_id.in_(filter_dag_ids))
- filtered_dag_count = session.scalar(select(func.count()).select_from(dags_query))
+ dags_query = dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))
+
+ filtered_dag_count = dags_query.count()
if filtered_dag_count == 0 and len(arg_tags_filter):
flash(
"No matching DAG tags found.",
@@ -758,28 +762,28 @@ class Airflow(AirflowBaseView):
return redirect(url_for("Airflow.index"))
all_dags = dags_query
- active_dags = dags_query.where(~DagModel.is_paused)
- paused_dags = dags_query.where(DagModel.is_paused)
+ active_dags = dags_query.filter(~DagModel.is_paused)
+ paused_dags = dags_query.filter(DagModel.is_paused)
# find DAGs which have a RUNNING DagRun
- running_dags = dags_query.join(DagRun, DagModel.dag_id == DagRun.dag_id).where(
+ running_dags = dags_query.join(DagRun, DagModel.dag_id == DagRun.dag_id).filter(
DagRun.state == State.RUNNING
)
# find DAGs for which the latest DagRun is FAILED
subq_all = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
+ session.query(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
.group_by(DagRun.dag_id)
.subquery()
)
subq_failed = (
- select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
- .where(DagRun.state == State.FAILED)
+ session.query(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
+ .filter(DagRun.state == State.FAILED)
.group_by(DagRun.dag_id)
.subquery()
)
subq_join = (
- select(subq_all.c.dag_id, subq_all.c.start_date)
+ session.query(subq_all.c.dag_id, subq_all.c.start_date)
.join(
subq_failed,
and_(
@@ -792,18 +796,16 @@ class Airflow(AirflowBaseView):
failed_dags = dags_query.join(subq_join, DagModel.dag_id == subq_join.c.dag_id)
is_paused_count = dict(
- session.execute(
- select(DagModel.is_paused, func.count(DagModel.dag_id))
- .group_by(DagModel.is_paused)
- .select_from(all_dags)
- ).all()
+ all_dags.with_entities(DagModel.is_paused, func.count(DagModel.dag_id)).group_by(
+ DagModel.is_paused
+ )
)
status_count_active = is_paused_count.get(False, 0)
status_count_paused = is_paused_count.get(True, 0)
- status_count_running = session.scalar(select(func.count()).select_from(running_dags))
- status_count_failed = session.scalar(select(func.count()).select_from(failed_dags))
+ status_count_running = running_dags.count()
+ status_count_failed = failed_dags.count()
all_dags_count = status_count_active + status_count_paused
if arg_status_filter == "active":
@@ -824,7 +826,7 @@ class Airflow(AirflowBaseView):
if arg_sorting_key == "last_dagrun":
dag_run_subquery = (
- select(
+ session.query(
DagRun.dag_id,
sqla.func.max(DagRun.execution_date).label("max_execution_date"),
)
@@ -852,13 +854,7 @@ class Airflow(AirflowBaseView):
else:
current_dags = current_dags.order_by(null_case, sort_column)
- dags = (
- session.scalars(
- current_dags.options(joinedload(DagModel.tags)).offset(start).limit(dags_per_page)
- )
- .unique()
- .all()
- )
+ dags = current_dags.options(joinedload(DagModel.tags)).offset(start).limit(dags_per_page).all()
user_permissions = g.user.perms
can_create_dag_run = (
permissions.ACTION_CAN_CREATE,
@@ -878,7 +874,7 @@ class Airflow(AirflowBaseView):
dag.can_trigger = dag.can_edit and can_create_dag_run
dag.can_delete = get_airflow_app().appbuilder.sm.can_delete_dag(dag.dag_id, g.user)
- dagtags = session.execute(select(func.distinct(DagTag.name)).order_by(DagTag.name)).all()
+ dagtags = session.query(func.distinct(DagTag.name)).order_by(DagTag.name).all()
tags = [
{"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)}
for name, in dagtags
@@ -886,15 +882,14 @@ class Airflow(AirflowBaseView):
owner_links_dict = DagOwnerAttributes.get_all(session)
- import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
+ import_errors = session.query(errors.ImportError).order_by(errors.ImportError.id)
if (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG) not in user_permissions:
# if the user doesn't have access to all DAGs, only display errors from visible DAGs
import_errors = import_errors.join(
DagModel, DagModel.fileloc == errors.ImportError.filename
- ).where(DagModel.dag_id.in_(filter_dag_ids))
+ ).filter(DagModel.dag_id.in_(filter_dag_ids))
- import_errors = session.scalars(import_errors)
for import_error in import_errors:
flash(
f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}",
@@ -938,12 +933,10 @@ class Airflow(AirflowBaseView):
permissions.RESOURCE_ADMIN_MENU,
) in user_permissions and conf.getboolean("webserver", "warn_deployment_exposure"):
robots_file_access_count = (
- select(Log)
- .where(Log.event == "robots")
- .where(Log.dttm > (utcnow() - datetime.timedelta(days=7)))
- )
- robots_file_access_count = session.scalar(
- select(func.count()).select_from(robots_file_access_count)
+ session.query(Log)
+ .filter(Log.event == "robots")
+ .filter(Log.dttm > (utcnow() - datetime.timedelta(days=7)))
+ .count()
)
if robots_file_access_count > 0:
flash(
@@ -1045,11 +1038,9 @@ class Airflow(AirflowBaseView):
dataset_triggered_dag_ids = [
dag.dag_id
for dag in (
- session.scalars(
- select(DagModel.dag_id)
- .where(DagModel.dag_id.in_(filter_dag_ids))
- .where(DagModel.schedule_interval == "Dataset")
- )
+ session.query(DagModel.dag_id)
+ .filter(DagModel.dag_id.in_(filter_dag_ids))
+ .filter(DagModel.schedule_interval == "Dataset")
)
]
@@ -1080,10 +1071,10 @@ class Airflow(AirflowBaseView):
if not filter_dag_ids:
return flask.json.jsonify({})
- dag_state_stats = session.execute(
- select(DagRun.dag_id, DagRun.state, sqla.func.count(DagRun.state))
+ dag_state_stats = (
+ session.query(DagRun.dag_id, DagRun.state, sqla.func.count(DagRun.state))
.group_by(DagRun.dag_id, DagRun.state)
- .where(DagRun.dag_id.in_(filter_dag_ids))
+ .filter(DagRun.dag_id.in_(filter_dag_ids))
)
dag_state_data = {(dag_id, state): count for dag_id, state, count in dag_state_stats}
@@ -1121,17 +1112,17 @@ class Airflow(AirflowBaseView):
filter_dag_ids = allowed_dag_ids
running_dag_run_query_result = (
- select(DagRun.dag_id, DagRun.run_id)
+ session.query(DagRun.dag_id, DagRun.run_id)
.join(DagModel, DagModel.dag_id == DagRun.dag_id)
- .where(DagRun.state == State.RUNNING, DagModel.is_active)
+ .filter(DagRun.state == State.RUNNING, DagModel.is_active)
)
- running_dag_run_query_result = running_dag_run_query_result.where(DagRun.dag_id.in_(filter_dag_ids))
+ running_dag_run_query_result = running_dag_run_query_result.filter(DagRun.dag_id.in_(filter_dag_ids))
running_dag_run_query_result = running_dag_run_query_result.subquery("running_dag_run")
# Select all task_instances from active dag_runs.
- running_task_instance_query_result = select(
+ running_task_instance_query_result = session.query(
TaskInstance.dag_id.label("dag_id"),
TaskInstance.state.label("state"),
sqla.literal(True).label("is_dag_running"),
@@ -1145,19 +1136,19 @@ class Airflow(AirflowBaseView):
if conf.getboolean("webserver", "SHOW_RECENT_STATS_FOR_COMPLETED_RUNS", fallback=True):
last_dag_run = (
- select(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label("execution_date"))
+ session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label("execution_date"))
.join(DagModel, DagModel.dag_id == DagRun.dag_id)
- .where(DagRun.state != State.RUNNING, DagModel.is_active)
+ .filter(DagRun.state != State.RUNNING, DagModel.is_active)
.group_by(DagRun.dag_id)
)
- last_dag_run = last_dag_run.where(DagRun.dag_id.in_(filter_dag_ids))
+ last_dag_run = last_dag_run.filter(DagRun.dag_id.in_(filter_dag_ids))
last_dag_run = last_dag_run.subquery("last_dag_run")
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
last_task_instance_query_result = (
- select(
+ session.query(
TaskInstance.dag_id.label("dag_id"),
TaskInstance.state.label("state"),
sqla.literal(False).label("is_dag_running"),
@@ -1178,8 +1169,8 @@ class Airflow(AirflowBaseView):
else:
final_task_instance_query_result = running_task_instance_query_result.subquery("final_ti")
- qry = session.execute(
- select(
+ qry = (
+ session.query(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
@@ -1195,6 +1186,7 @@ class Airflow(AirflowBaseView):
final_task_instance_query_result.c.is_dag_running.desc(),
)
)
+
data = get_task_stats_from_query(qry)
payload: dict[str, list[dict[str, Any]]] = collections.defaultdict(list)
for dag_id in filter_dag_ids:
@@ -1227,31 +1219,29 @@ class Airflow(AirflowBaseView):
return flask.json.jsonify({})
last_runs_subquery = (
- select(
+ session.query(
DagRun.dag_id,
sqla.func.max(DagRun.execution_date).label("max_execution_date"),
)
.group_by(DagRun.dag_id)
- .where(DagRun.dag_id.in_(filter_dag_ids)) # Only include accessible/selected DAGs.
+ .filter(DagRun.dag_id.in_(filter_dag_ids)) # Only include accessible/selected DAGs.
.subquery("last_runs")
)
- query = session.execute(
- select(
- DagRun.dag_id,
- DagRun.start_date,
- DagRun.end_date,
- DagRun.state,
- DagRun.execution_date,
- DagRun.data_interval_start,
- DagRun.data_interval_end,
- ).join(
- last_runs_subquery,
- and_(
- last_runs_subquery.c.dag_id == DagRun.dag_id,
- last_runs_subquery.c.max_execution_date == DagRun.execution_date,
- ),
- )
+ query = session.query(
+ DagRun.dag_id,
+ DagRun.start_date,
+ DagRun.end_date,
+ DagRun.state,
+ DagRun.execution_date,
+ DagRun.data_interval_start,
+ DagRun.data_interval_end,
+ ).join(
+ last_runs_subquery,
+ and_(
+ last_runs_subquery.c.dag_id == DagRun.dag_id,
+ last_runs_subquery.c.max_execution_date == DagRun.execution_date,
+ ),
)
resp = {
@@ -1350,18 +1340,19 @@ class Airflow(AirflowBaseView):
title = "DAG Details"
root = request.args.get("root", "")
- states = session.execute(
- select(TaskInstance.state, sqla.func.count(TaskInstance.dag_id))
- .where(TaskInstance.dag_id == dag_id)
+ states = (
+ session.query(TaskInstance.state, sqla.func.count(TaskInstance.dag_id))
+ .filter(TaskInstance.dag_id == dag_id)
.group_by(TaskInstance.state)
- ).all()
+ .all()
+ )
active_runs = models.DagRun.find(dag_id=dag_id, state=DagRunState.RUNNING, external_trigger=False)
- tags = session.scalars(select(models.DagTag).where(models.DagTag.dag_id == dag_id)).all()
+ tags = session.query(models.DagTag).filter(models.DagTag.dag_id == dag_id).all()
# TODO: convert this to a relationship
- owner_links = session.execute(select(DagOwnerAttributes).filter_by(dag_id=dag_id)).all()
+ owner_links = session.query(DagOwnerAttributes).filter_by(dag_id=dag_id).all()
attrs_to_avoid = [
"schedule_datasets",
@@ -1622,17 +1613,18 @@ class Airflow(AirflowBaseView):
"metadata": {"end_of_log": True},
}
- ti = session.scalar(
- select(models.TaskInstance)
- .where(
+ ti = (
+ session.query(models.TaskInstance)
+ .filter(
TaskInstance.task_id == task_id,
TaskInstance.dag_id == dag_id,
TaskInstance.execution_date == execution_date,
TaskInstance.map_index == map_index,
)
.join(TaskInstance.dag_run)
- .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
- .limit(1)
+ .options(joinedload("trigger"))
+ .options(joinedload("trigger.triggerer_job"))
+ .first()
)
if ti is None:
@@ -1690,10 +1682,10 @@ class Airflow(AirflowBaseView):
form = DateTimeForm(data={"execution_date": dttm})
dag_model = DagModel.get_dagmodel(dag_id)
- ti = session.scalar(
- select(models.TaskInstance)
+ ti = (
+ session.query(models.TaskInstance)
.filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
- .limit(1)
+ .first()
)
num_logs = 0
@@ -1734,10 +1726,10 @@ class Airflow(AirflowBaseView):
map_index = request.args.get("map_index", -1, type=int)
try_number = request.args.get("try_number", 1)
- ti = session.scalar(
- select(models.TaskInstance)
+ ti = (
+ session.query(models.TaskInstance)
.filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
- .limit(1)
+ .first()
)
if not ti:
@@ -1779,8 +1771,8 @@ class Airflow(AirflowBaseView):
task = copy.copy(dag.get_task(task_id))
task.resolve_template_files()
- ti: TaskInstance | None = session.scalar(
- select(TaskInstance)
+ ti: TaskInstance | None = (
+ session.query(TaskInstance)
.options(
# HACK: Eager-load relationships. This is needed because
# multiple properties mis-use provide_session() that destroys
@@ -1789,6 +1781,7 @@ class Airflow(AirflowBaseView):
joinedload(TaskInstance.trigger, innerjoin=False),
)
.filter_by(execution_date=dttm, dag_id=dag_id, task_id=task_id, map_index=map_index)
+ .one_or_none()
)
if ti is None:
ti_attrs: list[tuple[str, Any]] | None = None
@@ -1905,19 +1898,17 @@ class Airflow(AirflowBaseView):
form = DateTimeForm(data={"execution_date": dttm})
root = request.args.get("root", "")
dag = DagModel.get_dagmodel(dag_id)
- ti = session.scalar(select(TaskInstance).filter_by(dag_id=dag_id, task_id=task_id).limit(1))
+ ti = session.query(TaskInstance).filter_by(dag_id=dag_id, task_id=task_id).first()
if not ti:
flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
return redirect(url_for("Airflow.index"))
- xcom_query = session.execute(
- select(XCom.key, XCom.value).where(
- XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.execution_date == dttm,
- XCom.map_index == map_index,
- )
+ xcom_query = session.query(XCom.key, XCom.value).filter(
+ XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.execution_date == dttm,
+ XCom.map_index == map_index,
)
attributes = [(k, v) for k, v in xcom_query if not k.startswith("_")]
@@ -1987,7 +1978,7 @@ class Airflow(AirflowBaseView):
request_execution_date = request.values.get("execution_date", default=timezone.utcnow().isoformat())
is_dag_run_conf_overrides_params = conf.getboolean("core", "dag_run_conf_overrides_params")
dag = get_airflow_app().dag_bag.get_dag(dag_id)
- dag_orm: DagModel = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
+ dag_orm: DagModel = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
# Prepare form fields with param struct details to render a proper form with schema information
form_fields = {}
@@ -2025,9 +2016,11 @@ class Airflow(AirflowBaseView):
flash(f"Cannot create dagruns because the dag {dag_id} has import errors", "error")
return redirect(origin)
- recent_runs = session.execute(
- select(DagRun.conf, func.max(DagRun.run_id).label("run_id"), func.max(DagRun.execution_date))
- .where(
+ recent_runs = (
+ session.query(
+ DagRun.conf, func.max(DagRun.run_id).label("run_id"), func.max(DagRun.execution_date)
+ )
+ .filter(
DagRun.dag_id == dag_id,
DagRun.run_type == DagRunType.MANUAL,
DagRun.conf.isnot(None),
@@ -2300,17 +2293,15 @@ class Airflow(AirflowBaseView):
# Lock the related dag runs to prevent from possible dead lock.
# https://github.com/apache/airflow/pull/26658
- dag_runs_query = session.scalars(
- select(DagRun.id).where(DagRun.dag_id == dag_id).with_for_update()
- )
+ dag_runs_query = session.query(DagRun.id).filter(DagRun.dag_id == dag_id).with_for_update()
if start_date is None and end_date is None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
+ dag_runs_query = dag_runs_query.filter(DagRun.execution_date == start_date)
else:
if start_date is not None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
+ dag_runs_query = dag_runs_query.filter(DagRun.execution_date >= start_date)
if end_date is not None:
- dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)
+ dag_runs_query = dag_runs_query.filter(DagRun.execution_date <= end_date)
locked_dag_run_ids = dag_runs_query.all()
elif task_id:
@@ -2399,10 +2390,10 @@ class Airflow(AirflowBaseView):
if not filter_dag_ids:
return flask.json.jsonify([])
- dags = session.execute(
- select(DagRun.dag_id, sqla.func.count(DagRun.id))
- .where(DagRun.state == DagRunState.RUNNING)
- .where(DagRun.dag_id.in_(filter_dag_ids))
+ dags = (
+ session.query(DagRun.dag_id, sqla.func.count(DagRun.id))
+ .filter(DagRun.state == DagRunState.RUNNING)
+ .filter(DagRun.dag_id.in_(filter_dag_ids))
.group_by(DagRun.dag_id)
)
@@ -2483,11 +2474,9 @@ class Airflow(AirflowBaseView):
# Identify tasks that will be queued up to run when confirmed
all_task_ids = [task.task_id for task in dag.tasks]
- existing_tis = session.execute(
- select(TaskInstance.task_id).where(
- TaskInstance.dag_id == dag.dag_id,
- TaskInstance.run_id == dag_run_id,
- )
+ existing_tis = session.query(TaskInstance.task_id).filter(
+ TaskInstance.dag_id == dag.dag_id,
+ TaskInstance.run_id == dag_run_id,
)
completed_tis_ids = [task_id for task_id, in existing_tis]
@@ -2969,18 +2958,19 @@ class Airflow(AirflowBaseView):
if root:
dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True)
- dag_states = session.execute(
- select(
- _convert_to_date(session, DagRun.execution_date).label("date"),
+ dag_states = (
+ session.query(
+ (_convert_to_date(session, DagRun.execution_date)).label("date"),
DagRun.state,
func.max(DagRun.data_interval_start).label("data_interval_start"),
func.max(DagRun.data_interval_end).label("data_interval_end"),
func.count("*").label("count"),
)
- .where(DagRun.dag_id == dag.dag_id)
+ .filter(DagRun.dag_id == dag.dag_id)
.group_by(_convert_to_date(session, DagRun.execution_date), DagRun.state)
.order_by(_convert_to_date(session, DagRun.execution_date).asc())
- ).all()
+ .all()
+ )
data_dag_states = [
{
@@ -3251,17 +3241,16 @@ class Airflow(AirflowBaseView):
else:
min_date = timezone.utc_epoch()
ti_fails = (
- select(TaskFail)
+ session.query(TaskFail)
.join(TaskFail.dag_run)
- .where(
+ .filter(
TaskFail.dag_id == dag.dag_id,
DagRun.execution_date >= min_date,
DagRun.execution_date <= base_date,
)
)
if dag.partial:
- ti_fails = ti_fails.where(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
- ti_fails = session.scalars(ti_fails)
+ ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
fails_totals: dict[tuple[str, str, str], int] = defaultdict(int)
for failed_task_instance in ti_fails:
dict_key = (
@@ -3602,9 +3591,9 @@ class Airflow(AirflowBaseView):
form = DateTimeWithNumRunsWithDagRunsForm(data=dt_nr_dr_data)
form.execution_date.choices = dt_nr_dr_data["dr_choices"]
- tis = session.scalars(
- select(TaskInstance)
- .where(
+ tis = (
+ session.query(TaskInstance)
+ .filter(
TaskInstance.dag_id == dag_id,
TaskInstance.run_id == dag_run_id,
TaskInstance.start_date.is_not(None),
@@ -3613,10 +3602,10 @@ class Airflow(AirflowBaseView):
.order_by(TaskInstance.start_date)
)
- ti_fails = select(TaskFail).filter_by(run_id=dag_run_id, dag_id=dag_id)
+ ti_fails = session.query(TaskFail).filter_by(run_id=dag_run_id, dag_id=dag_id)
if dag.partial:
- ti_fails = ti_fails.where(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
- ti_fails = session.scalars(ti_fails)
+ ti_fails = ti_fails.filter(TaskFail.task_id.in_([t.task_id for t in dag.tasks]))
+
tasks = []
for ti in tis:
if not dag.has_task(ti.task_id):
@@ -3722,13 +3711,12 @@ class Airflow(AirflowBaseView):
if link_name is None:
return {"url": None, "error": "Link name not passed"}, 400
- ti = session.scalar(
- select(TaskInstance)
+ ti = (
+ session.query(TaskInstance)
.filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
.options(joinedload(TaskInstance.dag_run))
- .limit(1)
+ .first()
)
-
if not ti:
return {"url": None, "error": "Task Instances not found"}, 404
try:
@@ -3836,25 +3824,27 @@ class Airflow(AirflowBaseView):
base_date = dag.get_latest_execution_date() or timezone.utcnow()
with create_session() as session:
- query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)
+ query = session.query(DagRun).filter(
+ DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date
+ )
- run_type = request.args.get("run_type")
- if run_type:
- query = query.where(DagRun.run_type == run_type)
+ run_type = request.args.get("run_type")
+ if run_type:
+ query = query.filter(DagRun.run_type == run_type)
- run_state = request.args.get("run_state")
- if run_state:
- query = query.where(DagRun.state == run_state)
+ run_state = request.args.get("run_state")
+ if run_state:
+ query = query.filter(DagRun.state == run_state)
- dag_runs = wwwutils.sorted_dag_runs(
- query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session
- )
- encoded_runs = [wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs]
- data = {
- "groups": dag_to_grid(dag, dag_runs, session),
- "dag_runs": encoded_runs,
- "ordering": dag.timetable.run_ordering,
- }
+ dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
+ encoded_runs = [
+ wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder) for dr in dag_runs
+ ]
+ data = {
+ "groups": dag_to_grid(dag, dag_runs, session),
+ "dag_runs": encoded_runs,
+ "ordering": dag.timetable.run_ordering,
+ }
# avoid spaces to reduce payload size
return (
htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
@@ -3873,34 +3863,37 @@ class Airflow(AirflowBaseView):
end_date = _safe_parse_datetime(request.args.get("end_date"))
with create_session() as session:
# DagRuns
- dag_runs_type = session.execute(
- select(DagRun.run_type, func.count(DagRun.run_id))
- .where(
+ dag_runs_type = (
+ session.query(DagRun.run_type, func.count(DagRun.run_id))
+ .filter(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
)
.group_by(DagRun.run_type)
- ).all()
+ .all()
+ )
- dag_run_states = session.execute(
- select(DagRun.state, func.count(DagRun.run_id))
- .where(
+ dag_run_states = (
+ session.query(DagRun.state, func.count(DagRun.run_id))
+ .filter(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
)
.group_by(DagRun.state)
- ).all()
+ .all()
+ )
# TaskInstances
- task_instance_states = session.execute(
- select(TaskInstance.state, func.count(TaskInstance.run_id))
+ task_instance_states = (
+ session.query(TaskInstance.state, func.count(TaskInstance.run_id))
.join(TaskInstance.dag_run)
- .where(
+ .filter(
DagRun.start_date >= start_date,
or_(DagRun.end_date.is_(None), DagRun.end_date <= end_date),
)
.group_by(TaskInstance.state)
- ).all()
+ .all()
+ )
data = {
"dag_run_types": {
@@ -3934,32 +3927,28 @@ class Airflow(AirflowBaseView):
with create_session() as session:
data = [
dict(info)
- for info in session.execute(
- select(
- DatasetModel.id,
- DatasetModel.uri,
- func.max(DatasetEvent.timestamp).label("lastUpdate"),
- )
- .join(
- DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id
- )
- .join(
- DatasetDagRunQueue,
- and_(
- DatasetDagRunQueue.dataset_id == DatasetModel.id,
- DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
- ),
- isouter=True,
- )
- .join(
- DatasetEvent,
- DatasetEvent.dataset_id == DatasetModel.id,
- isouter=True,
- )
- .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
- .group_by(DatasetModel.id, DatasetModel.uri)
- .order_by(DatasetModel.uri)
+ for info in session.query(
+ DatasetModel.id,
+ DatasetModel.uri,
+ func.max(DatasetEvent.timestamp).label("lastUpdate"),
)
+ .join(DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id)
+ .join(
+ DatasetDagRunQueue,
+ and_(
+ DatasetDagRunQueue.dataset_id == DatasetModel.id,
+ DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
+ ),
+ isouter=True,
+ )
+ .join(
+ DatasetEvent,
+ DatasetEvent.dataset_id == DatasetModel.id,
+ isouter=True,
+ )
+ .filter(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
+ .group_by(DatasetModel.id, DatasetModel.uri)
+ .order_by(DatasetModel.uri)
]
return (
htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
@@ -4057,12 +4046,12 @@ class Airflow(AirflowBaseView):
if session.bind.dialect.name == "postgresql":
order_by = (order_by[0].nulls_first(), *order_by[1:])
- count_query = select(func.count(DatasetModel.id))
+ count_query = session.query(func.count(DatasetModel.id))
has_event_filters = bool(updated_before or updated_after)
query = (
- select(
+ session.query(
DatasetModel.id,
DatasetModel.uri,
func.max(DatasetEvent.timestamp).label("last_dataset_update"),
@@ -4087,12 +4076,11 @@ class Airflow(AirflowBaseView):
if updated_before:
filters.append(DatasetEvent.timestamp <= updated_before)
- query = query.where(*filters).offset(offset).limit(limit)
- count_query = count_query.where(*filters)
+ query = query.filter(*filters).offset(offset).limit(limit)
+ count_query = count_query.filter(*filters)
- query = session.execute(query)
datasets = [dict(dataset) for dataset in query]
- data = {"datasets": datasets, "total_entries": session.scalar(count_query)}
+ data = {"datasets": datasets, "total_entries": count_query.scalar()}
return (
htmlsafe_json_dumps(data, separators=(",", ":"), cls=utils_json.WebEncoder),
@@ -4138,20 +4126,20 @@ class Airflow(AirflowBaseView):
included_events_raw = conf.get("webserver", "audit_view_included_events", fallback=None)
excluded_events_raw = conf.get("webserver", "audit_view_excluded_events", fallback=None)
- query = select(Log).where(Log.dag_id == dag_id)
+ query = session.query(Log).filter(Log.dag_id == dag_id)
if included_events_raw:
included_events = {event.strip() for event in included_events_raw.split(",")}
- query = query.where(Log.event.in_(included_events))
+ query = query.filter(Log.event.in_(included_events))
elif excluded_events_raw:
excluded_events = {event.strip() for event in excluded_events_raw.split(",")}
- query = query.where(Log.event.notin_(excluded_events))
+ query = query.filter(Log.event.notin_(excluded_events))
current_page = request.args.get("page", default=0, type=int)
arg_sorting_key = request.args.get("sorting_key", "dttm")
arg_sorting_direction = request.args.get("sorting_direction", default="desc")
logs_per_page = PAGE_SIZE
- audit_logs_count = session.scalar(select(func.count()).select_from(query))
+ audit_logs_count = query.count()
num_of_pages = int(math.ceil(audit_logs_count / float(logs_per_page)))
start = current_page * logs_per_page
@@ -4163,7 +4151,7 @@ class Airflow(AirflowBaseView):
sort_column = sort_column.desc()
query = query.order_by(sort_column)
- dag_audit_logs = session.scalars(query.offset(start).limit(logs_per_page)).all()
+ dag_audit_logs = query.offset(start).limit(logs_per_page).all()
return self.render_template(
"airflow/dag_audit_log.html",
dag=dag,
@@ -4284,7 +4272,7 @@ class DagFilter(BaseFilter):
if get_airflow_app().appbuilder.sm.has_all_dags_access(g.user):
return query
filter_dag_ids = get_airflow_app().appbuilder.sm.get_accessible_dag_ids(g.user)
- return query.where(self.model.dag_id.in_(filter_dag_ids))
+ return query.filter(self.model.dag_id.in_(filter_dag_ids))
class AirflowModelView(ModelView):
@@ -4729,11 +4717,9 @@ class ConnectionModelView(AirflowModelView):
potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in range(1, 11)]
- query = session.scalars(
- select(Connection.conn_id).where(Connection.conn_id.in_(potential_connection_ids))
- )
+ query = session.query(Connection.conn_id).filter(Connection.conn_id.in_(potential_connection_ids))
- found_conn_id_set = {conn_id for conn_id in query}
+ found_conn_id_set = {conn_id for conn_id, in query}
possible_conn_id_iter = (
connection_id
@@ -5403,7 +5389,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
"""This routine only supports Running and Queued state."""
try:
count = 0
- for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
+ for dr in session.query(DagRun).filter(DagRun.id.in_(dagrun.id for dagrun in drs)):
count += 1
if state == State.RUNNING:
dr.start_date = timezone.utcnow()
@@ -5429,7 +5415,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
try:
count = 0
altered_tis = []
- for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
+ for dr in session.query(DagRun).filter(DagRun.id.in_(dagrun.id for dagrun in drs)):
count += 1
altered_tis += set_dag_run_state_to_failed(
dag=get_airflow_app().dag_bag.get_dag(dr.dag_id),
@@ -5457,7 +5443,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
try:
count = 0
altered_tis = []
- for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
+ for dr in session.query(DagRun).filter(DagRun.id.in_(dagrun.id for dagrun in drs)):
count += 1
altered_tis += set_dag_run_state_to_success(
dag=get_airflow_app().dag_bag.get_dag(dr.dag_id),
@@ -5481,7 +5467,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
count = 0
cleared_ti_count = 0
dag_to_tis: dict[DAG, list[TaskInstance]] = {}
- for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
+ for dr in session.query(DagRun).filter(DagRun.id.in_(dagrun.id for dagrun in drs)):
count += 1
dag = get_airflow_app().dag_bag.get_dag(dr.dag_id)
tis_to_clear = dag_to_tis.setdefault(dag, [])
@@ -5897,37 +5883,35 @@ class AutocompleteView(AirflowBaseView):
return flask.json.jsonify([])
# Provide suggestions of dag_ids and owners
- dag_ids_query = select(
+ dag_ids_query = session.query(
sqla.literal("dag").label("type"),
DagModel.dag_id.label("name"),
- ).where(~DagModel.is_subdag, DagModel.is_active, DagModel.dag_id.ilike(f"%{query}%"))
+ ).filter(~DagModel.is_subdag, DagModel.is_active, DagModel.dag_id.ilike(f"%{query}%"))
owners_query = (
- select(
+ session.query(
sqla.literal("owner").label("type"),
DagModel.owners.label("name"),
)
.distinct()
- .where(~DagModel.is_subdag, DagModel.is_active, DagModel.owners.ilike(f"%{query}%"))
+ .filter(~DagModel.is_subdag, DagModel.is_active, DagModel.owners.ilike(f"%{query}%"))
)
# Hide DAGs if not showing status: "all"
status = flask_session.get(FILTER_STATUS_COOKIE)
if status == "active":
- dag_ids_query = dag_ids_query.where(~DagModel.is_paused)
- owners_query = owners_query.where(~DagModel.is_paused)
+ dag_ids_query = dag_ids_query.filter(~DagModel.is_paused)
+ owners_query = owners_query.filter(~DagModel.is_paused)
elif status == "paused":
- dag_ids_query = dag_ids_query.where(DagModel.is_paused)
- owners_query = owners_query.where(DagModel.is_paused)
+ dag_ids_query = dag_ids_query.filter(DagModel.is_paused)
+ owners_query = owners_query.filter(DagModel.is_paused)
filter_dag_ids = get_airflow_app().appbuilder.sm.get_accessible_dag_ids(g.user)
- dag_ids_query = dag_ids_query.where(DagModel.dag_id.in_(filter_dag_ids))
- owners_query = owners_query.where(DagModel.dag_id.in_(filter_dag_ids))
- payload = [
- row._asdict()
- for row in session.execute(dag_ids_query.union(owners_query).order_by("name").limit(10))
- ]
+ dag_ids_query = dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids))
+ owners_query = owners_query.filter(DagModel.dag_id.in_(filter_dag_ids))
+
+ payload = [row._asdict() for row in dag_ids_query.union(owners_query).order_by("name").limit(10)]
return flask.json.jsonify(payload)