Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/11 14:39:40 UTC
[GitHub] [airflow] ashb opened a new pull request, #22909: Fix bug where dynamically mapped tasks got set to REMOVED
ashb opened a new pull request, #22909:
URL: https://github.com/apache/airflow/pull/22909
This mostly affects backfill/`airflow tasks test`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847715879
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the result now that the TIs exist
+ dr.verify_integrity()
+ assert query.all() == [(-1, None)]
Review Comment:
It already has a flush inside it: https://github.com/apache/airflow/blob/9c9272de3a32d30d831fac1272a07244d5fb8e0b/airflow/models/dagrun.py#L903
[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847410390
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the restul now that the TIs exist
Review Comment:
```suggestion
# Verify_integrity shouldn't change the result now that the TIs exist
```
[GitHub] [airflow] ephraimbuddy commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411155
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the restul now that the TIs exist
Review Comment:
```suggestion
# Verify_integrity shouldn't change the result now that the TIs exist
```
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847645333
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
setattr(task.subdag, 'parent_dag', dag)
if isinstance(task, MappedOperator):
- for d in (task.mapped_kwargs, task.partial_kwargs):
- for k, v in d.items():
- if not isinstance(v, _XComRef):
- continue
+ expansion_kwargs = task._get_expansion_kwargs()
Review Comment:
This was a bug, and I don't know how we didn't hit it before. As a result, the serialized task had these objects left as `_XComRef`, which, being a named tuple, has a `__len__` of 2 😱
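To illustrate why a leftover reference is easy to miss: a named tuple reports its field count as its length, so code that calls `len()` on a mapped argument gets 2 instead of an error. A minimal sketch, assuming a two-field tuple; `XComRefSketch` and its field names are stand-ins defined here, not Airflow's class:

```python
from typing import NamedTuple


class XComRefSketch(NamedTuple):
    """Stand-in for a serialized XCom reference (hypothetical field names)."""

    task_id: str
    key: str


ref = XComRefSketch(task_id="task_1", key="return_value")

# A named tuple is a tuple, so len() is the number of fields,
# not the length of whatever the upstream task eventually pushes.
literal = [1, 2, 3]
lengths = {"arg1": len(literal), "arg2": len(ref)}
```

Any length-based calculation over these arguments would therefore treat the unresolved reference as a two-item literal.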
[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847689404
##########
airflow/models/mappedoperator.py:
##########
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
for task_id, length in xcom_query:
for mapped_arg_name in mapped_dep_keys[task_id]:
map_lengths[mapped_arg_name] += length
+ return map_lengths
+ def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+ """Return dict of argument name to map length, or throw if some are not resolvable"""
+ expansion_kwargs = self._get_expansion_kwargs()
+ map_lengths = self._get_map_lengths(run_id, session=session)
Review Comment:
We should document `_get_map_lengths` to explain how it’s different from `_resolve_map_lengths`.
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847420061
##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
- if task.is_mapped:
- task = cast("MappedOperator", task)
- num_mapped_tis = task.parse_time_mapped_ti_count
- # Check if the number of mapped literals has changed and we need to mark this TI as removed
- if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+ if not task.is_mapped:
+ continue
+ task = cast("MappedOperator", task)
+ num_mapped_tis = task.parse_time_mapped_ti_count
+ # Check if the number of mapped literals has changed and we need to mark this TI as removed
+ if num_mapped_tis is not None:
+ if ti.map_index >= num_mapped_tis:
+ self.log.debug(
Review Comment:
This was already covered before; I've just reworked the conditional: https://github.com/apache/airflow/blob/f662b7de8c5e61f640f150d4e68bde21dcdd09b4/tests/models/test_dagrun.py#L953-L986
[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095440612
This turned out a bit more complex :(
[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847413388
##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
- if task.is_mapped:
- task = cast("MappedOperator", task)
- num_mapped_tis = task.parse_time_mapped_ti_count
- # Check if the number of mapped literals has changed and we need to mark this TI as removed
- if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+ if not task.is_mapped:
+ continue
+ task = cast("MappedOperator", task)
Review Comment:
nit: Should we extract L843-858 to a separate function? This might help test that function separately too
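One way such an extraction could look, as a hedged sketch: the two functions below are hypothetical and only mirror the conditionals quoted in the diff above (they are not the code that was merged). They make it easy to test the difference between `not num_mapped_tis` and `num_mapped_tis is not None` when the parse-time count is `None`:

```python
from typing import Optional


def old_should_remove(num_mapped_tis: Optional[int], map_index: int) -> bool:
    """Condition as it reads on the removed (-) lines of the diff."""
    if not num_mapped_tis or map_index >= num_mapped_tis:
        # `not None` is True, so an unknown count removes every TI.
        return True
    return map_index < 0


def new_should_remove(num_mapped_tis: Optional[int], map_index: int) -> Optional[bool]:
    """Condition as it reads on the added (+) lines of the diff.

    Returns None when the count is unknown at parse time, meaning this
    check alone cannot decide (an assumption of this sketch).
    """
    if num_mapped_tis is None:
        return None
    return map_index >= num_mapped_tis or map_index < 0
```

With a count of `None` and the unmapped index `-1`, the old condition says "remove" while the new one leaves the decision open, which matches the `(-1, None)` state the updated test expects.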
[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847650700
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
setattr(task.subdag, 'parent_dag', dag)
if isinstance(task, MappedOperator):
- for d in (task.mapped_kwargs, task.partial_kwargs):
- for k, v in d.items():
- if not isinstance(v, _XComRef):
- continue
+ expansion_kwargs = task._get_expansion_kwargs()
Review Comment:
Because those attributes _shouldn’t_ be accessed by the scheduler anyway? So this likely only caused the task to be rendered incorrectly in the UI.
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847719456
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the result now that the TIs exist
+ dr.verify_integrity()
+ assert query.all() == [(-1, None)]
Review Comment:
Oh yes. Done.
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847687365
##########
airflow/decorators/base.py:
##########
@@ -406,6 +406,9 @@ class DecoratedMappedOperator(MappedOperator):
# in partial_kwargs, and MappedOperator prevents duplication.
mapped_op_kwargs: Dict[str, "Mappable"]
+ def __hash__(self):
Review Comment:
This was needed for the `@cache` decorator.
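For context, a small sketch of why a cache decorator can force an explicit `__hash__`, assuming `@cache` behaves like `functools.lru_cache(maxsize=None)`. The classes and the choice of hash below are hypothetical; the thread does not show what the real `__hash__` returns:

```python
import functools


class OperatorSketch:
    """Defines __eq__ only, so Python sets __hash__ to None."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    def __eq__(self, other: object) -> bool:
        return isinstance(other, OperatorSketch) and other.task_id == self.task_id


class HashableOperatorSketch(OperatorSketch):
    def __hash__(self) -> int:
        # Hash on the same field __eq__ compares (an assumption of this sketch).
        return hash(self.task_id)


@functools.lru_cache(maxsize=None)
def cached_id_length(op: OperatorSketch) -> int:
    # The cache key is built by hashing the arguments.
    return len(op.task_id)
```

Calling `cached_id_length(OperatorSketch("a"))` raises `TypeError` because the instance is unhashable, while the subclass with `__hash__` can be cached.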
[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095137488
Fixing a bug, so that we can then go and fix https://github.com/apache/airflow/pull/22904
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847653426
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
setattr(task.subdag, 'parent_dag', dag)
if isinstance(task, MappedOperator):
- for d in (task.mapped_kwargs, task.partial_kwargs):
- for k, v in d.items():
- if not isinstance(v, _XComRef):
- continue
+ expansion_kwargs = task._get_expansion_kwargs()
Review Comment:
Ah, yeah that makes sense. When the LocalTaskJob checks this it has the real DAG.
It's just that my change here now causes it to be accessed via `verify_integrity`.
[GitHub] [airflow] ashb merged pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb merged PR #22909:
URL: https://github.com/apache/airflow/pull/22909
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847446426
##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
- if task.is_mapped:
- task = cast("MappedOperator", task)
- num_mapped_tis = task.parse_time_mapped_ti_count
- # Check if the number of mapped literals has changed and we need to mark this TI as removed
- if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+ if not task.is_mapped:
+ continue
+ task = cast("MappedOperator", task)
+ num_mapped_tis = task.parse_time_mapped_ti_count
+ # Check if the number of mapped literals has changed and we need to mark this TI as removed
+ if num_mapped_tis is not None:
+ if ti.map_index >= num_mapped_tis:
+ self.log.debug(
+ "Removing task '%s' as the map_index is longer than the literal list (%s)",
+ ti,
+ num_mapped_tis,
+ )
ti.state = State.REMOVED
elif ti.map_index < 0:
+ self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
ti.state = State.REMOVED
+ # TODO: What if it is _now_ None, but wasn't before? How do we detect that? And how do we detect
Review Comment:
Turns out I need to fix this; it's already tested.
[GitHub] [airflow] github-actions[bot] commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095147614
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] jedcunningham commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411405
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the restul now that the TIs exist
Review Comment:
```suggestion
# Verify_integrity shouldn't change the result now that the TIs exist
```
[GitHub] [airflow] jedcunningham commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847691219
##########
airflow/models/dagrun.py:
##########
@@ -697,8 +697,9 @@ def _get_ready_tis(
old_states[schedulable.key] = old_state
continue
- # Expansion of last resort! This is ideally handled in the mini-scheduler in LocalTaskJob, but if
- # for any reason it wasn't, we need to expand it now
+ # This is called in two places: First (and ideally) is from the mini scheduler at the end of
+ # LocalTaskJob, and then as an "expansion of last resort" this is also called from the scheduler
Review Comment:
```suggestion
# This is called in two places: First (and ideally) is in the mini scheduler at the end of
# LocalTaskJob, and then as an "expansion of last resort" in the scheduler
```
nit
##########
airflow/models/dagrun.py:
##########
@@ -838,14 +839,47 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
- if task.is_mapped:
- task = cast("MappedOperator", task)
- num_mapped_tis = task.parse_time_mapped_ti_count
- # Check if the number of mapped literals has changed and we need to mark this TI as removed
- if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+ if not task.is_mapped:
+ continue
+ task = cast("MappedOperator", task)
+ num_mapped_tis = task.parse_time_mapped_ti_count
+ # Check if the number of mapped literals has changed and we need to mark this TI as removed
+ if num_mapped_tis is not None:
+ if ti.map_index >= num_mapped_tis:
+ self.log.debug(
+ "Removing task '%s' as the map_index is longer than the literal mapping list (%s)",
+ ti,
+ num_mapped_tis,
+ )
ti.state = State.REMOVED
elif ti.map_index < 0:
+ self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
ti.state = State.REMOVED
+ else:
+ self.log.info("Restoring mapped task '%s'", ti)
+ Stats.incr(f"task_restored_to_dag.{dag.dag_id}", 1, 1)
+ ti.state = State.NONE
+ else:
+ # What if it is _now_ dynamically mapped, but wasn't before?
+ total_length = task.run_time_mapped_ti_count(self.run_id, session=session)
+
+ if total_length is None:
+ # Not all upstreams finished, so we can't tell what should be here. Remove everying
Review Comment:
```suggestion
# Not all upstreams finished, so we can't tell what should be here. Remove everything.
```
[GitHub] [airflow] kaxil commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847411580
##########
airflow/models/dagrun.py:
##########
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
ti.state = State.REMOVED
continue
- if task.is_mapped:
- task = cast("MappedOperator", task)
- num_mapped_tis = task.parse_time_mapped_ti_count
- # Check if the number of mapped literals has changed and we need to mark this TI as removed
- if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+ if not task.is_mapped:
+ continue
+ task = cast("MappedOperator", task)
+ num_mapped_tis = task.parse_time_mapped_ti_count
+ # Check if the number of mapped literals has changed and we need to mark this TI as removed
+ if num_mapped_tis is not None:
+ if ti.map_index >= num_mapped_tis:
+ self.log.debug(
Review Comment:
Should we add a test case for this?
[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847700088
##########
airflow/models/mappedoperator.py:
##########
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
for task_id, length in xcom_query:
for mapped_arg_name in mapped_dep_keys[task_id]:
map_lengths[mapped_arg_name] += length
+ return map_lengths
+ def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+ """Return dict of argument name to map length, or throw if some are not resolvable"""
+ expansion_kwargs = self._get_expansion_kwargs()
+ map_lengths = self._get_map_lengths(run_id, session=session)
Review Comment:
Done:
> """Return dict of argument name to map length.
>
> If any arguments are not known right now (upstream task not finished) they will not be present in the
> dict.
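The distinction between the two helpers can be sketched as follows. This is an illustration under stated assumptions, not the Airflow implementation: both function names, the `known_lengths` parameter, and the exception type are hypothetical.

```python
from typing import Dict, Mapping


def get_map_lengths_sketch(
    expansion_kwargs: Mapping[str, object],
    known_lengths: Mapping[str, int],
) -> Dict[str, int]:
    """Return lengths for the arguments known right now; omit the rest."""
    return {name: known_lengths[name] for name in expansion_kwargs if name in known_lengths}


def resolve_map_lengths_sketch(
    expansion_kwargs: Mapping[str, object],
    known_lengths: Mapping[str, int],
) -> Dict[str, int]:
    """Return lengths for every argument, or raise if any is still unknown."""
    lengths = get_map_lengths_sketch(expansion_kwargs, known_lengths)
    missing = sorted(set(expansion_kwargs) - set(lengths))
    if missing:
        raise RuntimeError(f"Map lengths not resolvable yet for: {', '.join(missing)}")
    return lengths
```

A caller that can tolerate partial information uses the first form and checks for missing keys; a caller that must expand the task now uses the second and handles the exception.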
[GitHub] [airflow] ashb commented on pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
ashb commented on PR #22909:
URL: https://github.com/apache/airflow/pull/22909#issuecomment-1095441003
And I'm not too happy with how I had to structure the conditionals. Suggestions appreciated.
[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847713698
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the result now that the TIs exist
+ dr.verify_integrity()
+ assert query.all() == [(-1, None)]
Review Comment:
We probably need to do this?
```suggestion
# Verify_integrity shouldn't change the result now that the TIs exist
dr.verify_integrity(session=session)
session.flush()
assert query.all() == [(-1, None)]
```
Otherwise `verify_integrity` would use a new session, and the query result won’t change regardless due to transaction isolation.
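The isolation effect can be reproduced outside Airflow. The sketch below is an analogy using the standard-library `sqlite3` module with two connections rather than SQLAlchemy sessions; the table and column names are made up for illustration:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "isolation_sketch.db")

writer = sqlite3.connect(db_path)
writer.execute("CREATE TABLE ti (map_index INTEGER, state TEXT)")
writer.execute("INSERT INTO ti VALUES (-1, NULL)")
writer.commit()

reader = sqlite3.connect(db_path)

# The writer changes the row inside its own transaction, without committing.
writer.execute("UPDATE ti SET state = 'removed'")

# The reader is a separate connection, so it still sees the committed row.
before_commit = reader.execute("SELECT map_index, state FROM ti").fetchall()

writer.commit()
after_commit = reader.execute("SELECT map_index, state FROM ti").fetchall()

writer.close()
reader.close()
```

A test that asserts on `before_commit` would pass whether or not the update was correct, which is the risk described above; sharing one session between the call and the query avoids it.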
[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847717386
##########
tests/models/test_dagrun.py:
##########
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
dr = dag_maker.create_dagrun()
- indices = (
- session.query(TI.map_index)
+ query = (
+ session.query(TI.map_index, TI.state)
.filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
.order_by(TI.map_index)
- .all()
)
- assert indices == [(-1,)]
+ assert query.all() == [(-1, None)]
+
+ # Verify_integrity shouldn't change the result now that the TIs exist
+ dr.verify_integrity()
+ assert query.all() == [(-1, None)]
Review Comment:
Passing in `session` should be enough then