You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/13 18:20:38 UTC
[airflow] 18/38: Don't check execution_date in refresh_from_db
(#16809)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ae4ab586b8bb12f0827505d05c88ac924734410a
Author: Tzu-ping Chung <tp...@astronomer.io>
AuthorDate: Wed Jul 7 19:04:15 2021 +0800
Don't check execution_date in refresh_from_db (#16809)
The native sqlalchemy DateTime type does not compare well when timezones
don't match. This can happen if the current execution_date on a DagRun
instance is not in UTC (the db entry is always in UTC).
Since DagRun has a unique constraint on (dag_id, run_id), these two
should be able to return one unique result, and the executrion_date
column should not be needed anyway. Let's just remove that filter to
prevent all the datetime comparison trouble.
(cherry picked from commit faffaec73385db3c6910d31ccea9fc4f9f3f9d42)
---
airflow/models/dagrun.py | 29 ++---------------------------
1 file changed, 2 insertions(+), 27 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index f29d6ac..07f309d 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -18,19 +18,7 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Iterable, List, NamedTuple, Optional, Tuple, Union
-from sqlalchemy import (
- Boolean,
- Column,
- DateTime,
- Index,
- Integer,
- PickleType,
- String,
- UniqueConstraint,
- and_,
- func,
- or_,
-)
+from sqlalchemy import Boolean, Column, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship, synonym
@@ -165,20 +153,7 @@ class DagRun(Base, LoggingMixin):
:param session: database session
:type session: Session
"""
- DR = DagRun
-
- exec_date = func.cast(self.execution_date, DateTime)
-
- dr = (
- session.query(DR)
- .filter(
- DR.dag_id == self.dag_id,
- func.cast(DR.execution_date, DateTime) == exec_date,
- DR.run_id == self.run_id,
- )
- .one()
- )
-
+ dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
self.id = dr.id
self.state = dr.state