You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/08/08 17:06:05 UTC

[airflow] branch master updated: Remove redundant "and_" condition when using filter (#10232)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7930234  Remove redundant "and_" condition when using filter (#10232)
7930234 is described below

commit 7930234726c5e9cb9745cc7944047ac343ab832a
Author: Kaxil Naik <ka...@gmail.com>
AuthorDate: Sat Aug 8 18:05:36 2020 +0100

    Remove redundant "and_" condition when using filter (#10232)
    
    Multiple criteria may be specified as comma separated; the effect is that they will be joined together using the and_() function ( https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.filter)
---
 airflow/api_connexion/endpoints/dag_run_endpoint.py | 6 +++---
 airflow/api_connexion/endpoints/xcom_endpoint.py    | 7 +++----
 airflow/jobs/scheduler_job.py                       | 4 ++--
 airflow/models/dagcode.py                           | 6 +++---
 airflow/models/renderedtifields.py                  | 8 ++++----
 5 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 7c08423..c4ed07d 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -17,7 +17,7 @@
 from connexion import NoContent
 from flask import request
 from marshmallow import ValidationError
-from sqlalchemy import and_, func
+from sqlalchemy import func
 
 from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound
 from airflow.api_connexion.parameters import check_limit, format_datetime, format_parameters
@@ -36,7 +36,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
     """
     if (
         session.query(DagRun)
-            .filter(and_(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
+            .filter(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id)
             .delete() == 0
     ):
         raise NotFound(detail=f"DAGRun with DAG ID: '{dag_id}' and DagRun ID: '{dag_run_id}' not found")
@@ -162,7 +162,7 @@ def post_dag_run(dag_id, session):
     post_body = dagrun_schema.load(request.json, session=session)
     dagrun_instance = (
         session.query(DagRun).filter(
-            and_(DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"])).first()
+            DagRun.dag_id == dag_id, DagRun.run_id == post_body["run_id"]).first()
     )
     if not dagrun_instance:
         dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL.value, **post_body)
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 13dff43..0c20192 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -74,10 +74,9 @@ def get_xcom_entry(
     """
     Get an XCom entry
     """
-    query = session.query(XCom)
-    query = query.filter(and_(XCom.dag_id == dag_id,
-                              XCom.task_id == task_id,
-                              XCom.key == xcom_key))
+    query = session.query(XCom).filter(XCom.dag_id == dag_id,
+                                       XCom.task_id == task_id,
+                                       XCom.key == xcom_key)
     query = query.join(DR, and_(XCom.dag_id == DR.dag_id, XCom.execution_date == DR.execution_date))
     query = query.filter(DR.run_id == dag_run_id)
 
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 90b2e1c..1642292 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1131,11 +1131,11 @@ class SchedulerJob(BaseJob):  # pylint: disable=too-many-instance-attributes
             subq = query.subquery()
             tis_changed = session \
                 .query(models.TaskInstance) \
-                .filter(and_(
+                .filter(
                     models.TaskInstance.dag_id == subq.c.dag_id,
                     models.TaskInstance.task_id == subq.c.task_id,
                     models.TaskInstance.execution_date ==
-                    subq.c.execution_date)) \
+                    subq.c.execution_date) \
                 .update({models.TaskInstance.state: new_state}, synchronize_session=False)
             session.commit()
 
diff --git a/airflow/models/dagcode.py b/airflow/models/dagcode.py
index 5146ebe..8defbeb 100644
--- a/airflow/models/dagcode.py
+++ b/airflow/models/dagcode.py
@@ -20,7 +20,7 @@ import struct
 from datetime import datetime
 from typing import Iterable, Optional
 
-from sqlalchemy import BigInteger, Column, String, UnicodeText, and_, exists
+from sqlalchemy import BigInteger, Column, String, UnicodeText, exists
 
 from airflow.exceptions import AirflowException, DagCodeNotFound
 from airflow.models.base import Base
@@ -151,8 +151,8 @@ class DagCode(Base):
         log.debug("Deleting code from %s table ", cls.__tablename__)
 
         session.query(cls).filter(
-            and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
-                 cls.fileloc.notin_(alive_dag_filelocs))).delete(synchronize_session='fetch')
+            cls.fileloc_hash.notin_(alive_fileloc_hashes),
+            cls.fileloc.notin_(alive_dag_filelocs)).delete(synchronize_session='fetch')
 
     @classmethod
     @provide_session
diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 3647dff..e4f1f00 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -120,10 +120,10 @@ class RenderedTaskInstanceFields(Base):
             subq1 = tis_to_keep_query.subquery('subq1')
 
             session.query(cls) \
-                .filter(and_(
+                .filter(
                     cls.dag_id == dag_id,
                     cls.task_id == task_id,
-                    tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1))) \
+                    tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1)) \
                 .delete(synchronize_session=False)
         elif session.bind.dialect.name in ["mysql"]:
             # Fetch Top X records given dag_id & task_id ordered by Execution Date
@@ -140,10 +140,10 @@ class RenderedTaskInstanceFields(Base):
             )
 
             session.query(cls) \
-                .filter(and_(
+                .filter(
                     cls.dag_id == dag_id,
                     cls.task_id == task_id,
-                    tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2))) \
+                    tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2)) \
                 .delete(synchronize_session=False)
         else:
             # Fetch Top X records given dag_id & task_id ordered by Execution Date