Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/17 07:01:09 UTC

[GitHub] [airflow] uranusjr opened a new pull request, #25757: Fail task if mapping upstream fails

uranusjr opened a new pull request, #25757:
URL: https://github.com/apache/airflow/pull/25757

   We already mark a mapped task's unmapped TI as UPSTREAM_FAILED when an upstream task fails under the ALL_SUCCESS trigger rule; this change replicates that behavior for all trigger rules.
   
   Fix #25698.
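A minimal sketch of the state decision described above. It assumes, as the diff comment in `expand_mapped_task` suggests, that an undeterminable map length (`None`, because an upstream task failed) and a zero length are handled alike except for the state given to the unmapped TI. `State` and `unmapped_ti_state` are hypothetical stand-ins, not Airflow API, and mapping an empty expansion to SKIPPED is an assumption.

```python
from enum import Enum
from typing import Optional


class State(str, Enum):
    # Local stand-in for the relevant TaskInstanceState members (not Airflow's enum).
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def unmapped_ti_state(total_length: Optional[int]) -> Optional[State]:
    """Hypothetical helper: the state to give a mapped task's unmapped TI.

    total_length is None when the map length cannot be computed because an
    upstream task failed. Per the PR description, this now fails the task
    under any trigger rule, not only ALL_SUCCESS, so no trigger rule is consulted.
    """
    if total_length is None:
        return State.UPSTREAM_FAILED
    if total_length == 0:
        # Assumption: an empty expansion skips the task instead of failing it.
        return State.SKIPPED
    # A positive length: the unmapped TI is replaced by expanded TIs, no state change here.
    return None
```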


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #25757: Fail task if mapping upstream fails

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25757:
URL: https://github.com/apache/airflow/pull/25757#discussion_r947523883


##########
airflow/models/mappedoperator.py:
##########
@@ -682,17 +701,21 @@ def expand_mapped_task(self, run_id: str, *, session: Session) -> Tuple[Sequence
             ti.refresh_from_task(self)  # session.merge() loses task information.
             all_expanded_tis.append(ti)
 
+        # Coerce the None case to 0 -- these two are almost treated identically,
+        # except the unmapped ti (if exists) is marked to different states.
+        total_expanded_ti_count = total_length or 0
+
         # Set to "REMOVED" any (old) TaskInstances with map indices greater
         # than the current map value
         session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id == self.task_id,
             TaskInstance.run_id == run_id,
-            TaskInstance.map_index >= total_length,
+            TaskInstance.map_index >= total_expanded_ti_count,
         ).update({TaskInstance.state: TaskInstanceState.REMOVED})
 
         session.flush()
-        return all_expanded_tis, total_length
+        return all_expanded_tis, total_expanded_ti_count - 1

Review Comment:
   I think the old code has an off-by-one error? The return value is documented as “the maximum map_index” (and BackfillJob uses it that way), so it should be one less than the total expansion length, right?
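The index arithmetic behind this comment can be checked with a small stand-alone sketch (plain Python, no Airflow imports). `expansion_summary` is a hypothetical helper that mirrors the patched logic: it coerces `None` to 0, applies the same `map_index >= total_expanded_ti_count` condition the REMOVED query uses, and returns the maximum valid map_index.

```python
from typing import Iterable, List, Optional, Tuple


def expansion_summary(
    total_length: Optional[int], existing_indexes: Iterable[int]
) -> Tuple[List[int], int]:
    """Hypothetical mirror of the patched expand_mapped_task logic.

    Returns (indexes the REMOVED update would match, maximum valid map_index).
    """
    # Coerce the None case to 0, as the patch does.
    total_expanded_ti_count = total_length or 0
    # Same condition as the query filter; the unmapped TI (map_index -1) never matches.
    removed = sorted(i for i in existing_indexes if i >= total_expanded_ti_count)
    # Valid indexes are 0 .. count - 1, so the maximum is count - 1
    # (-1 when nothing was expanded, meaning no index is valid).
    return removed, total_expanded_ti_count - 1
```

For example, `expansion_summary(3, range(5))` returns `([3, 4], 2)`. The old code would have returned 3 as the maximum, an index the same query had just marked REMOVED.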



[GitHub] [airflow] ashb commented on a diff in pull request #25757: Fail task if mapping upstream fails

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #25757:
URL: https://github.com/apache/airflow/pull/25757#discussion_r949973164


##########
airflow/models/mappedoperator.py:
##########
@@ -682,17 +701,21 @@ def expand_mapped_task(self, run_id: str, *, session: Session) -> Tuple[Sequence
             ti.refresh_from_task(self)  # session.merge() loses task information.
             all_expanded_tis.append(ti)
 
+        # Coerce the None case to 0 -- these two are almost treated identically,
+        # except the unmapped ti (if exists) is marked to different states.
+        total_expanded_ti_count = total_length or 0
+
         # Set to "REMOVED" any (old) TaskInstances with map indices greater
         # than the current map value
         session.query(TaskInstance).filter(
             TaskInstance.dag_id == self.dag_id,
             TaskInstance.task_id == self.task_id,
             TaskInstance.run_id == run_id,
-            TaskInstance.map_index >= total_length,
+            TaskInstance.map_index >= total_expanded_ti_count,
         ).update({TaskInstance.state: TaskInstanceState.REMOVED})
 
         session.flush()
-        return all_expanded_tis, total_length
+        return all_expanded_tis, total_expanded_ti_count - 1

Review Comment:
   Yup.
   
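   ```python
   # Excerpt from BackfillJob; node, run_id and max_map_index come from the enclosing scope.
   def to_keep(key: TaskInstanceKey) -> bool:
       if key.dag_id != node.dag_id or key.task_id != node.task_id or key.run_id != run_id:
           # For another Dag/Task/Run -- don't remove
           return True
       # max_map_index is inclusive, so it must be one less than the expansion length.
       return 0 <= key.map_index <= max_map_index
   ```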
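To see why the inclusive bound needs the corrected value, here is a runnable sketch of the same check. `Key` is a simplified stand-in for Airflow's `TaskInstanceKey` (fields such as `try_number` are omitted), and `make_to_keep` is a hypothetical factory that binds the values the original snippet takes from its enclosing scope.

```python
from typing import Callable, NamedTuple


class Key(NamedTuple):
    # Simplified stand-in for TaskInstanceKey (fields trimmed for illustration).
    dag_id: str
    task_id: str
    run_id: str
    map_index: int


def make_to_keep(
    dag_id: str, task_id: str, run_id: str, max_map_index: int
) -> Callable[[Key], bool]:
    """Hypothetical factory reproducing the to_keep check from the snippet above."""

    def to_keep(key: Key) -> bool:
        if key.dag_id != dag_id or key.task_id != task_id or key.run_id != run_id:
            # For another Dag/Task/Run -- don't remove
            return True
        # Inclusive upper bound: valid indexes are 0 .. max_map_index.
        return 0 <= key.map_index <= max_map_index

    return to_keep


# Four existing TIs of the mapped task plus one TI of an unrelated task.
keys = [Key("d", "t", "r", i) for i in range(4)] + [Key("d", "other", "r", 9)]
```

With a current expansion length of 3, passing `max_map_index=2` keeps indexes 0, 1 and 2 (plus the unrelated task), while the old value of 3 would also keep the stale index 3.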



[GitHub] [airflow] uranusjr merged pull request #25757: Fail task if mapping upstream fails

Posted by GitBox <gi...@apache.org>.
uranusjr merged PR #25757:
URL: https://github.com/apache/airflow/pull/25757

