Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/29 10:10:49 UTC

[GitHub] [airflow] uranusjr opened a new pull request #22583: Consider mapped upstream in TriggerRuleDep

uranusjr opened a new pull request #22583:
URL: https://github.com/apache/airflow/pull/22583


   This is needed to ensure that a downstream task depends on all mapped upstream task instances, instead of just one of them marking the entire upstream task as "done".
   
   Does not work yet. This depends on correct task-unmapping behaviour implemented in #22396.
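A minimal pure-Python sketch of the intent, assuming a simplified model in which each upstream task instance is a hypothetical `(task_id, map_index, state)` tuple: the downstream check counts every expanded instance of a mapped upstream instead of letting one finished instance stand for the whole task. `upstream_done` and the tuple layout are illustrative only, not Airflow's `TriggerRuleDep` API.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

# Hypothetical record: (task_id, map_index, state); map_index -1 means unmapped.
TIRecord = Tuple[str, int, str]

FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def upstream_done(upstream_tis: Iterable[TIRecord], expected_counts: Dict[str, int]) -> bool:
    """Return True only when every expected upstream instance has finished.

    expected_counts maps each upstream task_id to its number of task
    instances: 1 for an unmapped task, N for a task expanded into N instances.
    """
    finished = Counter(task_id for task_id, _, state in upstream_tis if state in FINISHED)
    # A missing task_id counts as 0 finished instances.
    return all(finished[task_id] >= n for task_id, n in expected_counts.items())
```

With `[("double", 0, "success"), ("double", 1, "running")]` and `{"double": 2}`, the check stays False until the second mapped instance finishes.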


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839822913



##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):

Review comment:
       this reminds me... thinking about custom XCom backends, there should probably be a "clear data" hook that an XCom backend can implement to purge its external data without having to override this method... of course, that's another PR
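A rough sketch of what such a hook could look like. Everything here is hypothetical (`XComBackendSketch`, `purge_data`, the in-memory "object store"); it is not Airflow's `BaseXCom` API, only an illustration of keeping the generic clearing logic in the base class while a backend supplies the external-deletion step.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

# Hypothetical key layout for this sketch: (dag_id, run_id, task_id).
Key = Tuple[str, str, str]


class XComBackendSketch(ABC):
    """Hypothetical base class; not Airflow's BaseXCom."""

    def __init__(self) -> None:
        # Metadata rows: key -> references to externally stored values.
        self.rows: Dict[Key, List[str]] = {}

    @abstractmethod
    def purge_data(self, reference: str) -> None:
        """Hypothetical "clear data" hook: delete one externally stored value."""

    def clear(self, dag_id: str, run_id: str, task_id: str) -> None:
        # Generic logic lives here; subclasses only implement the hook.
        for reference in self.rows.pop((dag_id, run_id, task_id), []):
            self.purge_data(reference)


class DictObjectStoreBackend(XComBackendSketch):
    """Stand-in for a backend that keeps payloads in an external store."""

    def __init__(self) -> None:
        super().__init__()
        self.objects: Dict[str, bytes] = {}

    def push(self, key: Key, reference: str, payload: bytes) -> None:
        self.objects[reference] = payload
        self.rows.setdefault(key, []).append(reference)

    def purge_data(self, reference: str) -> None:
        self.objects.pop(reference, None)
```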







[GitHub] [airflow] ashb commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r840369842



##########
File path: airflow/ti_deps/deps/trigger_rule_dep.py
##########
@@ -83,6 +87,31 @@ def _get_dep_statuses(self, ti, session, dep_context: DepContext):
             session=session,
         )
 
+    @staticmethod
+    def _count_upstreams(ti: "TaskInstance", *, session: "Session"):
+        from airflow.models.taskinstance import TaskInstance
+
+        # Optimization: Don't need to hit the database if no upstreams are mapped.
+        upstream_task_ids = ti.task.upstream_task_ids
+        if ti.task.dag and not any(ti.task.dag.get_task(tid).is_mapped for tid in upstream_task_ids):

Review comment:
       ```suggestion
           if ti.task.has_dag() and not any(node.is_mapped for node in ti.task.upstream_list):
   ```
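A simplified model of the optimization being reviewed, assuming a hypothetical `Node` stand-in with `task_id` and `is_mapped`, and a callable standing in for the database query: if no upstream is mapped, each upstream has exactly one task instance, so the count is just the number of upstreams and the query can be skipped.

```python
from dataclasses import dataclass
from typing import Callable, Sequence


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for an upstream operator."""

    task_id: str
    is_mapped: bool = False


def count_upstreams(upstreams: Sequence[Node], count_tis_in_db: Callable[[Sequence[str]], int]) -> int:
    # Fast path: unmapped upstreams have exactly one task instance each.
    if not any(node.is_mapped for node in upstreams):
        return len(upstreams)
    # Slow path: mapped upstreams may have expanded into many instances,
    # so ask the (stand-in) database how many exist.
    return count_tis_in_db([node.task_id for node in upstreams])
```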







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839250923



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       The return values from `double` tasks are pushed into XCom. This is similar to
   
   ```python
   from airflow.decorators import task
   
   @task
   def double(v):
       return v * 2
   
   @task
   def consume(v):
       print(v)
   
   consume(v=double(v=2))
   ```
   
   where `double` would push `4` to XCom.
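A toy model of that data flow, assuming an in-memory dict as the XCom table (the names `xcom_store` and `run_task` are hypothetical): a TaskFlow task's return value is stored under the `return_value` key, and a mapped task stores one value per map index.

```python
from typing import Any, Callable, Dict, Tuple

# Toy XCom table keyed by (task_id, map_index, key); -1 marks an unmapped task.
xcom_store: Dict[Tuple[str, int, str], Any] = {}


def run_task(task_id: str, fn: Callable[..., Any], map_index: int = -1, **kwargs: Any) -> None:
    # A TaskFlow task's return value is pushed under the "return_value" key.
    xcom_store[(task_id, map_index, "return_value")] = fn(**kwargs)


def double(v: int) -> int:
    return v * 2


# Unmapped: double(v=2) pushes a single value.
run_task("double", double, v=2)

# Mapped: double.expand(v=[1, 2]) runs one instance per element.
for index, value in enumerate([1, 2]):
    run_task("double", double, map_index=index, v=value)
```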







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839209079



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       Because `session` in the outer scope can die before we iterate through this, e.g.
   
   ```python
   from airflow.decorators import task
   
   @task
   def double(v):
       return v * 2
   
   @task
   def consume(v):
       resolved = list(v)  # The original session used to fetch XComs is dead by now.
   
   consume(v=double.expand(v=[1, 2]))
   ```
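A self-contained toy of the failure mode, assuming a hypothetical `ToySession` that refuses to read once closed: a lazy generator bound to the caller's session fails if that session is gone before iteration starts, while obtaining a session inside the generator (the approach of `iter_xcom_values()` in the diff) defers it until values are actually consumed.

```python
from typing import Callable, Iterator, List


class ToySession:
    """Hypothetical stand-in for a database session that can be closed."""

    def __init__(self, rows: List[int]) -> None:
        self.rows = rows
        self.closed = False

    def fetch(self) -> Iterator[int]:
        # Rows are read lazily, so a closed session only fails on iteration.
        for row in self.rows:
            if self.closed:
                raise RuntimeError("session is closed")
            yield row

    def close(self) -> None:
        self.closed = True


def pull_bound_to(session: ToySession) -> Iterator[int]:
    # Mirrors the old code: the lazy result is tied to the caller's session.
    return session.fetch()


def pull_with_fresh_session(make_session: Callable[[], ToySession]) -> Iterator[int]:
    # A session is only obtained once the consumer starts iterating.
    def iter_values() -> Iterator[int]:
        session = make_session()
        yield from session.fetch()

    return iter_values()
```

Creating `lazy = pull_bound_to(outer)`, then calling `outer.close()` (as if `xcom_pull()` had returned and its session were torn down), makes `list(lazy)` raise, whereas `list(pull_with_fresh_session(lambda: ToySession([2, 4])))` yields `[2, 4]`.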







[GitHub] [airflow] dstandish commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839820307



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       yeah I understand that; what I meant was, the example is just DAG code, so no data movement happens in what you've written, right? It only happens once you run the tasks. I presume that to understand what's going on with the sessions, I need to understand more details about where / when / how this method is actually invoked. But I'll leave that for another day.







[GitHub] [airflow] ashb commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839508909



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:
+                    for r in query.with_session(session):
+                        yield XCom.deserialize_value(r)

Review comment:
       We shouldn't use `create_session` here, because it has the side effect of doing an `expunge_all` when the block exits.
   
   We just want `session = settings.Session()`.
   
   ```suggestion
                   for r in query.with_session(settings.Session()):
                       yield XCom.deserialize_value(r)
   ```
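A simplified pure-Python model of why the context manager is a problem, assuming (as in Airflow 2.x at the time) that `settings.Session` is a thread-local `scoped_session` and that `create_session()` closes the session on exit. Because every `Session()` call in a thread returns the same session, closing it inside the generator also detaches objects the caller still holds. `ToySession`, `Session` and `create_session` below are hypothetical stand-ins, not the real Airflow or SQLAlchemy objects.

```python
import contextlib
import threading
from typing import Iterator, Set


class ToySession:
    """Hypothetical stand-in that only tracks which objects are attached."""

    def __init__(self) -> None:
        self.attached: Set[str] = set()

    def add(self, obj: str) -> None:
        self.attached.add(obj)

    def close(self) -> None:
        # Closing a SQLAlchemy session expunges every object it holds.
        self.attached.clear()


_registry = threading.local()


def Session() -> ToySession:
    # Like a scoped_session: every call in the same thread returns one session.
    if not hasattr(_registry, "session"):
        _registry.session = ToySession()
    return _registry.session


@contextlib.contextmanager
def create_session() -> Iterator[ToySession]:
    # Simplified: the real helper also commits or rolls back before closing.
    session = Session()
    try:
        yield session
    finally:
        session.close()
```

Here the caller's `Session()` and the one yielded by `create_session()` are the same object, so an object added by the caller is no longer attached after the `with` block exits.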







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r840280489



##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):

Review comment:
       Is `XCom.clear()` not enough?







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839209079



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       Because `session` in the outer scope can die before we iterate through this, e.g.
   
   ```python
   from airflow.decorators import task
   
   @task
   def double(v):
       return v * 2
   
   @task
   def consume(v):
       resolved = list(v)  # The original session in xcom_pull is dead by now.
   
   consume(v=double.expand(v=[1, 2]))
   ```







[GitHub] [airflow] github-actions[bot] commented on pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#issuecomment-1085750452


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.








[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r840419545



##########
File path: airflow/ti_deps/deps/trigger_rule_dep.py
##########
@@ -83,6 +87,31 @@ def _get_dep_statuses(self, ti, session, dep_context: DepContext):
             session=session,
         )
 
+    @staticmethod
+    def _count_upstreams(ti: "TaskInstance", *, session: "Session"):
+        from airflow.models.taskinstance import TaskInstance
+
+        # Optimization: Don't need to hit the database if no upstreams are mapped.
+        upstream_task_ids = ti.task.upstream_task_ids
+        if ti.task.dag and not any(ti.task.dag.get_task(tid).is_mapped for tid in upstream_task_ids):

Review comment:
       This would be slightly slower (an extra list creation), but I guess that doesn’t outweigh readability in practical scenarios.







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r840420320



##########
File path: airflow/ti_deps/deps/trigger_rule_dep.py
##########
@@ -83,6 +87,31 @@ def _get_dep_statuses(self, ti, session, dep_context: DepContext):
             session=session,
         )
 
+    @staticmethod
+    def _count_upstreams(ti: "TaskInstance", *, session: "Session"):
+        from airflow.models.taskinstance import TaskInstance
+
+        # Optimization: Don't need to hit the database if no upstreams are mapped.
+        upstream_task_ids = ti.task.upstream_task_ids
+        if ti.task.dag and not any(ti.task.dag.get_task(tid).is_mapped for tid in upstream_task_ids):

Review comment:
       This doesn’t work well with Mypy because `upstream_list` returns a list of DAGNode, not AbstractOperator.
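One possible way to keep the readable form while satisfying Mypy is to narrow each node with `isinstance`. This is an assumption for illustration, not what the PR does; `DAGNode` and `AbstractOperator` below are minimal hypothetical stand-ins for the real classes, with `is_mapped` defined only on the operator type.

```python
from typing import List


class DAGNode:
    """Hypothetical stand-in for the type upstream_list is annotated with."""

    task_id: str = ""


class AbstractOperator(DAGNode):
    """Hypothetical stand-in: only operators define is_mapped."""

    is_mapped: bool = False


def any_mapped(upstream_list: List[DAGNode]) -> bool:
    # node.is_mapped would not type-check on DAGNode; isinstance() narrows
    # node to AbstractOperator for the right-hand side of the "and".
    return any(isinstance(node, AbstractOperator) and node.is_mapped for node in upstream_list)
```

The trade-off is that any node that is not an `AbstractOperator` is silently treated as unmapped; `typing.cast` would avoid the runtime check but also skip any verification.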







[GitHub] [airflow] ashb commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r837485487



##########
File path: airflow/models/mappedoperator.py
##########
@@ -542,7 +542,7 @@ def expand_mapped_task(self, run_id: str, *, session: Session) -> Sequence["Task
             unmapped_ti.map_index = 0
             state = unmapped_ti.state
             self.log.debug("Updated in place to become %s", unmapped_ti)
-            ret.append(unmapped_ti)
+            ret.append(session.merge(unmapped_ti))

Review comment:
       This shouldn't be required as the `unmapped_ti` is still attached to `session`
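In SQLAlchemy, `Session.merge()` on an instance that is already persistent in that same session returns that very instance, so wrapping `unmapped_ti` adds nothing. A heavily simplified identity-map toy (hypothetical, no database load step) shows both cases:

```python
import copy
from typing import Dict, Hashable


class ToySession:
    """Hypothetical stand-in for a session's identity map."""

    def __init__(self) -> None:
        self.identity_map: Dict[Hashable, object] = {}

    def add(self, key: Hashable, obj: object) -> None:
        self.identity_map[key] = obj

    def merge(self, key: Hashable, obj: object) -> object:
        existing = self.identity_map.get(key)
        if existing is None:
            # Real merge() would first try loading the row by primary key;
            # either way it returns a session-owned instance, not obj itself.
            existing = copy.copy(obj)
            self.identity_map[key] = existing
        elif existing is not obj:
            # Copy the incoming state onto the instance the session tracks.
            existing.__dict__.update(obj.__dict__)
        return existing
```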







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839209587



##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):
+        """Clear all XCom data from the database for the task instance.
+
+        If the task is unmapped, all XComs matching this task ID in the same DAG
+        run are removed. If the task is mapped, only the one with matching map
+        index is removed.
 
         :param session: SQLAlchemy ORM Session
         """
         self.log.debug("Clearing XCom data")
+        if self.map_index < 0:
+            map_index: Optional[int] = None

Review comment:
       This is for clearing stale XComs; if the task instance is unmapped, there shouldn’t be mapped XComs bound to the task, so we use None to clear them all just in case.
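A pure-Python model of the conversion, assuming, per this comment, that `XCom.clear()` treats `map_index=None` as "any map index". Rows are hypothetical `(task_id, map_index, value)` tuples; `clear_map_index` and `rows_to_delete` are illustrative names, not Airflow functions.

```python
from typing import Iterable, List, Optional, Tuple

# Hypothetical XCom row: (task_id, map_index, value).
Row = Tuple[str, int, object]


def clear_map_index(ti_map_index: int) -> Optional[int]:
    # Unmapped TIs (map_index -1) clear every XCom for the task, including
    # stale mapped ones; mapped TIs clear only their own index.
    return None if ti_map_index < 0 else ti_map_index


def rows_to_delete(rows: Iterable[Row], task_id: str, map_index: Optional[int]) -> List[Row]:
    return [row for row in rows if row[0] == task_id and (map_index is None or row[1] == map_index)]
```

For an unmapped instance, every row of the task is selected, whatever its map index; for map index 1, only that row is.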







[GitHub] [airflow] dstandish commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839170638



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       why create a new one?

##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):
+        """Clear all XCom data from the database for the task instance.
+
+        If the task is unmapped, all XComs matching this task ID in the same DAG
+        run are removed. If the task is mapped, only the one with matching map
+        index is removed.
 
         :param session: SQLAlchemy ORM Session
         """
         self.log.debug("Clearing XCom data")
+        if self.map_index < 0:
+            map_index: Optional[int] = None

Review comment:
       why convert -1 to None?







[GitHub] [airflow] dstandish commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r839219281



##########
File path: airflow/models/taskinstance.py
##########
@@ -2339,7 +2350,15 @@ def xcom_pull(
                 .order_by(None)
                 .order_by(XCom.map_index.asc())
             )
-            return (XCom.deserialize_value(r) for r in query)
+
+            def iter_xcom_values(query):
+                # The session passed to xcom_pull() may die before this is
+                # iterated through, so we need to bind to a new session.
+                with create_session() as session:

Review comment:
       I'll take your word for it. But that example seems quite odd... xcom_pull isn't actually going to be invoked in what you've written: no tasks have been run, so there's nothing to pull...







[GitHub] [airflow] uranusjr commented on a change in pull request #22583: Consider mapped upstream in TriggerRuleDep

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #22583:
URL: https://github.com/apache/airflow/pull/22583#discussion_r838813708



##########
File path: airflow/models/taskinstance.py
##########
@@ -819,17 +819,25 @@ def refresh_from_task(self, task: "Operator", pool_override=None):
         self.operator = task.task_type
 
     @provide_session
-    def clear_xcom_data(self, session=NEW_SESSION):
-        """
-        Clears all XCom data from the database for the task instance
+    def clear_xcom_data(self, session: Session = NEW_SESSION):
+        """Clear all XCom data from the database for the task instance.
+
+        If the task is unmapped, all XComs matching this task ID in the same DAG
+        run are removed. If the task is mapped, only the one with matching map
+        index is removed.
 
         :param session: SQLAlchemy ORM Session
         """
         self.log.debug("Clearing XCom data")
+        if self.map_index < 0:
+            map_index: Optional[int] = None
+        else:
+            map_index = self.map_index
         XCom.clear(
             dag_id=self.dag_id,
             task_id=self.task_id,
             run_id=self.run_id,
+            map_index=map_index,
             session=session,
         )

Review comment:
       Somewhat surprising we didn’t catch this earlier!



