Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/12 10:21:07 UTC

[GitHub] [airflow] iw-pavan opened a new pull request, #22944: fix trigger event payload is not persisted in db

iw-pavan opened a new pull request, #22944:
URL: https://github.com/apache/airflow/pull/22944

   closes: #22942 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on PR #22944:
URL: https://github.com/apache/airflow/pull/22944#issuecomment-1098354632

   Merging. The helm failure isn't related and is flaky.




[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848801697


##########
tests/models/test_trigger.py:
##########
@@ -101,6 +101,9 @@ def test_submit_event(session, create_task_instance):
     session.commit()
     # Call submit_event
     Trigger.submit_event(trigger.id, TriggerEvent(42), session=session)
+    # commit changes made by submit event and expire all cache to read from db.
+    session.commit()
+    session.expire_all()

Review Comment:
   ```suggestion
       session.flush()
       session.expunge_all()
   ```
   
   Should be enough I think? (Edited)





[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848799528


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   `ExtendedJSON` is also used in `TaskMap.keys` and `DbCallbackRequest.callback_data`, so these might suffer from the same issue if they mutate the dicts too.
   
   https://github.com/apache/airflow/blob/152b646564c8eff1239f29e91aec5e0f89b28353/airflow/models/db_callback_request.py#L37
   https://github.com/apache/airflow/blob/152b646564c8eff1239f29e91aec5e0f89b28353/airflow/models/taskmap.py#L61
   
   cc @uranusjr @ashb 





[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848798475


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   This is only a problem when you mutate a structure (dict, list, etc.) in place, and it is sort of a limitation of Python: nothing is called on the owning object to tell it that the value changed (short of doing _very_ expensive checks all the time).
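
As a plain-Python illustration of this point (a toy class, not SQLAlchemy or Airflow code; `Row` and `assignments` are hypothetical names), attribute assignment goes through a hook the owner can observe, while mutating the dict it already holds calls nothing on the owner:

```python
class Row:
    """Toy object that records every attribute assignment it sees."""

    def __init__(self):
        # Bypass our own hook while creating the log itself.
        object.__setattr__(self, "assignments", [])

    def __setattr__(self, name, value):
        self.assignments.append(name)
        object.__setattr__(self, name, value)


row = Row()
row.next_kwargs = {"a": 1}     # assignment: Row.__setattr__ runs
row.next_kwargs["event"] = 42  # in-place mutation: only dict.__setitem__ runs

print(row.assignments)
print(row.next_kwargs)
```

The second statement changes the data but leaves no trace in `assignments`, which is the situation an ORM is in when a JSON column's dict is edited in place.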





[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848804924


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   Other uses are fine -- this is the only one where we mutate a row in place. Others are create only/set new dict.





[GitHub] [airflow] jedcunningham merged pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #22944:
URL: https://github.com/apache/airflow/pull/22944




[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848796206


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   :( SqlAlchemy 








[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848801310


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   (but in this case doing what we have in this PR may be the better option)





[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848810173


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   Cool, if that's the case, I propose using the diff I have above to be explicit, instead of using `dict` as in the current PR.





[GitHub] [airflow] jedcunningham commented on pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on PR #22944:
URL: https://github.com/apache/airflow/pull/22944#issuecomment-1098356160

   Thanks @iw-pavan! Congrats on your first commit 🎉




[GitHub] [airflow] potiuk commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848435130


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   It seems that this is indeed an unfortunate way in which SQLAlchemy treats some objects. Basically, changes to an object are not seen as "changes" unless we explicitly replace the object or mark it as changed.
   
   But likely there is a better way of handling it:
   
   https://docs.sqlalchemy.org/en/14/orm/extensions/mutable.html
   
   Seems that we could set the object as Mutable and it would solve the problem.
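
The idea behind that extension can be sketched in plain Python: a dict subclass that tells its owner whenever it is mutated. This is a simplified toy (`NotifyingDict` and `Owner` are made-up names), not the `sqlalchemy.ext.mutable` implementation, and it only covers a few of the mutating methods:

```python
class Owner:
    """Toy parent object that can be flagged as changed."""

    def __init__(self):
        self.changed = False


class NotifyingDict(dict):
    """Dict that flags its owner on item assignment, deletion and update()."""

    def __init__(self, owner, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._owner = owner

    def _notify(self):
        self._owner.changed = True

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._notify()

    def __delitem__(self, key):
        super().__delitem__(key)
        self._notify()

    def update(self, *args, **kwargs):
        super().update(*args, **kwargs)
        self._notify()


owner = Owner()
kwargs = NotifyingDict(owner, {"a": 1})
print(owner.changed)  # construction alone does not flag the owner

kwargs["event"] = 42  # the in-place mutation now reaches the owner
print(owner.changed)
```

With a wrapper of this kind the owner no longer depends on callers remembering to copy the dict before changing it.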





[GitHub] [airflow] kaxil commented on a diff in pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r849536238


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   ```suggestion
               next_kwargs = task_instance.next_kwargs or {}
   ```





[GitHub] [airflow] potiuk commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
potiuk commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848436073


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   (and it would prevent a similar problem from happening in other places where the dict might be mutated).





[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848371419


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   Can you give a reproducible case, I am not sure how this fixes any issue?
   
   If `task_instance.next_kwargs` is `None`, `next_kwargs` is set to `{}`, else it will be a dict. Do you see a case where it won't be a dict?





[GitHub] [airflow] github-actions[bot] commented on pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22944:
URL: https://github.com/apache/airflow/pull/22944#issuecomment-1096560942

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #22944:
URL: https://github.com/apache/airflow/pull/22944#issuecomment-1098355405

   Awesome work, congrats on your first merged pull request!
   




[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848809112


##########
tests/models/test_trigger.py:
##########
@@ -101,6 +101,9 @@ def test_submit_event(session, create_task_instance):
     session.commit()
     # Call submit_event
     Trigger.submit_event(trigger.id, TriggerEvent(42), session=session)
+    # commit changes made by submit event and expire all cache to read from db.
+    session.commit()
+    session.expire_all()

Review Comment:
   We will at least need the following. A flush won't mark the state as out-of-date, so I think `session.expire_all` is fine tbh:
   
   ```
       session.flush()
       session.expire(task_instance)
   ```
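
To illustrate why the test has to discard in-memory state before asserting, here is a standalone sketch using only the standard-library `sqlite3` module. The `cache` dict stands in for objects a session has already loaded; the table layout and helper name are invented for this example, and nothing here is Airflow or SQLAlchemy code:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (id INTEGER PRIMARY KEY, next_kwargs TEXT)")
conn.execute("INSERT INTO task_instance VALUES (1, ?)", (json.dumps({"a": 1}),))

# Stand-in for objects the session has already loaded.
cache = {1: {"a": 1}}


def read_from_db(row_id):
    """Fetch and decode the stored value, bypassing the cache."""
    (raw,) = conn.execute(
        "SELECT next_kwargs FROM task_instance WHERE id = ?", (row_id,)
    ).fetchone()
    return json.loads(raw)


# Simulate the bug: the in-memory value changes, but no UPDATE is ever sent.
cache[1]["event"] = 42

print(cache[1])         # looks correct if the test reads the cached object
print(read_from_db(1))  # the stored row never received the payload

# Dropping the cached entry forces the next read to hit the database.
cache.pop(1)
reloaded = cache.setdefault(1, read_from_db(1))
print(reloaded)
```

In the real test, `session.expire_all()` or `session.expire(task_instance)` plays the role of dropping the cache entry, so that the following query reflects what was actually written.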





[GitHub] [airflow] kaxil commented on a diff in pull request #22944: Fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r849535841


##########
airflow/models/taskinstance.py:
##########
@@ -452,8 +453,7 @@ class TaskInstance(Base, LoggingMixin):
 
     # The method to call next, and any extra arguments to pass to it.
     # Usually used when resuming from DEFERRED.
-    next_method = Column(String(1000))
-    next_kwargs = Column(ExtendedJSON)
+    next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))

Review Comment:
   ```suggestion
       next_method = Column(String(1000))
       next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
   ```





[GitHub] [airflow] iw-pavan commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
iw-pavan commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848412646


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   @kaxil SQLAlchemy doesn't update the `next_kwargs` column if the same dict object is modified in place.
   I've updated `test_submit_event` in test_trigger.py to capture this.
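
A rough model of why reusing the same dict hides the change, assuming the change check boils down to comparing the current value with the one that was loaded. `TrackedRow` is a toy class invented for illustration, not SQLAlchemy's actual implementation, and storing the payload under an `event` key and assigning it back is an assumption about the surrounding code that the quoted hunk does not show:

```python
class TrackedRow:
    """Toy row that keeps a reference to the loaded value and compares on demand."""

    def __init__(self, next_kwargs):
        self.next_kwargs = next_kwargs
        # A reference to the loaded object, not a deep copy.
        self._loaded = next_kwargs

    def has_changes(self):
        return self.next_kwargs != self._loaded


def submit_in_place(row, payload):
    # Mirrors the old line: reuse the existing dict and assign it back.
    next_kwargs = row.next_kwargs or {}
    next_kwargs["event"] = payload
    row.next_kwargs = next_kwargs


def submit_with_copy(row, payload):
    # Mirrors the PR: copy first, so the assigned dict is a different object.
    next_kwargs = dict(row.next_kwargs) if row.next_kwargs else {}
    next_kwargs["event"] = payload
    row.next_kwargs = next_kwargs


stale = TrackedRow({"a": 1})
submit_in_place(stale, 42)

fresh = TrackedRow({"a": 1})
submit_with_copy(fresh, 42)

print(stale.has_changes(), fresh.has_changes())
```

In the in-place variant the "old" and "new" values are the same object, so they always compare equal; the copy gives the comparison something to detect.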





[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848811892


##########
tests/models/test_trigger.py:
##########
@@ -101,6 +101,9 @@ def test_submit_event(session, create_task_instance):
     session.commit()
     # Call submit_event
     Trigger.submit_event(trigger.id, TriggerEvent(42), session=session)
+    # commit changes made by submit event and expire all cache to read from db.
+    session.commit()
+    session.expire_all()

Review Comment:
   Better -- I have a pet peeve about `commit()` calls in tests and avoid them where possible, as I have a dream of one day running most tests in an auto-rolling-back transaction for speedier setup/teardown.







[GitHub] [airflow] kaxil commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848803343


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   We can apply this change too
   
   ```diff
   diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
   index 5b53916ac..fc4003b4f 100644
   --- a/airflow/models/taskinstance.py
   +++ b/airflow/models/taskinstance.py
   @@ -69,6 +69,7 @@ from sqlalchemy import (
        tuple_,
    )
    from sqlalchemy.ext.associationproxy import association_proxy
   +from sqlalchemy.ext.mutable import MutableDict
    from sqlalchemy.orm import reconstructor, relationship
    from sqlalchemy.orm.attributes import NO_VALUE, set_committed_value
    from sqlalchemy.orm.exc import NoResultFound
   @@ -453,7 +454,7 @@ class TaskInstance(Base, LoggingMixin):
        # The method to call next, and any extra arguments to pass to it.
        # Usually used when resuming from DEFERRED.
        next_method = Column(String(1000))
   -    next_kwargs = Column(ExtendedJSON)
   +    next_kwargs = Column(MutableDict.as_mutable(ExtendedJSON))
   
        # If adding new fields here then remember to add them to
        # refresh_from_db() or they won't display in the UI correctly
   diff --git a/tests/models/test_trigger.py b/tests/models/test_trigger.py
   index 99cd71f76..ed6f635a9 100644
   --- a/tests/models/test_trigger.py
   +++ b/tests/models/test_trigger.py
   @@ -101,6 +101,9 @@ def test_submit_event(session, create_task_instance):
        session.commit()
        # Call submit_event
        Trigger.submit_event(trigger.id, TriggerEvent(42), session=session)
   +    # commit changes made by submit event and expire all cache to read from db.
   +    session.commit()
   +    session.expire_all()
        # Check that the task instance is now scheduled
        updated_task_instance = session.query(TaskInstance).one()
        assert updated_task_instance.state == State.SCHEDULED
   ```







[GitHub] [airflow] ashb commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848800304


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   https://github.com/edelooff/sqlalchemy-json





[GitHub] [airflow] uranusjr commented on a diff in pull request #22944: fix trigger event payload is not persisted in db

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22944:
URL: https://github.com/apache/airflow/pull/22944#discussion_r848882550


##########
airflow/models/trigger.py:
##########
@@ -115,7 +115,7 @@ def submit_event(cls, trigger_id, event, session=None):
             TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
         ):
             # Add the event's payload into the kwargs for the task
-            next_kwargs = task_instance.next_kwargs or {}
+            next_kwargs = dict(task_instance.next_kwargs) if task_instance.next_kwargs else {}

Review Comment:
   Hmm, is this the same cause as https://github.com/apache/airflow/issues/22245?


