You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/08 17:06:05 UTC
[airflow] branch master updated: Remove redundant "and_" condition when using filter (#10232)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 7930234 Remove redundant "and_" condition when using filter (#10232)
7930234 is described below
commit 7930234726c5e9cb9745cc7944047ac343ab832a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Aug 8 18:05:36 2020 +0100
Remove redundant "and_" condition when using filter (#10232)
Multiple criteria may be passed to Query.filter() as comma-separated arguments; the effect is that they will be joined together using the and_() function, so wrapping them in an explicit and_() is redundant (https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.filter).
---
airflow/api_connexion/endpoints/dag_run_endpoint.py | 6 +++---
airflow/api_connexion/endpoints/xcom_endpoint.py | 7 +++----
airflow/jobs/scheduler_job.py | 4 ++--
airflow/models/dagcode.py | 6 +++---
airflow/models/renderedtifields.py | 8 ++++----
5 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 7c08423..c4ed07d 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -17,7 +17,7 @@
from connexion import NoContent
from flask import request
from marshmallow import ValidationError
-from sqlalchemy import and_, func
+from sqlalchemy import func
from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound
from airflow.api_connexion.parameters import check_limit, format_datetime, format_parameters
@@ -36,7 +36,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
"""
if (
session.query(DagRun)
- .filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
+ .filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)
.delete() == 0
):
raise NotFound(detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found")
@@ -162,7 +162,7 @@ def post_dag_run(dag_id, session):
post_body = dagrun_schema.load(request.json, session=session)
dagrun_instance = (
session.query(DagRun).filter(
- and_(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"])).first()
+ DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]).first()
)
if not dagrun_instance:
dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL.value, **post_body)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 13dff43..0c20192 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -74,10 +74,9 @@ def get_xcom_entry(
"""
Get an XCom entry
"""
- query = session.query(XCom)
- query = query.filter(and_(XCom.dag_id == dag_id,
- XCom.task_id == task_id,
- XCom.key == xcom_key))
+ query = session.query(XCom).filter(XCom.dag_id == dag_id,
+ XCom.task_id == task_id,
+ XCom.key == xcom_key)
query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.execution_date == DR.execution_date))
query = query.filter(DR.run_id == dag_run_id)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 90b2e1c..1642292 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1131,11 +1131,11 @@ class SchedulerJob(BaseJob): # pylint: disable=too-many-instance-attributes
subq = query.subquery()
tis_changed = session \
.query(models.TaskInstance) \
- .filter(and_(
+ .filter(
models.TaskInstance.dag_id == subq.c.dag_id,
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
- subq.c.execution_date)) \
+ subq.c.execution_date) \
.update({models.TaskInstance.state: new_state}, synchronize_session=False)
session.commit()
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 5146ebe..8defbeb 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -20,7 +20,7 @@ import struct
from datetime import datetime
from typing import Iterable, Optional
-from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+from sqlalchemy import BigInteger, Column, String, UnicodeText, exists
from airflow.exceptions import AirflowException, DagCodeNotFound
from airflow.models.base import Base
@@ -151,8 +151,8 @@ class DagCode(Base):
log.debug("Deleting code from %s table ", cls.__tablename__)
session.query(cls).filter(
- and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
- cls.fileloc.notin_(alive_dag_filelocs))).delete(synchronize_session='fetch')
+ cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs)).delete(synchronize_session='fetch')
@classmethod
@provide_session
diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 3647dff..e4f1f00 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -120,10 +120,10 @@ class RenderedTaskInstanceFields(Base):
subq1 = tis_to_keep_query.subquery('subq1')
session.query(cls) \
- .filter(and_(
+ .filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
- tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1))) \
+ tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1)) \
.delete(synchronize_session=False)
elif session.bind.dialect.name in ["mysql"]:
# Fetch Top X records given dag_id & task_id ordered by Execution Date
@@ -140,10 +140,10 @@ class RenderedTaskInstanceFields(Base):
)
session.query(cls) \
- .filter(and_(
+ .filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
- tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2))) \
+ tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2)) \
.delete(synchronize_session=False)
else:
# Fetch Top X records given dag_id & task_id ordered by Execution Date