You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by je...@apache.org on 2022/04/13 18:26:47 UTC
[airflow] branch main updated: Fix trigger event payload is not persisted in db (#22944)
This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bab740c0a4 Fix trigger event payload is not persisted in db (#22944)
bab740c0a4 is described below
commit bab740c0a49b828401a8baf04eb297d083605ae8
Author: Pavan Sharma <96...@users.noreply.github.com>
AuthorDate: Wed Apr 13 23:56:40 2022 +0530
Fix trigger event payload is not persisted in db (#22944)
Co-authored-by: Kaxil Naik <ka...@gmail.com>
Co-authored-by: Ash Berlin-Taylor <as...@firemirror.com>
---
airflow/models/taskinstance.py | 3 ++-
tests/models/test_trigger.py | 3 +++
2 files changed, 5 insertions(+), 1 deletion(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 145512be8c..6c7526d6ea 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -69,6 +69,7 @@ from sqlalchemy import (
tuple_,
)
from sqlalchemy.ext.associationproxy import association_proxy
+from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import reconstructor, relationship
from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
from sqlalchemy.orm.exc import NoResultFound
@@ -454,7 +455,7 @@ class TaskInstance(Base, LoggingMixin):
# The method to call next, and any extra arguments to pass to it.
# Usually used when resuming from DEFERRED.
next_method = Column(String(1000))
- next_kwargs = Column(ExtendedJSON)
+ next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
# If adding new fields here then remember to add them to
# refresh_from_db() or they won't display in the UI correctly
diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
index 99cd71f767..7e5deb6b8d 100644
--- a/tests/models/test_trigger.py
+++ b/tests/models/test_trigger.py
@@ -101,6 +101,9 @@ def test_submit_event(session, create_task_instance):
session.commit()
# Call submit_event
Trigger.submit_event(trigger.id, TriggerEvent(42), session=session)
+ # commit changes made by submit event and expire all cache to read from db.
+ session.flush()
+ session.expunge_all()
# Check that the task instance is now scheduled
updated_task_instance = session.query(TaskInstance).one()
assert updated_task_instance.state == State.SCHEDULED