Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/08 04:38:16 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #22849: Allow using mapped upstream's aggregated XCom

uranusjr opened a new pull request, #22849:
URL: https://github.com/apache/airflow/pull/22849

   Fix #22833.
   
   This needs two changes. First, when the upstream pushes the return value to XCom, we need to identify that the pushed value is not used on its own, but only aggregated with other return values from other mapped task instances. Fortunately, this is actually the only possible case right now, since we have not implemented support for depending on individual return values from a mapped task (aka nested mapping). So we instead skip recording any TaskMap metadata from a mapped task to avoid the problem altogether.
   
   The second change is for when the downstream task is expanded. Since the task depends on the mapped upstream as a whole, we should not use TaskMap from the upstream (which corresponds to individual task instances, as mentioned above), but instead the XComs pushed by every instance of the mapped task. Again, since we don't support nested mapping yet, we can cut corners and simply check whether the upstream is mapped to decide what to do, and leave further logic to the future.
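
The two changes described above can be sketched in plain Python. This is only an illustration of the decision logic, not the code in the pull request: `FakeStore`, `push_return_value`, and `expansion_length` are hypothetical names, and the dictionaries stand in for the XCom and TaskMap tables.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass
class FakeStore:
    """Hypothetical in-memory stand-in for the XCom and TaskMap tables."""

    # (task_id, map_index) -> pushed return value; -1 means "not mapped".
    xcoms: Dict[Tuple[str, int], Any] = field(default_factory=dict)
    # task_id -> length of the pushed value.
    task_maps: Dict[str, int] = field(default_factory=dict)


def push_return_value(store, task_id, value, *, is_mapped, map_index=-1):
    """First change: a mapped task instance pushes XCom but records no TaskMap."""
    store.xcoms[(task_id, map_index)] = value
    if not is_mapped and isinstance(value, (list, dict)):
        store.task_maps[task_id] = len(value)


def expansion_length(store, upstream_id, *, upstream_is_mapped):
    """Second change: choose what determines the downstream's map length."""
    if upstream_is_mapped:
        # Depend on the upstream as a whole: one item per pushed XCom.
        return sum(1 for (task_id, _) in store.xcoms if task_id == upstream_id)
    return store.task_maps[upstream_id]


store = FakeStore()
push_return_value(store, "make_list", [1, 2, 3], is_mapped=False)
for index, value in enumerate(["a", "b"]):
    push_return_value(store, "add_one", value, is_mapped=True, map_index=index)

from_plain_upstream = expansion_length(store, "make_list", upstream_is_mapped=False)
from_mapped_upstream = expansion_length(store, "add_one", upstream_is_mapped=True)
```

Here the unmapped upstream yields a length from its recorded metadata, while the mapped upstream yields one item per task instance that pushed a value.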
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ashb commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r845968546


##########
airflow/models/taskinstance.py:
##########
@@ -295,6 +297,33 @@ def clear_task_instances(
                 dr.start_date = None
 
 
+class _LazyXComAccess(collections.abc.Sequence):
+    """Wrapper to lazily pull XCom with a sequence-like interface.
+
+    Note that since the session binded to the parent query may have died when we

Review Comment:
   ```suggestion
       Note that since the session bound to the parent query may have died when we
   ```





[GitHub] [airflow] ashb commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846319944


##########
airflow/models/taskinstance.py:
##########
@@ -295,6 +299,58 @@ def clear_task_instances(
                 dr.start_date = None
 
 
+class _LazyXComAccessIterator(collections.abc.Iterator):
+    def __init__(self, cm: ContextManager[Query]):
+        self._cm = cm
+        self._it = iter(cm.__enter__())
+
+    def __del__(self):
+        self._cm.__exit__()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return XCom.deserialize_value(next(self._it))
+
+
+class _LazyXComAccess(collections.abc.Sequence):
+    """Wrapper to lazily pull XCom with a sequence-like interface.
+
+    Note that since the session bound to the parent query may have died when we
+    actually access the sequence's content, we must create a new fresh session
+    for every function call with ``with_session()``.
+    """
+
+    def __init__(self, query: Query):
+        self._q = query
+
+    def __len__(self):
+        with self._get_bound_query() as query:
+            return query.count()
+
+    def __iter__(self):
+        return _LazyXComAccessIterator(self._get_bound_query())

Review Comment:
   Oh nice, this means we can iterate over the list multiple times in a consuming task. Makes the docs easier to write.
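
A minimal sketch of why a sequence whose `__iter__` builds a new iterator on each call can be looped over repeatedly, while a plain iterator cannot. `LazyRows` and `fetch` are hypothetical stand-ins; `fetch` plays the role of re-running the query in a new session.

```python
import collections.abc


class LazyRows(collections.abc.Sequence):
    """Hypothetical stand-in: `fetch` plays the role of re-running the query."""

    def __init__(self, fetch):
        self._fetch = fetch

    def __len__(self):
        return len(self._fetch())

    def __getitem__(self, index):
        return self._fetch()[index]

    def __iter__(self):
        # A new iterator (backed by a new fetch) on every call, so the
        # object can be looped over any number of times.
        return iter(self._fetch())


rows = LazyRows(lambda: ["a", "b", "c"])
first_pass = list(rows)
second_pass = list(rows)

one_shot = iter(["a", "b", "c"])
drained = list(one_shot)
leftover = list(one_shot)  # a plain iterator is exhausted after one pass
```

Both passes over `rows` see every item, whereas the second pass over `one_shot` sees nothing.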





[GitHub] [airflow] uranusjr commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846623188


##########
airflow/models/taskinstance.py:
##########
@@ -323,16 +323,19 @@ class _LazyXComAccess(collections.abc.Sequence):
     """Wrapper to lazily pull XCom with a sequence-like interface.
 
     Note that since the session bound to the parent query may have died when we
-    actually access the sequence's content, we must create a new fresh session
+    actually access the sequence's content, we must a new fresh session

Review Comment:
   Accidental?





[GitHub] [airflow] ashb commented on pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22849:
URL: https://github.com/apache/airflow/pull/22849#issuecomment-1092919212

   Yeah, MSSQL says N 😦 




[GitHub] [airflow] uranusjr commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846039122


##########
tests/models/test_taskinstance.py:
##########
@@ -1091,7 +1092,7 @@ def test_xcom_pull_mapped(self, dag_maker, session):
         assert ti_2.xcom_pull("task_1", map_indexes=1, session=session) == "b"
 
         joined = ti_2.xcom_pull("task_1", session=session)
-        assert iter(joined) is joined, "should be iterator"
+        assert isinstance(joined, collections.abc.Iterable), "should be iterator"

Review Comment:
   ```suggestion
           assert isinstance(joined, collections.abc.Iterable), "should be iterable"
   ```
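
The wording change matters because "iterable" and "iterator" are different things. A short check using only the standard library shows how the old and new assertions differ; the variable names are illustrative.

```python
import collections.abc

values = ["a", "b"]    # a list is iterable but is not an iterator
stream = iter(values)  # an iterator is also iterable

list_is_iterable = isinstance(values, collections.abc.Iterable)
list_is_iterator = isinstance(values, collections.abc.Iterator)
stream_is_iterable = isinstance(stream, collections.abc.Iterable)
stream_is_iterator = isinstance(stream, collections.abc.Iterator)

# The old assertion, `iter(joined) is joined`, holds only for iterators.
old_check_on_list = iter(values) is values
old_check_on_stream = iter(stream) is stream
```

A sequence-like wrapper passes the `Iterable` check but would fail the old identity check, since calling `iter()` on it returns a separate iterator object.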





[GitHub] [airflow] ashb merged pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb merged PR #22849:
URL: https://github.com/apache/airflow/pull/22849




[GitHub] [airflow] uranusjr commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846623285


##########
airflow/models/taskinstance.py:
##########
@@ -323,16 +323,19 @@ class _LazyXComAccess(collections.abc.Sequence):
     """Wrapper to lazily pull XCom with a sequence-like interface.
 
     Note that since the session bound to the parent query may have died when we
-    actually access the sequence's content, we must create a new fresh session
+    actually access the sequence's content, we must a new fresh session
     for every function call with ``with_session()``.
     """
 
     def __init__(self, query: Query):
         self._q = query
+        self._len = None
 
     def __len__(self):
-        with self._get_bound_query() as query:
-            return query.count()
+        if self._len is None:
+            with self._get_bound_query() as query:
+                self._len = query.count()
+        return self._len

Review Comment:
   Or we could `@cache` this?
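
One possible reading of this suggestion, sketched with `functools.cached_property` (Python 3.8+) rather than whichever helper the reviewer had in mind. `CountedRows` is a hypothetical class, and `count_calls` exists only to show how often the expensive count runs.

```python
import collections.abc
import functools


class CountedRows(collections.abc.Sequence):
    """Hypothetical sequence whose length is expensive to compute."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.count_calls = 0  # instrumentation for this demo only

    @functools.cached_property
    def _len(self):
        # Stands in for `query.count()`; runs once per instance.
        self.count_calls += 1
        return len(self._rows)

    def __len__(self):
        return self._len

    def __getitem__(self, index):
        return self._rows[index]


rows = CountedRows(["a", "b", "c"])
lengths = [len(rows), len(rows), len(rows)]
```

The cached value lives on the instance, so it is discarded together with the object. Decorating `__len__` itself with `functools.cache` would instead key a class-level cache on `self` and keep every instance alive for as long as the cache exists.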





[GitHub] [airflow] uranusjr commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846333236


##########
airflow/models/taskinstance.py:
##########
@@ -295,6 +299,58 @@ def clear_task_instances(
                 dr.start_date = None
 
 
+class _LazyXComAccessIterator(collections.abc.Iterator):
+    def __init__(self, cm: ContextManager[Query]):
+        self._cm = cm
+        self._it = iter(cm.__enter__())
+
+    def __del__(self):
+        self._cm.__exit__()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return XCom.deserialize_value(next(self._it))
+
+
+class _LazyXComAccess(collections.abc.Sequence):
+    """Wrapper to lazily pull XCom with a sequence-like interface.
+
+    Note that since the session bound to the parent query may have died when we
+    actually access the sequence's content, we must create a new fresh session
+    for every function call with ``with_session()``.
+    """
+
+    def __init__(self, query: Query):
+        self._q = query
+
+    def __len__(self):
+        with self._get_bound_query() as query:
+            return query.count()
+
+    def __iter__(self):
+        return _LazyXComAccessIterator(self._get_bound_query())
+
+    def __getitem__(self, key):
+        if not isinstance(key, int):
+            raise ValueError("only support index access for now")
+        try:
+            with self._get_bound_query() as query:
+                r = query.offset(key).limit(1).one()
+        except NoResultFound:
+            raise IndexError(key) from None
+        return XCom.deserialize_value(r)

Review Comment:
   Just realised I forgot to handle negative indexes… maybe deal with that when someone complains.
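
A sketch of how negative indexes could be handled, assuming the backing query supports only a non-negative OFFSET: normalise the index against the length first. `OffsetRows` and `_fetch_one` are hypothetical stand-ins for the query, not code from the pull request.

```python
import collections.abc


class OffsetRows(collections.abc.Sequence):
    """Hypothetical stand-in for a query that only supports OFFSET/LIMIT."""

    def __init__(self, rows):
        self._rows = list(rows)

    def __len__(self):
        return len(self._rows)  # plays the role of query.count()

    def _fetch_one(self, offset):
        # Plays the role of query.offset(offset).limit(1).
        if offset < 0:
            raise ValueError("OFFSET must not be negative")
        return self._rows[offset:offset + 1]

    def __getitem__(self, key):
        if not isinstance(key, int):
            raise ValueError("only support index access for now")
        # Translate a negative index into the equivalent offset from the start.
        index = key + len(self) if key < 0 else key
        if index < 0:
            raise IndexError(key)
        found = self._fetch_one(index)
        if not found:
            raise IndexError(key)
        return found[0]


rows = OffsetRows(["a", "b", "c"])
last = rows[-1]
first = rows[-3]
```

The cost of this approach is one extra count per negative lookup, which is cheap if the length is already cached.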





[GitHub] [airflow] ashb commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846336340


##########
airflow/models/taskinstance.py:
##########
@@ -295,6 +299,58 @@ def clear_task_instances(
                 dr.start_date = None
 
 
+class _LazyXComAccessIterator(collections.abc.Iterator):
+    def __init__(self, cm: ContextManager[Query]):
+        self._cm = cm
+        self._it = iter(cm.__enter__())
+
+    def __del__(self):
+        self._cm.__exit__()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        return XCom.deserialize_value(next(self._it))
+
+
+class _LazyXComAccess(collections.abc.Sequence):
+    """Wrapper to lazily pull XCom with a sequence-like interface.
+
+    Note that since the session bound to the parent query may have died when we
+    actually access the sequence's content, we must create a new fresh session
+    for every function call with ``with_session()``.
+    """
+
+    def __init__(self, query: Query):
+        self._q = query
+
+    def __len__(self):
+        with self._get_bound_query() as query:
+            return query.count()
+
+    def __iter__(self):
+        return _LazyXComAccessIterator(self._get_bound_query())
+
+    def __getitem__(self, key):
+        if not isinstance(key, int):
+            raise ValueError("only support index access for now")
+        try:
+            with self._get_bound_query() as query:
+                r = query.offset(key).limit(1).one()
+        except NoResultFound:
+            raise IndexError(key) from None
+        return XCom.deserialize_value(r)

Review Comment:
   😆





[GitHub] [airflow] jedcunningham commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846639474


##########
airflow/models/taskinstance.py:
##########
@@ -295,6 +299,71 @@ def clear_task_instances(
                 dr.start_date = None
 
 
+class _LazyXComAccessIterator(collections.abc.Iterator):
+    __slots__ = ['_cm', '_it']
+
+    def __init__(self, cm: ContextManager[Query]):
+        self._cm = cm
+        self._it = None
+
+    def __del__(self):
+        if self._it:
+            self._cm.__exit__(None, None, None)
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if not self._it:
+            self._it = iter(self._cm.__enter__())
+        return XCom.deserialize_value(next(self._it))
+
+
+class _LazyXComAccess(collections.abc.Sequence):
+    """Wrapper to lazily pull XCom with a sequence-like interface.
+
+    Note that since the session bound to the parent query may have died when we
+    actually access the sequence's content, we must a new fresh session

Review Comment:
   ```suggestion
       actually access the sequence's content, we must create a new session
   ```
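
The iterator in the hunk above calls `__exit__(None, None, None)`, where an earlier revision quoted in this thread called `__exit__()` with no arguments. A small standard-library sketch shows why the three arguments are needed when a context manager is driven by hand; `fake_session` is a hypothetical stand-in for the session-bound query.

```python
import contextlib

events = []


@contextlib.contextmanager
def fake_session():
    """Hypothetical stand-in for the session-bound query context manager."""
    events.append("open")
    try:
        yield ["x", "y"]
    finally:
        events.append("close")


cm = fake_session()
rows = cm.__enter__()

try:
    cm.__exit__()  # missing exc_type, exc_value, traceback
except TypeError:
    events.append("TypeError")

cm.__exit__(None, None, None)  # the "no exception" form closes cleanly
```

Passing three `None` values is what a `with` statement does when the block finishes without raising.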





[GitHub] [airflow] jedcunningham commented on a diff in pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22849:
URL: https://github.com/apache/airflow/pull/22849#discussion_r846639395


##########
airflow/models/taskinstance.py:
##########
@@ -323,16 +323,19 @@ class _LazyXComAccess(collections.abc.Sequence):
     """Wrapper to lazily pull XCom with a sequence-like interface.
 
     Note that since the session bound to the parent query may have died when we
-    actually access the sequence's content, we must create a new fresh session
+    actually access the sequence's content, we must a new fresh session

Review Comment:
   I'd think so. We should toss "fresh" instead.





[GitHub] [airflow] ashb commented on pull request #22849: Allow using mapped upstream's aggregated XCom

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22849:
URL: https://github.com/apache/airflow/pull/22849#issuecomment-1092862805

   Hmmm, this might have MSSQL issues:
   
   Twice now it's failed with 
   > tests/models/test_dagrun.py::test_mapped_task_all_finish_before_downstream: sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY000', '[HY000] [Microsoft][ODBC Driver 18 for SQL Server]Connection is busy with results for another command (0) (SQLExecDirectW)')

