Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/04 09:49:33 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request, #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ephraimbuddy opened a new pull request, #25531:
URL: https://github.com/apache/airflow/pull/25531

   The previous fix for task immutability after a run did not cover the case where mapped task instances are removed at runtime because the length of a dynamic mapped literal was reduced. This PR addresses it.
   
   One case is still unresolved. Reducing the length marks the extra task instances as removed, but increasing the length back to the original value (or beyond) leaves those task instances in the removed state.
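A simplified model of the behaviour described above may help. This is a sketch under stated assumptions, not Airflow's implementation: `FakeTI`, `REMOVED` and `revise_map_indexes` are hypothetical stand-ins for a mapped TaskInstance row, the removed state and the DagRun helper. When the length shrinks, the trailing indexes are marked as removed. When it grows again, only indexes that do not exist yet are returned for creation. That reproduces the unresolved case, where a removed task instance stays removed.

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical stand-in for Airflow's "removed" task-instance state.
REMOVED = "removed"


@dataclass
class FakeTI:
    """Hypothetical stand-in for a mapped TaskInstance row."""
    map_index: int
    state: Optional[str] = None


def revise_map_indexes(tis: List[FakeTI], new_length: int) -> List[int]:
    """Mark TIs past the new length as removed and return indexes still missing.

    Simplified model of the behaviour described in the PR, not Airflow code.
    """
    existing = set()
    for ti in tis:
        existing.add(ti.map_index)
        # Any index at or beyond the new length no longer has an input to map over.
        if ti.map_index >= new_length:
            ti.state = REMOVED
    # Only brand-new indexes are reported; removed TIs are never restored here.
    return [i for i in range(new_length) if i not in existing]


tis = [FakeTI(i) for i in range(3)]
print(revise_map_indexes(tis, 2), [t.state for t in tis])  # shrink from 3 to 2
print(revise_map_indexes(tis, 4), [t.state for t in tis])  # grow to 4
```

After the shrink, index 2 is marked removed. After growing to 4, only index 3 is reported as missing, and index 2 keeps its removed state.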
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ephraimbuddy commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937722973


##########
airflow/models/dagrun.py:
##########
@@ -1158,7 +1158,7 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _find_missing_task_indexes_or_mark_removed(

Review Comment:
   or simply `_repair_mapped_task_indexes`





[GitHub] [airflow] ashb commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ashb commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937619814


##########
airflow/models/dagrun.py:
##########
@@ -1183,6 +1183,14 @@ def _find_missing_task_indexes(
             existing_indexes[task].append(ti.map_index)
             task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
             new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
+
+            if ti.map_index >= new_length and new_length > 0:

Review Comment:
   Why do we need the `new_length > 0` check here?





[GitHub] [airflow] norm commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

norm commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937590697


##########
airflow/models/dagrun.py:
##########
@@ -1158,7 +1158,7 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _find_missing_task_indexes_or_mark_removed(

Review Comment:
   I'm not sure I can put into words why, but I don't like this function name now. It sounds like it does two *different* actions, even though it's basically "fix up if TI length has changed". And because I don't have the words, I also can't suggest a better name. Helping! :)





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ephraimbuddy commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937721623


##########
airflow/models/dagrun.py:
##########
@@ -1158,7 +1158,7 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _find_missing_task_indexes_or_mark_removed(

Review Comment:
   How about `_check_for_new_or_removed_task_indexes` or `_find_new_task_indexes_and_repair_removed`?





[GitHub] [airflow] ashb commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ashb commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937594589


##########
airflow/models/dagrun.py:
##########
@@ -1158,7 +1158,7 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _find_missing_task_indexes_or_mark_removed(

Review Comment:
   "repair" maybe?





[GitHub] [airflow] norm commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

norm commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937597044


##########
tests/models/test_dagrun.py:
##########
@@ -1227,6 +1227,67 @@ def task_2(arg2):
     ]
 
 
+def test_mapped_literal_length_reduction_at_runtime_adds_removed_state(dag_maker, session):
+    """Test that when the length of mapped literal increases at runtime, additional ti is added"""

Review Comment:
   Comment needs updating.





[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ephraimbuddy commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937721908


##########
airflow/models/dagrun.py:
##########
@@ -1183,6 +1183,14 @@ def _find_missing_task_indexes(
             existing_indexes[task].append(ti.map_index)
             task.run_time_mapped_ti_count.cache_clear()  # type: ignore[attr-defined]
             new_length = task.run_time_mapped_ti_count(self.run_id, session=session) or 0
+
+            if ti.map_index >= new_length and new_length > 0:

Review Comment:
   Yeah, it's not needed. I was taking a cue from `_check_for_removed_or_restored_tasks`, but I see now that the check isn't needed here.
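The `new_length > 0` guard only changes the outcome when the new length is 0. Note that the `or 0` fallback in the hunk also turns a `None` count into 0. The hypothetical helper below is not Airflow code. It evaluates both forms of the condition from the diff over a list of map indexes, so the difference is easy to see.

```python
from typing import List


def marked_removed(map_indexes: List[int], new_length: int, require_positive_length: bool) -> List[int]:
    """Return the map indexes that the condition from the diff would mark as removed.

    require_positive_length=True models `ti.map_index >= new_length and new_length > 0`;
    False models the guard-free `ti.map_index >= new_length`.
    """
    if require_positive_length:
        return [i for i in map_indexes if i >= new_length and new_length > 0]
    return [i for i in map_indexes if i >= new_length]


print(marked_removed([0, 1, 2], 2, True), marked_removed([0, 1, 2], 2, False))
print(marked_removed([0, 1, 2], 0, True), marked_removed([0, 1, 2], 0, False))
```

With a new length of 2, both forms mark only index 2. With a new length of 0, the guarded form marks nothing, while the guard-free form marks every index.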





[GitHub] [airflow] norm commented on a diff in pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

norm commented on code in PR #25531:
URL: https://github.com/apache/airflow/pull/25531#discussion_r937739177


##########
airflow/models/dagrun.py:
##########
@@ -1158,7 +1158,7 @@ def _create_task_instances(
             # TODO[HA]: We probably need to savepoint this so we can keep the transaction alive.
             session.rollback()
 
-    def _find_missing_task_indexes(
+    def _find_missing_task_indexes_or_mark_removed(

Review Comment:
   I like the last one, but perhaps `revise` rather than `repair`?





[GitHub] [airflow] ephraimbuddy merged pull request #25531: Fix reducing mapped length of a mapped task at runtime after a clear

ephraimbuddy merged PR #25531:
URL: https://github.com/apache/airflow/pull/25531

