Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/21 16:27:06 UTC

[GitHub] [airflow] ashb opened a new pull request, #23153: Fix Clear+upstream/downstream

ashb opened a new pull request, #23153:
URL: https://github.com/apache/airflow/pull/23153

   When we added clearing of individual mapped tasks, we unfortunately broke
   the upstream/downstream feature.
   
   This was because passing task_id or task_id+map_index down limited the
   action to _just_ that task_id.
   
   To support upstream/downstream again, we need to add those tasks to the
   list we pass on, which means we have to support both task_id strings and
   (task_id, map_index) tuples in the same `task_id` list.
   
   This almost certainly needs tests adding!
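
As a rough illustration of the mixed list described above, the sketch below uses plain Python rather than Airflow's models. The `DOWNSTREAM` mapping and the `expand_selection` and `is_selected` helpers are hypothetical names invented for this example; the actual change builds a SQLAlchemy filter rather than matching in Python.

```python
from typing import Dict, List, Tuple, Union

# An entry is either a bare task_id (every map index of that task)
# or a (task_id, map_index) tuple (one specific mapped instance).
TaskSelector = Union[str, Tuple[str, int]]

# Hypothetical toy DAG: task_id -> direct downstream task_ids.
DOWNSTREAM: Dict[str, List[str]] = {
    "extract": ["transform"],
    "transform": ["load"],
    "load": [],
}


def expand_selection(selected: TaskSelector, include_downstream: bool) -> List[TaskSelector]:
    """Return the selected entry plus, optionally, all transitive downstream task_ids."""
    result: List[TaskSelector] = [selected]
    if not include_downstream:
        return result
    start = selected if isinstance(selected, str) else selected[0]
    seen = set()
    stack = list(DOWNSTREAM[start])
    while stack:
        task_id = stack.pop()
        if task_id in seen:
            continue
        seen.add(task_id)
        result.append(task_id)  # downstream tasks are added as plain task_ids
        stack.extend(DOWNSTREAM[task_id])
    return result


def is_selected(task_id: str, map_index: int, selection: List[TaskSelector]) -> bool:
    """Check whether one task instance is covered by the mixed selection list."""
    return task_id in selection or (task_id, map_index) in selection
```

Selecting `("transform", 2)` with downstream enabled yields a list that holds one tuple and one plain string, which is the shape the `task_id` list has to accept.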


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] jedcunningham commented on a diff in pull request #23153: Fix Clear+upstream/downstream

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on code in PR #23153:
URL: https://github.com/apache/airflow/pull/23153#discussion_r855383369


##########
airflow/models/dag.py:
##########
@@ -1439,19 +1439,34 @@ def _get_task_instances(
                     (TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids)
                 )
             tis = tis.filter(or_(*conditions))
-        else:
+        elif self.partial:
             tis = tis.filter(TaskInstance.dag_id == self.dag_id, TaskInstance.task_id.in_(self.task_ids))
+        else:
+            tis = tis.filter(TaskInstance.dag_id == self.dag_id)
         if run_id:
             tis = tis.filter(TaskInstance.run_id == run_id)
         if start_date:
             tis = tis.filter(DagRun.execution_date >= start_date)
 
+        def _task_id_map_filter(val):
+            # Compute a filter  for TI.task_id and TI.map_index based on input values

Review Comment:
   ```suggestion
               # Compute a filter for TI.task_id and TI.map_index based on input values
   ```





[GitHub] [airflow] uranusjr commented on a diff in pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23153:
URL: https://github.com/apache/airflow/pull/23153#discussion_r855447486


##########
airflow/models/taskinstance.py:
##########
@@ -228,14 +229,14 @@ def clear_task_instances(
             ti.external_executor_id = None
             session.merge(ti)
 
-        task_id_by_key[ti.dag_id][ti.run_id][ti.try_number].add(ti.task_id)
+        task_id_by_key[ti.dag_id][ti.run_id][ti.map_index][ti.try_number].add(ti.task_id)

Review Comment:
   This mapping is _so_ weird.





[GitHub] [airflow] github-actions[bot] commented on pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1106030605

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] ashb commented on a diff in pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #23153:
URL: https://github.com/apache/airflow/pull/23153#discussion_r855453990


##########
airflow/models/taskinstance.py:
##########
@@ -2529,6 +2536,28 @@ def filter_for_tis(tis: Iterable[Union["TaskInstance", TaskInstanceKey]]) -> Opt
             (ti.key.primary for ti in tis),
         )
 
+    @classmethod
+    def filter_for_task_id_map_index_lists(cls, vals):
+        """
+        Build an SQLAlchemy filter for a list where each element can contain
+        whether a task_id, or a tuple of (task_id,map_index)
+
+        :meta private:
+        """
+        # Compute a filter for TI.task_id and TI.map_index based on input values
+        # For each item, it will either be a task_id, or (task_id, map_index)
+        task_id_only = list(filter(lambda v: isinstance(v, str), vals))
+        with_map_index = list(filter(lambda v: not isinstance(v, str), vals))
+        filters = []
+
+        if task_id_only:
+            filters.append(cls.task_id.in_(task_id_only))
+        if with_map_index:
+            filters.append(
+                tuple_in_condition((cls.task_id, cls.map_index), with_map_index),
+            )
+        return or_(*filters) if len(filters) > 1 else filters[0]

Review Comment:
   In my head this function should never result in 0 filters. I should assert that.





[GitHub] [airflow] ashb commented on pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1106088728

   > Only part I can’t make sense of is the dag_id-run_id-try_number-map_index lookup. I’ll take your word they are correct.
   
   Yeah, that's hard to follow, I agree. As the comment hints, it was an attempt to build a more efficient query when working on huge DAGs.
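
For readers trying to picture the nested lookup, here is a minimal sketch of the grouping in plain Python. `TIRow`, `group_task_ids`, and `count_conditions` are hypothetical names for this example, and the idea that each leaf group turns into one `task_id IN (...)` condition is an assumption about how the grouping is used, not something confirmed in this thread.

```python
from collections import defaultdict
from typing import NamedTuple


class TIRow(NamedTuple):
    """Hypothetical stand-in for the TaskInstance fields used as keys."""

    dag_id: str
    run_id: str
    map_index: int
    try_number: int
    task_id: str


def group_task_ids(rows):
    """Group task_ids under dag_id -> run_id -> map_index -> try_number."""
    grouped = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(set))))
    for row in rows:
        grouped[row.dag_id][row.run_id][row.map_index][row.try_number].add(row.task_id)
    return grouped


def count_conditions(grouped):
    """Count leaf groups; under the assumption above, each is one IN condition."""
    return sum(
        1
        for runs in grouped.values()
        for map_indexes in runs.values()
        for tries in map_indexes.values()
        for _ in tries
    )
```

With this shape, task instances that share the same dag, run, map index, and try number collapse into one set of task_ids, so the number of conditions grows with the number of distinct key combinations rather than with the number of task instances.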




[GitHub] [airflow] bbovenzi commented on pull request #23153: Fix Clear+upstream/downstream

Posted by GitBox <gi...@apache.org>.
bbovenzi commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1105464257

   I won't comment on the Python code as it's not my forte, but testing with the UI looks correct.




[GitHub] [airflow] jedcunningham merged pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
jedcunningham merged PR #23153:
URL: https://github.com/apache/airflow/pull/23153




[GitHub] [airflow] ashb commented on pull request #23153: Fix Clear+upstream/downstream

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1105468916

   @bbovenzi TBH, given how fried my brain is this week, your Python is probably better than mine at the moment!




[GitHub] [airflow] uranusjr commented on pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
uranusjr commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1106526363

   One failure left
   
   ```
   tests/api/common/test_mark_tasks.py::TestMarkTasks::test_mark_tasks_subdag
   ```
   
   (MySQL and MSSQL are green because they skip this test. The logic is likely wrong everywhere.)




[GitHub] [airflow] ashb commented on pull request #23153: Fix Clear+upstream/downstream

Posted by GitBox <gi...@apache.org>.
ashb commented on PR #23153:
URL: https://github.com/apache/airflow/pull/23153#issuecomment-1105443436

   I am no longer convinced I even know what good code looks like, having written something like 4 hacky PRs. Ideas/kicks greatly appreciated.




[GitHub] [airflow] uranusjr commented on a diff in pull request #23153: Fix TaskInstance actions with upstream/downstream

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #23153:
URL: https://github.com/apache/airflow/pull/23153#discussion_r855449371


##########
airflow/models/taskinstance.py:
##########
@@ -2529,6 +2536,28 @@ def filter_for_tis(tis: Iterable[Union["TaskInstance", TaskInstanceKey]]) -> Opt
             (ti.key.primary for ti in tis),
         )
 
+    @classmethod
+    def filter_for_task_id_map_index_lists(cls, vals):
+        """
+        Build an SQLAlchemy filter for a list where each element can contain
+        whether a task_id, or a tuple of (task_id,map_index)
+
+        :meta private:
+        """
+        # Compute a filter for TI.task_id and TI.map_index based on input values
+        # For each item, it will either be a task_id, or (task_id, map_index)
+        task_id_only = list(filter(lambda v: isinstance(v, str), vals))
+        with_map_index = list(filter(lambda v: not isinstance(v, str), vals))
+        filters = []
+
+        if task_id_only:
+            filters.append(cls.task_id.in_(task_id_only))
+        if with_map_index:
+            filters.append(
+                tuple_in_condition((cls.task_id, cls.map_index), with_map_index),
+            )
+        return or_(*filters) if len(filters) > 1 else filters[0]

Review Comment:
   ```suggestion
           if not filters:
               return true()
           elif len(filters) == 1:
               return filters[0]
           return or_(*filters)
   ```
   
   Calling `or_()` without any arguments is deprecated: https://docs.sqlalchemy.org/en/14/core/sqlelement.html#sqlalchemy.sql.expression.or_
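
The three-way return in the suggestion can be tried out without a database using a pure-Python analogue. This is only a sketch: `predicate_for_task_id_map_index_list` is a hypothetical name, and it returns ordinary callables in place of SQLAlchemy expressions, with "match everything" standing in for `true()`.

```python
from typing import Callable, List, Sequence, Tuple, Union

TaskSelector = Union[str, Tuple[str, int]]
Predicate = Callable[[str, int], bool]


def predicate_for_task_id_map_index_list(vals: Sequence[TaskSelector]) -> Predicate:
    """Build a matcher for a list of task_id strings and (task_id, map_index) tuples."""
    task_id_only = {v for v in vals if isinstance(v, str)}
    with_map_index = {tuple(v) for v in vals if not isinstance(v, str)}
    predicates: List[Predicate] = []

    if task_id_only:
        predicates.append(lambda task_id, map_index: task_id in task_id_only)
    if with_map_index:
        predicates.append(lambda task_id, map_index: (task_id, map_index) in with_map_index)

    if not predicates:
        # Analogue of `return true()`: an empty input places no restriction.
        return lambda task_id, map_index: True
    if len(predicates) == 1:
        return predicates[0]
    # Analogue of `or_(*filters)`: match if any branch matches.
    return lambda task_id, map_index: any(p(task_id, map_index) for p in predicates)
```

Whether an empty list should match everything or be rejected with an assertion is a design choice; the sketch follows the suggestion above and matches everything.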


