You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ur...@apache.org on 2022/04/06 17:50:15 UTC
[airflow] branch main updated: No need to load whole ti in current_state (#22764)
This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4eaf9bcddf No need to load whole ti in current_state (#22764)
4eaf9bcddf is described below
commit 4eaf9bcddfb370222b4386b02975974bb253f614
Author: Ping Zhang <pi...@umich.edu>
AuthorDate: Wed Apr 6 10:50:06 2022 -0700
No need to load whole ti in current_state (#22764)
Co-authored-by: Jed Cunningham <66...@users.noreply.github.com>
Co-authored-by: Tzu-ping Chung <ur...@gmail.com>
---
airflow/models/taskinstance.py | 11 +++--------
tests/models/test_taskinstance.py | 6 ++++++
2 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index b9a1ebc148..94397aac9a 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -723,20 +723,15 @@ class TaskInstance(Base, LoggingMixin):
:param session: SQLAlchemy ORM Session
"""
- ti = (
- session.query(TaskInstance)
+ return (
+ session.query(TaskInstance.state)
.filter(
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == self.run_id,
)
- .all()
+ .scalar()
)
- if ti:
- state = ti[0].state
- else:
- state = None
- return state
@provide_session
def error(self, session=NEW_SESSION):
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 38be50b12d..68bd9b0951 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -187,6 +187,12 @@ class TestTaskInstance:
assert op3.start_date == DEFAULT_DATE + datetime.timedelta(days=1)
assert op3.end_date == DEFAULT_DATE + datetime.timedelta(days=9)
+ def test_current_state(self, create_task_instance):
+ ti = create_task_instance()
+ assert ti.current_state() is None
+ ti.run()
+ assert ti.current_state() == State.SUCCESS
+
def test_set_dag(self, dag_maker):
"""
Test assigning Operators to Dags, including deferred assignment