Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/28 14:26:28 UTC

[GitHub] [airflow] yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276
 
 
   This PR fixes the following issue:
   
   If a task is skipped by `BranchPythonOperator` and the user then clears the skipped task, it'll execute. 
   
   The implementation is inspired by the author of [this blog](https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/).
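
A toy model of the issue may help; it is not Airflow code. The helpers `run_branch`, `clear`, and `run_if_unset` are hypothetical stand-ins, and the sketch assumes the only record of the branch decision is the skipped task's own state, so clearing that state loses the decision.

```python
# Toy model only: a plain dict stands in for task instance states.
SKIPPED, SUCCESS = "skipped", "success"


def run_branch(states, chosen, branches):
    """Mark the branching task as done and skip every branch it did not choose."""
    states["make_choice"] = SUCCESS
    for task_id in branches:
        if task_id != chosen:
            states[task_id] = SKIPPED


def clear(states, task_id):
    """Clearing a task resets its state, which makes it runnable again."""
    states[task_id] = None


def run_if_unset(states, task_id):
    """Run a task only when it has no state yet (an assumed simplification)."""
    if states.get(task_id) is None:
        states[task_id] = SUCCESS


states = {}
run_branch(states, chosen="branch_1", branches=["branch_1", "branch_2"])
run_if_unset(states, "branch_1")
run_if_unset(states, "branch_2")
skipped_before_clear = states["branch_2"]

# The user clears the skipped task; nothing remembers that it was skipped.
clear(states, "branch_2")
run_if_unset(states, "branch_2")
state_after_clear = states["branch_2"]
```

In this model `branch_2` ends up executed after the clear, which is the behaviour this PR aims to prevent.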
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379327408
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
 
 Review comment:
   This new method should also cache/memoize the result back into `dep_context.finished_tasks`.
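
A minimal sketch of the memoization being asked for, under stated assumptions: `DepContextSketch` and `fetch_from_db` are hypothetical stand-ins, and the `fetch` callable replaces the real database query, so this is not the signature used in Airflow.

```python
class DepContextSketch:
    """Hypothetical stand-in for a dependency context that caches finished tasks."""

    def __init__(self, finished_tasks=None):
        self.finished_tasks = finished_tasks

    def ensure_finished_tasks(self, fetch):
        """Return the cached list, calling `fetch` only when nothing is cached yet."""
        if self.finished_tasks is None:
            # Store the result back on the context so later deps can reuse it.
            self.finished_tasks = fetch()
        return self.finished_tasks


calls = []


def fetch_from_db():
    # Stand-in for the database query; records how often it is invoked.
    calls.append(1)
    return ["make_choice", "branch_1"]


ctx = DepContextSketch()
first = ctx.ensure_finished_tasks(fetch_from_db)
second = ctx.ensure_finished_tasks(fetch_from_db)
```

With the result written back to the context, every dependency evaluated against the same context shares one query instead of issuing its own.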

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378784642
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   Wait, what? I'm really not sure about this -- I don't see from the problem you've described why we have to touch XCom at all.
   
   Can you explain this in more detail?

----------------------------------------------------------------

[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379344105
 
 

 ##########
 File path: tests/operators/test_python.py
 ##########
 @@ -396,7 +397,7 @@ def test_without_dag_run(self):
                 elif ti.task_id == 'branch_2':
                     self.assertEqual(ti.state, State.SKIPPED)
                 else:
-                    raise Exception
+                    raise ValueError(f'Invalid task id {ti.task_id} found!')
 
 Review comment:
   I changed `Exception` to `ValueError` in one of the new test cases I added, following earlier suggestions from reviewers, so I replaced the same usage elsewhere in this test module too. I will revert this change if you prefer to leave the existing code untouched.

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r381942853
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +68,53 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self, dag_run, execution_date, tasks, session=None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run.
+        If `task_id` is a valid attribute, store the list of skipped task IDs to XCom
 
 Review comment:
   ```suggestion
   
           If this instance has a `task_id` attribute, store the list of skipped task IDs to XCom
   ```
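
A rough sketch of the behaviour that docstring describes, assuming an in-memory dict in place of XCom. `SkipMixinSketch`, `BranchLikeOperator`, `BareHelper`, and `xcom_store` are hypothetical names; the key value and the `"skipped"` entry are taken from the diff hunks elsewhere in this thread.

```python
XCOM_SKIPMIXIN_KEY = "skipmixin_key"


class SkipMixinSketch:
    """Hypothetical stand-in: records skipped task IDs when the instance has a task_id."""

    def skip(self, task_ids, xcom_store):
        skipped = list(task_ids)
        # Only an instance with a task_id can be looked up later by downstream tasks.
        task_id = getattr(self, "task_id", None)
        if task_id is not None:
            xcom_store[(task_id, XCOM_SKIPMIXIN_KEY)] = {"skipped": skipped}
        return skipped


class BranchLikeOperator(SkipMixinSketch):
    def __init__(self, task_id):
        self.task_id = task_id


class BareHelper(SkipMixinSketch):
    """Uses the mixin without being a task, so it has no task_id."""


store = {}
BranchLikeOperator("make_choice").skip(["branch_2"], store)
BareHelper().skip(["branch_3"], store)
```

Only the operator-like instance leaves a record behind; the helper without a `task_id` still skips its tasks but stores nothing.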

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380122906
 
 

 ##########
 File path: airflow/ti_deps/deps/not_previously_skipped_dep.py
 ##########
 @@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class NotPreviouslySkippedDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+
+    NAME = "Not Previously Skipped"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context):  # pylint: disable=signature-differs
+        from airflow.models.skipmixin import SkipMixin, XCOM_SKIPMIXIN_KEY
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.ensure_finished_tasks(ti.task.dag, ti.execution_date, session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                if parent.task_id not in finished_task_ids:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                prev_result = ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY)
+
+                if prev_result is None:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                should_skip = False
+                if "followed" in prev_result and ti.task_id not in prev_result["followed"]:
+                    # Skip any tasks that are not in "followed"
+                    should_skip = True
+                elif "skipped" in prev_result and ti.task_id in prev_result["skipped"]:
+                    # Skip any tasks that are in "skipped"
+                    should_skip = True
+
+                if should_skip:
+                    # If the parent SkipMixin has run, and the XCom result stored indicates this
+                    # ti should be skipped, set ti.state to SKIPPED and fail the rule so that the
+                    # ti does not execute.
+                    ti.set_state(State.SKIPPED, session)
+                    yield self._failing_status(
+                        reason="Skipping {} because of previous XCom result from "
 
 Review comment:
   We don't normally include the current task ID in these messages (as they are displayed in the context of a task anyway).
   
   ```suggestion
                           reason="Skipping since parent task {} decided this task should be skipped"
   ```
   might do it?
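
The decision in the hunk above, together with the suggested message, can be sketched as two small pure functions. `should_skip` and `skip_reason` are hypothetical helper names used only for illustration; the real code does this inline and also sets the task instance state.

```python
def should_skip(task_id, prev_result):
    """Decide whether `task_id` must stay skipped, given the parent's stored result."""
    if prev_result is None:
        # The parent has stored no decision, for example because it has not run yet.
        return False
    if "followed" in prev_result and task_id not in prev_result["followed"]:
        # Skip any task that is not in "followed".
        return True
    if "skipped" in prev_result and task_id in prev_result["skipped"]:
        # Skip any task that is in "skipped".
        return True
    return False


def skip_reason(parent_task_id):
    """Build the message without naming the current task, as suggested above."""
    return "Skipping since parent task {} decided this task should be skipped".format(
        parent_task_id
    )


followed_only_branch_1 = {"followed": ["branch_1"]}
decision_for_branch_2 = should_skip("branch_2", followed_only_branch_1)
message = skip_reason("make_choice")
```

Keeping the decision separate from the state change would also make it easy to unit test without a database session.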

----------------------------------------------------------------

[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378244769
 
 

 ##########
 File path: tests/operators/test_python.py
 ##########
 @@ -541,6 +542,62 @@ def test_xcom_push(self):
                 self.assertEqual(
                     ti.xcom_pull(task_ids='make_choice'), 'branch_1')
 
+    def test_clear_skipped_downstream_task(self):
+        """
+        After a downstream task is skipped by BranchPythonOperator, clearing the skipped task
+        should not cause it to be executed.
+        """
+        branch_op = BranchPythonOperator(task_id='make_choice',
+                                         dag=self.dag,
+                                         python_callable=lambda: 'branch_1')
+        branches = [self.branch_1, self.branch_2]
+        branch_op >> branches
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        for task in branches:
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_2':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise Exception
 
 Review comment:
   and in the other cases as well?

----------------------------------------------------------------

[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/288a50a0c95e14a81763cee214e55dde2feccf42?src=pr&el=desc) will **decrease** coverage by `0.27%`.
   > The diff coverage is `95.34%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7276      +/-   ##
   ==========================================
   - Coverage   86.59%   86.31%   -0.28%     
   ==========================================
     Files         871      872       +1     
     Lines       40660    40701      +41     
   ==========================================
   - Hits        35209    35131      -78     
   - Misses       5451     5570     +119
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <100%> (ø)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.39% <100%> (+0.25%)` | :arrow_up: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   | [airflow/ti\_deps/deps/branch\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvYnJhbmNoX2RlcC5weQ==) | `95.65% <95.65%> (ø)` | |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `70.21% <0%> (-23.41%)` | :arrow_down: |
   | ... and [3 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [288a50a...68383ae](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379470137
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
 
 Review comment:
   Ah great! I wonder if that behaviour is tested already or not

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379649516
 
 

 ##########
 File path: airflow/models/baseoperator.py
 ##########
 @@ -650,6 +651,7 @@ def deps(self) -> Set[BaseTIDep]:
             NotInRetryPeriodDep(),
             PrevDagrunDep(),
             TriggerRuleDep(),
+            BranchDep(),
 
 Review comment:
   We have `NotInRetryPeriodDep` already, so `NotPreviouslySkippedDep` would follow -- `PreviouslySkippedDep` would _pass_ if it had been skipped, the opposite of what we want.

----------------------------------------------------------------

[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/39375c01ced008a715bb2a540df31fe2cf908bbb?src=pr&el=desc) will **increase** coverage by `0.22%`.
   > The diff coverage is `76.66%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7276      +/-   ##
   ==========================================
   + Coverage   85.59%   85.82%   +0.22%     
   ==========================================
     Files         863      863              
     Lines       40520    40548      +28     
   ==========================================
   + Hits        34685    34800     +115     
   + Misses       5835     5748      -87
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.72% <100%> (ø)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `92.1% <44.44%> (-3.04%)` | :arrow_down: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.38% <90%> (-0.14%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.34% <0%> (+0.14%)` | :arrow_up: |
   | [airflow/jobs/backfill\_job.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL2JhY2tmaWxsX2pvYi5weQ==) | `91.88% <0%> (+0.28%)` | :arrow_up: |
   | [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `91.73% <0%> (+0.82%)` | :arrow_up: |
   | [airflow/models/connection.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvY29ubmVjdGlvbi5weQ==) | `77.4% <0%> (+0.96%)` | :arrow_up: |
   | [airflow/providers/apache/hive/hooks/hive.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvaG9va3MvaGl2ZS5weQ==) | `77.55% <0%> (+1.53%)` | :arrow_up: |
   | ... and [3 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [39375c0...9a907c9](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------

[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380117315
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -16,42 +16,38 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Iterable, Set, Union
+from typing import Iterable, Optional, Set, Union
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 
+XCOM_SKIPMIXIN_KEY = "skipmixin_key"
+
 
 class SkipMixin(LoggingMixin):
-    @provide_session
-    def skip(self, dag_run, execution_date, tasks, session=None):
+    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
-        Sets tasks instances to skipped from the same dag run.
-
-        :param dag_run: the DagRun for which to set the tasks to skipped
-        :param execution_date: execution_date
-        :param tasks: tasks to skip (not task_ids)
-        :param session: db session to use
+        Used internally to set state of task instances to skipped from the same dag run.
         """
-        if not tasks:
-            return
-
         task_ids = [d.task_id for d in tasks]
         now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
                 TaskInstance.dag_id == dag_run.dag_id,
                 TaskInstance.execution_date == dag_run.execution_date,
-                TaskInstance.task_id.in_(task_ids)
-            ).update({TaskInstance.state: State.SKIPPED,
-                      TaskInstance.start_date: now,
-                      TaskInstance.end_date: now},
-                     synchronize_session=False)
-            session.commit()
 
 Review comment:
   Why did you remove the commit here? (It might be okay, just curious as to your reasoning.)


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380155542
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -16,42 +16,38 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Iterable, Set, Union
+from typing import Iterable, Optional, Set, Union
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 
+XCOM_SKIPMIXIN_KEY = "skipmixin_key"
+
 
 class SkipMixin(LoggingMixin):
-    @provide_session
-    def skip(self, dag_run, execution_date, tasks, session=None):
+    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
-        Sets tasks instances to skipped from the same dag run.
-
-        :param dag_run: the DagRun for which to set the tasks to skipped
-        :param execution_date: execution_date
-        :param tasks: tasks to skip (not task_ids)
-        :param session: db session to use
+        Used internally to set state of task instances to skipped from the same dag run.
         """
-        if not tasks:
-            return
-
         task_ids = [d.task_id for d in tasks]
         now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
                 TaskInstance.dag_id == dag_run.dag_id,
                 TaskInstance.execution_date == dag_run.execution_date,
-                TaskInstance.task_id.in_(task_ids)
-            ).update({TaskInstance.state: State.SKIPPED,
-                      TaskInstance.start_date: now,
-                      TaskInstance.end_date: now},
-                     synchronize_session=False)
-            session.commit()
 
 Review comment:
   Yes. Thanks. I refactored the shared code into  `_set_state_to_skipped()` and moved the `commit()` into the caller `skip()` and `skip_all_except()`.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380124893
 
 

 ##########
 File path: tests/ti_deps/deps/test_not_previously_skipped_dep.py
 ##########
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+
+
+def test_no_parent():
+    """
+    A simple DAG with a single task. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG("test_test_no_parent_dag", schedule_interval=None, start_date=start_date)
+    op1 = DummyOperator(task_id="op1", dag=dag)
+
+    ti1 = TaskInstance(op1, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti1, session, DepContext()))) == 0
+        assert dep.is_met(ti1, session)
+        assert ti1.state != State.SKIPPED
+
+
+def test_no_skipmixin_parent():
+    """
+    A simple DAG with no branching. Both op1 and op2 are DummyOperator. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_no_skipmixin_parent_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = DummyOperator(task_id="op1", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_follow_branch():
+    """
+    A simple DAG with a BranchPythonOperator that follows op2. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_follow_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op2", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_skip_branch():
+    """
+    A simple DAG with a BranchPythonOperator that does not follow op2. NotPreviouslySkippedDep is not met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_skip_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op3 = DummyOperator(task_id="op3", dag=dag)
+    op1 >> [op2, op3]
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 1
 
 Review comment:
   Do we need to test both `_get_dep_statuses` and `dep.is_met`? It feels like they are both testing exactly the same thing.
   
   And we should test `dep.get_dep_statuses` if we keep both.


[GitHub] [airflow] yuqian90 edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-587230400
 
 
   Hi @ashb, thanks for pointing those out. I've fixed them.
   
   Travis is still unhappy, but I'm pretty sure this is unrelated to my change. Not sure if anyone already brought this up in a JIRA somewhere:
   
   ```
   tests/cli/commands/test_task_command.py .........F
   _________________________ TestCliTasks.test_local_run __________________________
   self = <tests.cli.commands.test_task_command.TestCliTasks testMethod=test_local_run>
       def test_local_run(self):
           args = self.parser.parse_args([
   ...
       def _recv(self, size, read=_read):
           buf = io.BytesIO()
           handle = self._handle
           remaining = size
           while remaining > 0:
               chunk = read(handle, remaining)
               n = len(chunk)
               if n == 0:
                   if remaining == size:
   >                   raise EOFError
   E                   EOFError
   /usr/local/lib/python3.7/multiprocessing/connection.py:383: EOFError
   ```


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380146262
 
 

 ##########
 File path: tests/ti_deps/deps/test_not_previously_skipped_dep.py
 ##########
 @@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+
+
+def test_no_parent():
+    """
+    A simple DAG with a single task. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG("test_test_no_parent_dag", schedule_interval=None, start_date=start_date)
+    op1 = DummyOperator(task_id="op1", dag=dag)
+
+    ti1 = TaskInstance(op1, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti1, session, DepContext()))) == 0
+        assert dep.is_met(ti1, session)
+        assert ti1.state != State.SKIPPED
+
+
+def test_no_skipmixin_parent():
+    """
+    A simple DAG with no branching. Both op1 and op2 are DummyOperator. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_no_skipmixin_parent_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = DummyOperator(task_id="op1", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_follow_branch():
+    """
+    A simple DAG with a BranchPythonOperator that follows op2. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_follow_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op2", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_skip_branch():
+    """
+    A simple DAG with a BranchPythonOperator that does not follow op2. NotPreviouslySkippedDep is not met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_skip_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op3 = DummyOperator(task_id="op3", dag=dag)
+    op1 >> [op2, op3]
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep._get_dep_statuses(ti2, session, DepContext()))) == 1
 
 Review comment:
   It's testing both because it's possible for the `len()` to be more than zero while `is_met()` is True. That's not happening here, but I think it can happen if the dep yields a `_passing_status()`. I changed this to call `get_dep_statuses()` like you suggested.
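To illustrate why the two assertions are not equivalent, here is a toy sketch. It assumes that a dependency counts as met when every status it yields has passed; `DepStatus` and `ToyDep` are hypothetical stand-ins, not Airflow's `BaseTIDep`.

```python
from collections import namedtuple

# Hypothetical stand-in for a dependency status record.
DepStatus = namedtuple("DepStatus", ["dep_name", "passed", "reason"])


class ToyDep:
    """Toy dependency whose statuses are supplied up front (not Airflow's BaseTIDep)."""

    def __init__(self, statuses):
        self._statuses = list(statuses)

    def get_dep_statuses(self):
        yield from self._statuses

    def is_met(self):
        # Met when no status reports a failure; an empty list is also met.
        return all(status.passed for status in self.get_dep_statuses())
```

Under that assumption, a dep that yields one passing status has a non-empty status list and is still met, so checking only the length would not tell the two cases apart.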


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380217423
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -88,13 +131,23 @@ def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable
             # is also a downstream task of the branch task, we exclude it from skipping.
             branch_downstream_task_ids = set()  # type: Set[str]
             for b in branch_task_ids:
-                branch_downstream_task_ids.update(dag.
-                                                  get_task(b).
-                                                  get_flat_relative_ids(upstream=False))
+                branch_downstream_task_ids.update(
+                    dag.get_task(b).get_flat_relative_ids(upstream=False)
+                )
 
-            skip_tasks = [t for t in downstream_tasks
-                          if t.task_id not in branch_task_ids and
-                          t.task_id not in branch_downstream_task_ids]
+            skip_tasks = [
+                t
+                for t in downstream_tasks
+                if t.task_id not in branch_task_ids
+                and t.task_id not in branch_downstream_task_ids
+            ]
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
-            self.skip(dag_run, ti.execution_date, skip_tasks)
+            with create_session() as session:
+                self._set_state_to_skipped(
+                    dag_run, ti.execution_date, skip_tasks, session=session
+                )
+                session.commit()
+                ti.xcom_push(
+                    key=XCOM_SKIPMIXIN_KEY, value={"followed": branch_task_ids}
 
 Review comment:
   ```suggestion
                       key=XCOM_SKIPMIXIN_KEY, value={XCOM_SKIPMIXIN_FOLLOWED: branch_task_ids}
   ```


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379470709
 
 

 ##########
 File path: tests/operators/test_python.py
 ##########
 @@ -396,7 +397,7 @@ def test_without_dag_run(self):
                 elif ti.task_id == 'branch_2':
                     self.assertEqual(ti.state, State.SKIPPED)
                 else:
-                    raise Exception
+                    raise ValueError(f'Invalid task id {ti.task_id} found!')
 
 Review comment:
   Cool, keep it.


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379783888
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
 
 Review comment:
   No, I can't find a test for this useful behaviour, so I'm adding one.


[GitHub] [airflow] yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r382499946
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -88,13 +134,23 @@ def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable
             # is also a downstream task of the branch task, we exclude it from skipping.
             branch_downstream_task_ids = set()  # type: Set[str]
             for b in branch_task_ids:
-                branch_downstream_task_ids.update(dag.
-                                                  get_task(b).
-                                                  get_flat_relative_ids(upstream=False))
+                branch_downstream_task_ids.update(
+                    dag.get_task(b).get_flat_relative_ids(upstream=False)
+                )
 
-            skip_tasks = [t for t in downstream_tasks
-                          if t.task_id not in branch_task_ids and
-                          t.task_id not in branch_downstream_task_ids]
+            skip_tasks = [
+                t
+                for t in downstream_tasks
+                if t.task_id not in branch_task_ids
+                and t.task_id not in branch_downstream_task_ids
+            ]
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
-            self.skip(dag_run, ti.execution_date, skip_tasks)
+            with create_session() as session:
+                self._set_state_to_skipped(
+                    dag_run, ti.execution_date, skip_tasks, session=session
+                )
+                session.commit()
 
 Review comment:
   ```suggestion
   ```
   
   Not needed: the session will be committed automatically by `create_session`, and not committing explicitly makes the XCom push and the status update a single transaction.
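The commit-on-exit behaviour can be sketched with a small context manager. This only illustrates the pattern, assuming SQLite in place of Airflow's database session; `create_session_sketch`, the two tables, and the helper functions are hypothetical names.

```python
import contextlib
import sqlite3


@contextlib.contextmanager
def create_session_sketch(conn):
    """Stand-in for a commit-on-exit session scope (hypothetical name)."""
    try:
        yield conn
        conn.commit()  # single commit when the block exits cleanly
    except Exception:
        conn.rollback()  # any failure undoes every write made in the block
        raise


def make_db():
    """Build a throwaway database (illustrative schema only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
    conn.execute("CREATE TABLE xcom (key TEXT, value TEXT)")
    conn.execute("INSERT INTO task_instance VALUES ('op2', NULL)")
    conn.commit()
    return conn


def skip_and_record(conn, fail_before_xcom=False):
    """Write the status update and the XCom-like row inside one scope."""
    with create_session_sketch(conn) as session:
        session.execute("UPDATE task_instance SET state = 'skipped' WHERE task_id = 'op2'")
        if fail_before_xcom:
            raise RuntimeError("simulated failure between the two writes")
        session.execute("INSERT INTO xcom VALUES ('skipmixin_key', 'op3')")


def snapshot(conn):
    """Return (state of op2, number of xcom rows)."""
    state = conn.execute("SELECT state FROM task_instance WHERE task_id = 'op2'").fetchone()[0]
    xcom_rows = conn.execute("SELECT COUNT(*) FROM xcom").fetchone()[0]
    return state, xcom_rows
```

With no intermediate commit, a failure between the two writes leaves neither of them in the database, which is the single-transaction property mentioned in the comment above.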


[GitHub] [airflow] tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579625776
 
 
   I clear the entire DAG and don't face this problem.


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579271946
 
 
   This is work-in-progress and untested. I want to get some opinions from committers first to see if it's worth pursuing. I'll then add tests and make the PR complete.
   
   One immediate shortcoming that I can see:
   This PR only makes skipped tasks respect the decision of upstream `BranchPythonOperator` when cleared. Other variants of branching exist, namely `BaseBranchOperator` and `ShortCircuitOperator`. This PR does not make tasks respect the decision of those operators when cleared.


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379380497
 
 

 ##########
 File path: airflow/models/baseoperator.py
 ##########
 @@ -650,6 +651,7 @@ def deps(self) -> Set[BaseTIDep]:
             NotInRetryPeriodDep(),
             PrevDagrunDep(),
             TriggerRuleDep(),
+            BranchDep(),
 
 Review comment:
   @ashb  what about calling it `PreviouslySkipped` instead of `NotPreviouslySkipped`?
   
   I'm just worried the negation in the name could be a little confusing.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379328108
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -98,3 +98,17 @@ def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
             self.skip(dag_run, ti.execution_date, skip_tasks)
+
+    def evaluate_skip_condition(self, xcom_value: Any, downstream_ti: TaskInstance):
+        """
+        Evaluates the xcom_value of the SkipMixin itself and decides if a downstream task
+        should be skipped.
 
 Review comment:
   This doc needs expanding to say when this is called (as it's not clear without reading the rest of this code) so people with custom operators know what to do.


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-586576551
 
 
   Hi @ashb, thanks for your review. I have adopted all your suggestions, with this one tweak:
   
   - I moved the XCom code into ``SkipMixin``. Its main users, ``BranchPythonOperator`` and ``BaseBranchOperator``, both determine which tasks to skip by returning the branches to **follow**. So I find it important to have ``SkipMixin`` store the branches **followed** rather than the ones skipped when ``skip_all_except()`` is called. When ``skip()`` is called, it should obviously store the tasks that are skipped. So the revamped ``SkipMixin`` now handles both cases. The shared code is refactored into ``_set_state_to_skipped()`` to avoid duplication.
   
   Please take another look and let me know what you think.


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r381965196
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +68,53 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self, dag_run, execution_date, tasks, session=None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run.
+        If `task_id` is a valid attribute, store the list of skipped task IDs to XCom
+        so that NotPreviouslySkippedDep knows these tasks should be skipped when they
+        are cleared.
+
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+
+        # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
+        try:
+            task_id = self.task_id
+        except AttributeError:
+            task_id = None
+
+        if task_id is not None:
+            from airflow.models.xcom import XCom
+
+            XCom.set(
+                key=XCOM_SKIPMIXIN_KEY,
+                value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
+                task_id=task_id,
+                dag_id=dag_run.dag_id,
+                execution_date=dag_run.execution_date,
 
 Review comment:
   Good catch!


[GitHub] [airflow] yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   This PR fixes the following issue:
   
   If a task is skipped by ``BranchPythonOperator``, ``BaseBranchOperator`` or ``ShortCircuitOperator`` and the user then clears the skipped task, it'll execute. 
   
   After this PR:
   The ``NotPreviouslySkippedDep`` rule will first evaluate if a task has a direct ``SkipMixin`` parent that has decided to skip it. This is done by examining the XCom data stored by ``SkipMixin.skip()`` or ``SkipMixin.skip_all_except()``.
   
   The implementation is inspired by the author of [this blog](https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/).
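
A minimal sketch of the decision the rule makes for one parent, assuming the XCom value has already been pulled. The function name is hypothetical and a plain dict stands in for the pulled XCom value; the real dependency class works on task instances and a database session.

```python
from typing import Dict, List, Optional

XCOM_SKIPMIXIN_SKIPPED = "skipped"
XCOM_SKIPMIXIN_FOLLOWED = "followed"  # assumed constant name


def previously_skipped(
    task_id: str, prev_result: Optional[Dict[str, List[str]]]
) -> bool:
    """Return True if the parent's stored result says task_id should stay skipped."""
    if prev_result is None:
        # The parent has not stored anything, e.g. it has not run yet.
        return False
    if XCOM_SKIPMIXIN_FOLLOWED in prev_result:
        # Branching parent: anything not on a followed branch was skipped.
        return task_id not in prev_result[XCOM_SKIPMIXIN_FOLLOWED]
    if XCOM_SKIPMIXIN_SKIPPED in prev_result:
        # Parent called skip() directly: only the listed tasks were skipped.
        return task_id in prev_result[XCOM_SKIPMIXIN_SKIPPED]
    return False
```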
   
   ---
   Issue link: [AIRFLOW-5391](https://issues.apache.org/jira/browse/AIRFLOW-5391)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/39375c01ced008a715bb2a540df31fe2cf908bbb?src=pr&el=desc) will **increase** coverage by `0.23%`.
   > The diff coverage is `96.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7276      +/-   ##
   ==========================================
   + Coverage    85.6%   85.83%   +0.23%     
   ==========================================
     Files         863      863              
     Lines       40520    40548      +28     
   ==========================================
   + Hits        34686    34806     +120     
   + Misses       5834     5742      -92
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.72% <100%> (ø)` | :arrow_up: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.58% <100%> (+0.06%)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.39% <100%> (+0.25%)` | :arrow_up: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [39375c0...2c7fae0](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result
URL: https://github.com/apache/airflow/pull/7276#issuecomment-583347659
 
 
   @ashb  @kaxil  @feluelle  @tooptoop4  Hoping to get a review here.
   
   I have updated the PR a bit. The logic is still the same, but the code that does the branching evaluation has been moved into a new dependency rule inside ``ti_deps/deps/branch_dep.py``.
   
   When `BranchDep` is evaluated, it sets the state of a task to "SKIPPED" if it has a parent that decided to skip it.
   
   For example, this comes from "example_short_circuit_operatorschedule":
   ![image](https://user-images.githubusercontent.com/6637585/74024780-7eaba680-49dd-11ea-813f-09ca598e5f48.png)
   
   For a DAG that looks like this, `condition_is_False` decided to skip `false_1`. Before this PR, the problem is if someone clears `false_1`, it'll execute. This is very counter-intuitive for users:
   ![image](https://user-images.githubusercontent.com/6637585/74024872-af8bdb80-49dd-11ea-9e53-19f4f7c844df.png)
   ![image](https://user-images.githubusercontent.com/6637585/74024927-caf6e680-49dd-11ea-84f6-5548e8954b1c.png)
   
   
   After this PR, when the scheduler evaluates the dependency rules for `false_1`, `BranchDep` will skip `false_1` because it knows that the parent task `condition_is_False` had already decided to skip `false_1`. In other words, this makes the "skipped" status "sticky".
    
   So after this PR, the DAG will look like this after someone clears `false_1`. This is much more intuitive:
   
   ![image](https://user-images.githubusercontent.com/6637585/74024780-7eaba680-49dd-11ea-813f-09ca598e5f48.png)
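
The "sticky" behaviour can be sketched as a toy simulation with no Airflow dependency. Everything here is hypothetical: the state strings, the `clear` and `evaluate` helpers, and the dicts that stand in for the task-instance table and for the decisions recorded by the parent.

```python
from typing import Dict, List, Optional

SKIPPED = "skipped"
SCHEDULED = "scheduled"


def clear(states: Dict[str, Optional[str]], task_id: str) -> None:
    """Simulate a user clearing a task: its state is reset to None."""
    states[task_id] = None


def evaluate(
    states: Dict[str, Optional[str]],
    task_id: str,
    direct_parents: Dict[str, List[str]],
    skip_decisions: Dict[str, List[str]],
) -> str:
    """Re-evaluate a cleared task against the decisions of its direct parents."""
    for parent in direct_parents.get(task_id, []):
        if task_id in skip_decisions.get(parent, []):
            # The parent already decided to skip this task, so it stays skipped.
            states[task_id] = SKIPPED
            return SKIPPED
    states[task_id] = SCHEDULED
    return SCHEDULED


# Mirrors the example: condition_is_False decided to skip false_1.
states = {"condition_is_False": "success", "false_1": SKIPPED}
direct_parents = {"false_1": ["condition_is_False"]}
skip_decisions = {"condition_is_False": ["false_1"]}

clear(states, "false_1")
result = evaluate(states, "false_1", direct_parents, skip_decisions)
```

After the clear, `false_1` goes back to the skipped state instead of running, because the parent's earlier decision is consulted again.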
   


[GitHub] [airflow] yuqian90 removed a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 removed a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579271946
 
 
   This is work-in-progress and untested. I want to get some opinions from committers first to see if it's worth pursuing. I'll then add tests and make the PR complete.
   
   One immediate shortcoming that I can see:
   This PR only makes skipped tasks respect the decision of upstream `BranchPythonOperator` when cleared. Other variants of branching exist, namely `BaseBranchOperator` and `ShortCircuitOperator`. This PR does not make tasks respect the decision of those operators when cleared.


[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378236863
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -98,3 +98,13 @@ def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
             self.skip(dag_run, ti.execution_date, skip_tasks)
+
+    def evaluate_skip_condition(self, xcom_value, downstream_ti):
+        """
 +        This method evaluates the xcom_value of the SkipMixin itself and decides if a downstream task
+        should be skipped. Returns True if the downstream_ti should be skipped. Otherwise False.
 
 Review comment:
   You can add `:returns:` and `:rtype:` to the end of the docstring instead of putting it in the description.


[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378236282
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -98,3 +98,13 @@ def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable
 
             self.log.info("Skipping tasks %s", [t.task_id for t in skip_tasks])
             self.skip(dag_run, ti.execution_date, skip_tasks)
+
+    def evaluate_skip_condition(self, xcom_value, downstream_ti):
 
 Review comment:
   Please add types to the function parameters.
   
   and also in the docs, please.


[GitHub] [airflow] codecov-io commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/39375c01ced008a715bb2a540df31fe2cf908bbb?src=pr&el=desc) will **increase** coverage by `0.22%`.
   > The diff coverage is `76.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7276      +/-   ##
   ==========================================
   + Coverage   85.59%   85.82%   +0.22%     
   ==========================================
     Files         863      863              
     Lines       40520    40548      +28     
   ==========================================
   + Hits        34685    34800     +115     
   + Misses       5835     5748      -87
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.72% <100%> (ø)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `92.1% <44.44%> (-3.04%)` | :arrow_down: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.38% <90%> (-0.14%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `89.34% <0%> (+0.14%)` | :arrow_up: |
   | [airflow/jobs/backfill\_job.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL2JhY2tmaWxsX2pvYi5weQ==) | `91.88% <0%> (+0.28%)` | :arrow_up: |
   | [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `91.73% <0%> (+0.82%)` | :arrow_up: |
   | [airflow/models/connection.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvY29ubmVjdGlvbi5weQ==) | `77.4% <0%> (+0.96%)` | :arrow_up: |
   | [airflow/providers/apache/hive/hooks/hive.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvaG9va3MvaGl2ZS5weQ==) | `77.55% <0%> (+1.53%)` | :arrow_up: |
   | ... and [3 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [39375c0...9a907c9](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579917606
 
 
   makes sense!


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379333613
 
 

 ##########
 File path: tests/operators/test_python.py
 ##########
 @@ -396,7 +397,7 @@ def test_without_dag_run(self):
                 elif ti.task_id == 'branch_2':
                     self.assertEqual(ti.state, State.SKIPPED)
                 else:
-                    raise Exception
+                    raise ValueError(f'Invalid task id {ti.task_id} found!')
 
 Review comment:
   What is this change for? It seems unrelated to your change.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379329062
 
 

 ##########
 File path: airflow/operators/branch_operator.py
 ##########
 @@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError
 
     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
 +        :param xcom_value: The task_id or list of task_ids returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
 
 Review comment:
   Rather than having this logic here, and needing to implement it each time, what do you think about instead having the `skip` and `skip_all_except` methods in the mixin store the list of tasks they have skipped in XCom? That way this evaluate function doesn't need to be overridden in each subclass; the parent mixin can handle it for all cases.
   
   That does possibly make us store more data in XCom, so maybe that's not the best plan.


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/f3ad5cf6185b9d406d0fb0a4ecc0b5536f79217a?src=pr&el=desc) will **increase** coverage by `53.14%`.
   > The diff coverage is `96.66%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #7276       +/-   ##
   ==========================================
   + Coverage   32.86%     86%   +53.14%     
   ==========================================
     Files         867     867               
     Lines       40564   40592       +28     
   ==========================================
   + Hits        13332   34912    +21580     
   + Misses      27232    5680    -21552
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `94.29% <100%> (+64.57%)` | :arrow_up: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.58% <100%> (+39%)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+73.28%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.39% <100%> (+54.42%)` | :arrow_up: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (+11.66%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `45.25% <0%> (-43.8%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `76.47% <0%> (-21.18%)` | :arrow_down: |
   | ... and [740 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [f3ad5cf...723bdfb](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380117315
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -16,42 +16,38 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Iterable, Set, Union
+from typing import Iterable, Optional, Set, Union
 
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils import timezone
 from airflow.utils.log.logging_mixin import LoggingMixin
-from airflow.utils.session import provide_session
+from airflow.utils.session import create_session, provide_session
 from airflow.utils.state import State
 
+XCOM_SKIPMIXIN_KEY = "skipmixin_key"
+
 
 class SkipMixin(LoggingMixin):
-    @provide_session
-    def skip(self, dag_run, execution_date, tasks, session=None):
+    def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
-        Sets tasks instances to skipped from the same dag run.
-
-        :param dag_run: the DagRun for which to set the tasks to skipped
-        :param execution_date: execution_date
-        :param tasks: tasks to skip (not task_ids)
-        :param session: db session to use
+        Used internally to set state of task instances to skipped from the same dag run.
         """
-        if not tasks:
-            return
-
         task_ids = [d.task_id for d in tasks]
         now = timezone.utcnow()
 
         if dag_run:
             session.query(TaskInstance).filter(
                 TaskInstance.dag_id == dag_run.dag_id,
                 TaskInstance.execution_date == dag_run.execution_date,
-                TaskInstance.task_id.in_(task_ids)
-            ).update({TaskInstance.state: State.SKIPPED,
-                      TaskInstance.start_date: now,
-                      TaskInstance.end_date: now},
-                     synchronize_session=False)
-            session.commit()
 
 Review comment:
   Why did you remove the commit here? (It might be okay, just curious as to your reasoning.)
   
   Oh, you renamed the function too. I see. All good.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380121894
 
 

 ##########
 File path: airflow/ti_deps/deps/not_previously_skipped_dep.py
 ##########
 @@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class NotPreviouslySkippedDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+
+    NAME = "Not Previously Skipped"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context):  # pylint: disable=signature-differs
+        from airflow.models.skipmixin import SkipMixin, XCOM_SKIPMIXIN_KEY
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.ensure_finished_tasks(ti.task.dag, ti.execution_date, session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                if parent.task_id not in finished_task_ids:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                prev_result = ti.xcom_pull(task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY)
+
+                if prev_result is None:
+                    # This can happen if the parent task has not yet run.
+                    continue
+
+                should_skip = False
+                if "followed" in prev_result and ti.task_id not in prev_result["followed"]:
 
 Review comment:
   Can we make "followed" and "skipped" constants in SkipMixin module please?
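
A minimal sketch of what such constants could look like. Only the name `XCOM_SKIPMIXIN_KEY` appears in the diff above; its value, the two other constant names, and both helper functions are assumptions made for illustration, not the merged code.

```python
# Hypothetical constants. Only the name XCOM_SKIPMIXIN_KEY comes from the diff;
# its value and the two names below are assumed for this sketch.
XCOM_SKIPMIXIN_KEY = "skipmixin_key"
XCOM_SKIPMIXIN_FOLLOWED = "followed"
XCOM_SKIPMIXIN_SKIPPED = "skipped"


def followed_payload(task_ids):
    """Build the value a branching operator would store for the branches it follows."""
    return {XCOM_SKIPMIXIN_FOLLOWED: list(task_ids)}


def skipped_payload(task_ids):
    """Build the value an operator would store for the tasks it skipped."""
    return {XCOM_SKIPMIXIN_SKIPPED: list(task_ids)}
```

With the literals defined once, the dependency check can test `XCOM_SKIPMIXIN_FOLLOWED in prev_result` instead of repeating the string in several modules.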

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] ashb merged pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb merged pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579687797
 
 
   @tooptoop4 yes, if you always clear the entire DAG, the BranchPythonOperator itself gets cleared, so you won't face this problem.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379332710
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   There is no point pulling the xcom  if `parent.task_id not in finished_task_ids` - avoiding extra DB requests in the hot-path of the scheduler is something we want to do.
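
The ordering suggested here can be sketched in plain Python. The function and variable names are hypothetical, and `fake_xcom_pull` merely stands in for `ti.xcom_pull()` so that the number of simulated database round trips can be counted.

```python
def parents_to_query(upstream_task_ids, finished_task_ids):
    """Yield only the parents whose XCom is worth pulling."""
    for task_id in upstream_task_ids:
        if task_id not in finished_task_ids:
            # An unfinished parent cannot have stored a skip decision yet,
            # so there is nothing to look up for it.
            continue
        yield task_id


calls = []


def fake_xcom_pull(task_id):
    """Stand-in for ti.xcom_pull(); records each simulated DB round trip."""
    calls.append(task_id)
    return None


# Two SkipMixin parents, only one of which has finished.
for parent_id in parents_to_query(["branch_a", "branch_b"], {"branch_a"}):
    fake_xcom_pull(parent_id)
```

Only the finished parent triggers a lookup, which is the saving the comment asks for in the scheduler's hot path.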
   


[GitHub] [airflow] yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   This PR fixes the following issue:
   
   If a task is skipped by ``BranchPythonOperator``, ``BaseBranchOperator`` or ``ShortCircuitOperator`` and the user then clears the skipped task, it'll execute.
   
   After this PR:
   The ``NotPreviouslySkippedDep`` rule will first evaluate if a task has a direct ``SkipMixin`` parent that has decided to skip it. This is done by examining the XCom data stored by ``SkipMixin.skip()`` or ``SkipMixin.skip_all_except()``.
   
   The implementation is inspired by the author of [this blog](https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/).
   
   ---
   Issue link: [AIRFLOW-5391](https://issues.apache.org/jira/browse/AIRFLOW-5391)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/288a50a0c95e14a81763cee214e55dde2feccf42?src=pr&el=desc) will **decrease** coverage by `0.38%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #7276      +/-   ##
   =========================================
   - Coverage   86.59%   86.2%   -0.39%     
   =========================================
     Files         871     872       +1     
     Lines       40660   40701      +41     
   =========================================
   - Hits        35209   35088     -121     
   - Misses       5451    5613     +162
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <100%> (ø)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.39% <100%> (+0.25%)` | :arrow_up: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   | [airflow/ti\_deps/deps/branch\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvYnJhbmNoX2RlcC5weQ==) | `95.65% <95.65%> (ø)` | |
   | [...w/providers/apache/hive/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvb3BlcmF0b3JzL215c3FsX3RvX2hpdmUucHk=) | `100% <0%> (ø)` | :arrow_up: |
   | [airflow/operators/generic\_transfer.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZ2VuZXJpY190cmFuc2Zlci5weQ==) | `100% <0%> (ø)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `50% <0%> (-50%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | ... and [14 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [288a50a...ca16153](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379379912
 
 

 ##########
 File path: airflow/operators/branch_operator.py
 ##########
 @@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError
 
     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
+        :param xcom_value: The task_id or list of task_ids returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
 
 Review comment:
   @ashb  how about this:
   
   Since there's `skip()` and `skip_all_except()`. I can make `skip()` store what's skipped, and make `skip_all_except()` store what's in the "except". 
   So the XCom data will look something along this line:
   ```
   "skipped": [B, C]
   "except": [A]
   ```
   
   When looking up these XCom values, first skip [B, C], and then also skip everything that is not in "except".
   
   That should cover all the edge cases that can happen.
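
The lookup described above could be sketched as follows. This is a plain-Python illustration of the proposal, not the merged implementation: `should_skip` is a hypothetical helper name, and the dictionary stands in for the value pulled from XCom.

```python
def should_skip(task_id, xcom_data):
    """Apply the proposed rule: explicit skips first, then the "except" list."""
    # Tasks recorded by skip() are skipped outright.
    if task_id in xcom_data.get("skipped", []):
        return True
    # If skip_all_except() recorded an "except" list, everything outside it is skipped.
    if "except" in xcom_data and task_id not in xcom_data["except"]:
        return True
    return False


# The example data from the comment above.
stored = {"skipped": ["B", "C"], "except": ["A"]}
results = {task_id: should_skip(task_id, stored) for task_id in ["A", "B", "C"]}
```

An empty dictionary, meaning the parent stored nothing, leaves every task runnable.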


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379823456
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
 
 Review comment:
   @ashb okay, this is done. I created ``DepContext.ensure_finished_tasks()`` and made existing code use it.
   
   However, I only found two uses, one in ``TriggerRuleDep`` and one in my ``NotPreviouslySkippedDep``. Where is the new one you mentioned?
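
The idea behind such a helper is to load the finished task instances once and reuse them across dependency checks. Below is a rough sketch of that caching pattern only. `DepContextSketch` and `fake_query` are hypothetical, and the signature is simplified: in the diff quoted earlier the method is called with the DAG, the execution date and the session rather than a callable.

```python
class DepContextSketch:
    """Hypothetical stand-in for DepContext that shows only the caching idea."""

    def __init__(self, finished_tasks=None):
        self.finished_tasks = finished_tasks

    def ensure_finished_tasks(self, load_finished_tasks):
        # load_finished_tasks is an assumed zero-argument callable that stands
        # in for the database query.
        if self.finished_tasks is None:
            self.finished_tasks = load_finished_tasks()
        return self.finished_tasks


queries = []


def fake_query():
    """Pretend to hit the database and record that it happened."""
    queries.append("query")
    return ["branch_task", "upstream_task"]


ctx = DepContextSketch()
first = ctx.ensure_finished_tasks(fake_query)   # runs the query
second = ctx.ensure_finished_tasks(fake_query)  # served from the cached list
```

Each dep that shares the same context then pays for at most one query.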


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581103041
 
 
   - Added tests
   - Extended the same support to ``BranchPythonOperator``, ``ShortCircuitOperator`` and ``BaseBranchOperator``. Clearing downstream tasks of all these three operators will now do the right thing.
   - That said, I still think ``BaseBranchOperator`` is a bit redundant since we already have ``BranchPythonOperator``. I'm proposing to deprecate ``BaseBranchOperator`` in [another PR](https://github.com/apache/airflow/pull/7331).
   
   The PR is now ready for review. Reviewers @kaxil  @ashb  @feluelle please take a look.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378787145
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   And if we are going to use Xcom for this (which as I say, I'm not convinced) we should use something other than the default key.
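
To illustrate why a dedicated key helps: Airflow stores a task's return value under the default key `return_value`, so a skip decision kept there would share a slot with whatever the user's callable returns. The dictionary-backed store below is a toy stand-in for XCom, and the key name `skipmixin_key` is an assumption.

```python
# Toy XCom store keyed by (task_id, key); not the Airflow implementation.
xcom_store = {}


def xcom_push(task_id, key, value):
    xcom_store[(task_id, key)] = value


def xcom_pull(task_id, key="return_value"):
    return xcom_store.get((task_id, key))


# The user's callable returns arbitrary data under the default key ...
xcom_push("branch", "return_value", {"rows": 10})
# ... while the skip decision lives under its own, assumed key.
xcom_push("branch", "skipmixin_key", {"followed": ["task_a"]})

user_value = xcom_pull("branch")
skip_decision = xcom_pull("branch", key="skipmixin_key")
```

Neither value overwrites the other, so the dependency check never has to guess whether a pulled value is a branching result.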


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378844501
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   Hi, @ashb  here's the reason I'm using XCom, using the same example I previously provided:
   
   In this example, `condition_false` is a `ShortCircuitOperator` that decided to skip its downstream task. But if the user clicks `false_1` and hits "Clear", Airflow currently executes `false_1`, which is very counter-intuitive for users.
   
   So how do we improve this behavior?
   
   When `false_1` is cleared, one way to know that it should actually be skipped is by looking at what its parent `ShortCircuitOperator` returned. To get the return value of the `ShortCircuitOperator`, I'm resorting to `XCom`.
   
   ![image](https://user-images.githubusercontent.com/6637585/74437479-283ddc80-4ea3-11ea-8aec-324369733ebb.png)
   
   
   This works fine, but maybe you know a better way than using XCom for this purpose. If so, let me know; I don't mind changing.


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Add a new dependency rule to evaluate branching result
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/288a50a0c95e14a81763cee214e55dde2feccf42?src=pr&el=desc) will **decrease** coverage by `0.38%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #7276      +/-   ##
   =========================================
   - Coverage   86.59%   86.2%   -0.39%     
   =========================================
     Files         871     872       +1     
     Lines       40660   40701      +41     
   =========================================
   - Hits        35209   35086     -123     
   - Misses       5451    5615     +164
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <100%> (ø)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.67% <100%> (+0.11%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.39% <100%> (+0.25%)` | :arrow_up: |
   | [airflow/operators/branch\_operator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvYnJhbmNoX29wZXJhdG9yLnB5) | `86.66% <87.5%> (-0.84%)` | :arrow_down: |
   | [airflow/ti\_deps/deps/branch\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvYnJhbmNoX2RlcC5weQ==) | `95.65% <95.65%> (ø)` | |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/providers/postgres/operators/postgres.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvcG9zdGdyZXMvb3BlcmF0b3JzL3Bvc3RncmVzLnB5) | `50% <0%> (-50%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...roviders/google/cloud/operators/postgres\_to\_gcs.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9wb3N0Z3Jlc190b19nY3MucHk=) | `52.94% <0%> (-32.36%)` | :arrow_down: |
   | ... and [8 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [288a50a...68383ae](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379350352
 
 

 ##########
 File path: airflow/operators/branch_operator.py
 ##########
 @@ -48,4 +48,21 @@ def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
         raise NotImplementedError
 
     def execute(self, context: Dict):
-        self.skip_all_except(context['ti'], self.choose_branch(context))
+        branch = self.choose_branch(context)
+        self.skip_all_except(context['ti'], branch)
+        return branch
+
+    def evaluate_skip_condition(self, xcom_value: Union[str, List[str]], downstream_ti: TaskInstance):
+        """
+        :param xcom_value: The task_id or list of task_ids returned by execute().
+        :type xcom_value: str or list[str]
+        :param downstream_ti: The TaskInstance of child task
+        :type downstream_ti: airflow.models.TaskInstance
+        :return: True if xcom_value indicates downstream_ti should be skipped. Otherwise False.
+        :rtype: bool
+        """
+        branch = xcom_value
+        if isinstance(branch, str):
+            branch = [branch]
+
+        return downstream_ti.task_id not in branch
 
 Review comment:
   That might actually be one way to do this. There's just one downside I can see.
   
   For `BranchPythonOperator`, the current behaviour in my PR is to store the task_ids to follow (because that's in XCom). 
   
   If we change this to store the list of skipped task_ids in `SkipMixin`, people making incremental changes to an existing DAG may see surprising behaviour. For example, suppose the DAG looks like this on 20200101, and Branch follows A and skips B and C. `SkipMixin` then stores B and C in the db as its "list of task_ids skipped".
   ```
   Branch >> [A, B, C]
   ```
   
   We then add D on 20200102, so the DAG becomes:
   ```
   Branch >> [A, B, C, D]
   ```
   
   Branch itself is not changed and still only follows A. But if someone clears a task in the 20200101 DAG run, D will start running, because D is not found in the "list of task_ids skipped" stored by `SkipMixin`.
   
   I think this behaviour is less intuitive than having Branch automatically skip anything it did not follow (which is what this PR currently does).
   
   However, I do like your suggestion, because the implementation is simpler and more elegant than forcing developers to implement an additional `evaluate_skip_condition()`.
   
   What do you think?
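
To make the trade-off above concrete, here is a minimal sketch in plain Python, with no Airflow imports. The two helper functions and the variable names are hypothetical and exist only for this illustration. They compare what happens to the newly added task D when the 20200101 run is cleared, depending on whether the branch recorded the tasks it followed or the tasks it skipped.

```python
# Hypothetical helpers for illustration only; they are not part of Airflow.
def should_skip_using_followed(task_id, followed):
    """Skip anything the branch did not explicitly follow."""
    return task_id not in followed


def should_skip_using_skipped(task_id, skipped):
    """Skip only what was recorded as skipped when the branch ran."""
    return task_id in skipped


# What the branch could have recorded for the 20200101 run,
# when the DAG was Branch >> [A, B, C].
followed = ["A"]
skipped = ["B", "C"]

# D is added on 20200102, then tasks of the 20200101 run are cleared.
downstream_now = ["A", "B", "C", "D"]

runs_with_followed = [
    t for t in downstream_now if not should_skip_using_followed(t, followed)
]
runs_with_skipped = [
    t for t in downstream_now if not should_skip_using_skipped(t, skipped)
]

print(runs_with_followed)  # ['A']
print(runs_with_skipped)   # ['A', 'D']
```

Recording the followed branches keeps D skipped, while recording the skipped list lets D run, which is the surprising behaviour described in the comment.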
   


[GitHub] [airflow] yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 opened a new pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276
 
 
   This PR fixes the following issue:
   
   If a task is skipped by `BranchPythonOperator` and the user then clears the skipped task, it'll execute. 
   
   The implementation is inspired by the author of [this blog](https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/).
   
   ---
   Issue link: [AIRFLOW-5391](https://issues.apache.org/jira/browse/AIRFLOW-5391)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379333409
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
 
 Review comment:
   In your example, if I clear false_2 it would re-run -- the skip task was not a direct relative. That makes this behaviour even more confusing, as sometimes it would run and sometimes it wouldn't, depending on which task was cleared.


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380219507
 
 

 ##########
 File path: tests/ti_deps/deps/test_trigger_rule_dep.py
 ##########
 @@ -19,6 +19,8 @@
 import unittest
 from datetime import datetime
 
+import mock
 
 Review comment:
   ```suggestion
   from unittest.mock import Mock
   ```
   
   and change the usage. (Py3 includes mock, so we should move away from using the separate module, as it's not needed.)
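
As a rough illustration of the suggested import, the sketch below uses the standard-library `unittest.mock.Mock` to fake a task instance. The attribute values, the `"branch"` task id, and the key and dictionary contents are assumptions made up for this example, not values taken from the PR's tests.

```python
from unittest.mock import Mock

# Hypothetical stand-in for a TaskInstance: only what the test touches is set.
ti = Mock(task_id="op2")
ti.xcom_pull.return_value = {"followed": ["op1"]}

# Code under test would call xcom_pull on the task instance like this.
value = ti.xcom_pull(task_ids="branch", key="skipmixin_key")

# Mock records every call, so the test can verify how it was used.
ti.xcom_pull.assert_called_once_with(task_ids="branch", key="skipmixin_key")
```

No third-party `mock` package is needed for this on Python 3.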


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/d83ce260c074d3f887948921d3ebbdb1100cc800?src=pr&el=desc) will **increase** coverage by `53.43%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7276/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #7276       +/-   ##
   ===========================================
   + Coverage   32.95%   86.39%   +53.43%     
   ===========================================
     Files         878      878               
     Lines       41219    41288       +69     
   ===========================================
   + Hits        13584    35669    +22085     
   + Misses      27635     5619    -22016
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/ti\_deps/deps/trigger\_rule\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvdHJpZ2dlcl9ydWxlX2RlcC5weQ==) | `91.02% <ø> (+74.77%)` | :arrow_up: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <100%> (+38.94%)` | :arrow_up: |
   | [airflow/ti\_deps/dep\_context.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcF9jb250ZXh0LnB5) | `100% <100%> (+29.03%)` | :arrow_up: |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `98.18% <100%> (+73.79%)` | :arrow_up: |
   | [airflow/ti\_deps/deps/not\_previously\_skipped\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvbm90X3ByZXZpb3VzbHlfc2tpcHBlZF9kZXAucHk=) | `100% <100%> (ø)` | |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-39.44%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.38% <0%> (-25.52%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | ... and [767 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [d83ce26...39020d8](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379340298
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
 
 Review comment:
   I had the same concern as you, so I ran an experiment. If someone clears `false_2`, Airflow actually does the right thing: the task becomes skipped. That already happens in Airflow today, not because of my PR.
   The doc describes this behaviour as "skipped states propagates". It's actually `trigger_rule_dep.py` that does this "magic". Somewhere here the state is set to SKIPPED if any of the task's upstream tasks is skipped and the task's `trigger_rule` is `all_success`. This is exactly what we hope for.
   
   ```python
               if tr == TR.ALL_SUCCESS:
                   if upstream_failed or failed:
                       ti.set_state(State.UPSTREAM_FAILED, session)
                   elif skipped:
                       ti.set_state(State.SKIPPED, session)
   ```
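
The propagation described above can be sketched without Airflow. The `State` enum and `resolve_all_success` function below are simplified, hypothetical stand-ins that mirror only the quoted `all_success` branch. The real dependency also tracks counts of finished upstream tasks, which this sketch ignores, and the `false_1 >> false_2` chain shape is assumed from the task names in the thread.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"


def resolve_all_success(upstream_states):
    """Return the state forced on a task by its upstream states, or None if none is forced."""
    if State.FAILED in upstream_states or State.UPSTREAM_FAILED in upstream_states:
        return State.UPSTREAM_FAILED
    if State.SKIPPED in upstream_states:
        return State.SKIPPED
    return None


# Assumed chain: false_1 >> false_2, with false_1 already skipped by the branch.
false_1 = State.SKIPPED

# Clearing false_2 re-evaluates it against its upstream, so it is skipped again.
false_2 = resolve_all_success([false_1])
```

Under these assumptions a cleared task that is not a direct child of the skipping operator still ends up skipped, because its own parent is skipped.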


[GitHub] [airflow] yuqian90 edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-586576551
 
 
   Hi @ashb, thanks for your review. I have adopted all your suggestions, with this tweak:
   
   - I moved the XCom code into ``SkipMixin``. The main users of this, ``BranchPythonOperator`` and ``BaseBranchOperator``, both determine which tasks to skip by returning the branches to **follow**. So I find it important to have ``SkipMixin`` store the branches **followed** rather than the ones skipped when ``skip_all_except()`` is called. When ``skip()`` is called, it should obviously store the tasks that are skipped. So the revamped ``SkipMixin`` now handles both cases. The shared code is refactored into ``_set_state_to_skipped()`` to avoid duplication.
   
   Please take another look and let me know what you think.
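
A minimal sketch of how a dependency check might read either kind of record, assuming the stored value is a dictionary with one of two keys. The key strings and the `previously_skipped` function are hypothetical. The thread names constants such as `XCOM_SKIPMIXIN_SKIPPED` but does not show their values, and this is not the PR's actual `NotPreviouslySkippedDep` code.

```python
# Hypothetical key values; the thread names the constants but not their contents.
XCOM_SKIPMIXIN_SKIPPED = "skipped"
XCOM_SKIPMIXIN_FOLLOWED = "followed"


def previously_skipped(task_id, xcom_value):
    """Decide from a parent's stored record whether task_id should stay skipped."""
    if not xcom_value:
        # The parent stored nothing, so it made no skip decision.
        return False
    if XCOM_SKIPMIXIN_SKIPPED in xcom_value:
        # Written by skip(): an explicit list of skipped task ids.
        return task_id in xcom_value[XCOM_SKIPMIXIN_SKIPPED]
    if XCOM_SKIPMIXIN_FOLLOWED in xcom_value:
        # Written by skip_all_except(): everything not followed is skipped.
        return task_id not in xcom_value[XCOM_SKIPMIXIN_FOLLOWED]
    return False
```

With the "followed" form, a task added to the DAG after the run is still treated as skipped, which matches the behaviour argued for earlier in the thread.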


[GitHub] [airflow] yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380156437
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +61,44 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self,
+        dag_run,
+        execution_date,
+        tasks,
+        session=None,
+        ti: Optional[TaskInstance] = None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run. If ti is given, store the list of
+        skipped task IDs to XCom so that NotPreviouslySkippedDep knows these tasks should be skipped
+        when they are cleared.
+
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        :param ti: The TaskInstance that initiates the skip. Used for storing results to XCom.
+            If not given, skipped task IDs will not be stored to XCom.
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+        if ti:
 
 Review comment:
   Oh, good idea. One thing I realised, though, is that `SkipMixin` is not necessarily a `BaseOperator`, so it may not have a `self.task_id` attribute. However, I scanned through the code base and I think that only happens in tests. All real uses of `SkipMixin` create a `BaseOperator` from it. Nevertheless, I added a type check to make sure this does not break existing code/tests, and `XCom.set()` is only called if `self.task_id` is valid.
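
One way to picture the guard described above is the sketch below. Both classes and the `stored` dictionary are hypothetical stand-ins: `stored` plays the role of the XCom table, and `getattr` with a default is just one possible way to detect a missing `task_id`, not necessarily the check the PR uses.

```python
class SkipMixinSketch:
    """Hypothetical mixin for illustration; not Airflow's SkipMixin."""

    def owner_task_id(self):
        # A bare mixin instance has no task_id; an operator built on it does.
        return getattr(self, "task_id", None)


class OperatorSketch(SkipMixinSketch):
    def __init__(self, task_id):
        self.task_id = task_id


stored = {}  # stands in for the XCom table
for obj in (SkipMixinSketch(), OperatorSketch("branch")):
    task_id = obj.owner_task_id()
    if task_id is not None:
        # Only objects with a task_id record their skip decision.
        stored[task_id] = {"skipped": ["B", "C"]}
```

The bare mixin instance is silently ignored, so tests that use the mixin without an operator keep working.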


[GitHub] [airflow] yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276
 
 
   


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379324932
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   Oh - to start with I thought this was the XCom of false_1 etc, not of the skipping operator.
   
   Let me look at this again.


[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378807358
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   > we should use something other than the default key.
   
   Oh. Ash is totally right, @yuqian90. I missed that. We cannot use the default key.
   
   > Waait what. I'm really not sure about this -- I don't see from the problem you've describe why we have to touch XCom at all.
   
   Why don't you want to use xcom for this?


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r381943188
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +68,53 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self, dag_run, execution_date, tasks, session=None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run.
+        If `task_id` is a valid attribute, store the list of skipped task IDs to XCom
+        so that NotPreviouslySkippedDep knows these tasks should be skipped when they
+        are cleared.
+
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+
+        # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
+        try:
+            task_id = self.task_id
+        except AttributeError:
+            task_id = None
+
+        if task_id is not None:
+            from airflow.models.xcom import XCom
+
+            XCom.set(
+                key=XCOM_SKIPMIXIN_KEY,
+                value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
+                task_id=task_id,
+                dag_id=dag_run.dag_id,
+                execution_date=dag_run.execution_date,
+            )
 
-    def skip_all_except(self, ti: TaskInstance, branch_task_ids: Union[str, Iterable[str]]):
+    def skip_all_except(
+        self, ti: TaskInstance, branch_task_ids: Union[str, Iterable[str]]
+    ):
         """
         This method implements the logic for a branching operator; given a single
         task ID or list of task IDs to follow, this skips all other tasks
-        immediately downstream of this operator.
+        immediately downstream of this operator. branch_task_ids is stored to XCom so that
 
 Review comment:
   ```suggestion
           immediately downstream of this operator.
           
           branch_task_ids is stored to XCom so that
   ```
   
   (Plus reformat.)


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-587230400
 
 
   Hi @ashb, thanks for pointing those out. I've fixed them.
   
   Travis is still unhappy, but I'm pretty sure this is unrelated to my change:


[GitHub] [airflow] codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-581100536
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=h1) Report
   > Merging [#7276](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/946bdc23c039637b0383e1269f99bdd1b2426565?src=pr&el=desc) will **increase** coverage by `53.42%`.
   > The diff coverage is `100%`.
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #7276       +/-   ##
   ===========================================
   + Coverage   32.95%   86.38%   +53.42%     
   ===========================================
     Files         878      879        +1     
     Lines       41188    41228       +40     
   ===========================================
   + Hits        13573    35614    +22041     
   + Misses      27615     5614    -22001
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/ti\_deps/deps/trigger\_rule\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvdHJpZ2dlcl9ydWxlX2RlcC5weQ==) | `91.02% <ø> (+74.77%)` | :arrow_up: |
   | [airflow/models/baseoperator.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvYmFzZW9wZXJhdG9yLnB5) | `96.52% <100%> (+38.94%)` | :arrow_up: |
   | [airflow/ti\_deps/dep\_context.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcF9jb250ZXh0LnB5) | `100% <100%> (+29.03%)` | :arrow_up: |
   | [airflow/ti\_deps/deps/not\_previously\_skipped\_dep.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy90aV9kZXBzL2RlcHMvbm90X3ByZXZpb3VzbHlfc2tpcHBlZF9kZXAucHk=) | `100% <100%> (ø)` | |
   | [airflow/models/skipmixin.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvc2tpcG1peGluLnB5) | `97.91% <100%> (+73.52%)` | :arrow_up: |
   | [airflow/operators/python.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHl0aG9uLnB5) | `95.17% <100%> (+54.2%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-39.44%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.38% <0%> (-25.52%)` | :arrow_down: |
   | ... and [755 more](https://codecov.io/gh/apache/airflow/pull/7276/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=footer). Last update [946bdc2...8f1b843](https://codecov.io/gh/apache/airflow/pull/7276?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] yuqian90 closed pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276
 
 
   


[GitHub] [airflow] tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579455800
 
 
   Hmm, I use BranchPythonOperator and the clear command a lot, and I have never seen skipped tasks run on clear. Do you use the `none_failed` trigger rule?


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator
URL: https://github.com/apache/airflow/pull/7276#issuecomment-579577040
 
 
   @tooptoop4 the problem happens when you clear the skipped task (not when you clear the BranchPythonOperator). There's an example in the linked JIRA.
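
The scenario described above can be sketched with a toy model. This is not Airflow code: the state constants, the `upstream` mapping, and the `schedule` function are hypothetical stand-ins. The scheduler here is assumed to consult only an all_success-style rule on direct parents, which is the gap this PR targets.

```python
# Toy model of the reported behaviour; none of these names are Airflow APIs.
SUCCESS, SKIPPED, CLEARED = "success", "skipped", None

# Direct upstream tasks of each task in the example DAG.
upstream = {"make_choice": [], "branch_1": ["make_choice"], "branch_2": ["make_choice"]}


def run_branch(states, chosen):
    """The branch task succeeds and marks every branch it did not choose as skipped."""
    states["make_choice"] = SUCCESS
    for task_id in ("branch_1", "branch_2"):
        states[task_id] = CLEARED if task_id == chosen else SKIPPED


def schedule(states):
    """Run every task that has no state and whose direct parents all succeeded."""
    for task_id, parents in upstream.items():
        if states.get(task_id) is CLEARED and all(states.get(p) == SUCCESS for p in parents):
            states[task_id] = SUCCESS


states = {}
run_branch(states, chosen="branch_1")
schedule(states)               # branch_1 runs, branch_2 stays skipped
states["branch_2"] = CLEARED   # the user clears only the skipped task
schedule(states)               # nothing remembers the branch decision, so branch_2 runs
```

Clearing `make_choice` itself would rerun the branch decision and skip `branch_2` again, which may be why the problem is easy to miss.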


[GitHub] [airflow] yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on issue #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#issuecomment-585622695
 
 
   Hi @feluelle, I've adopted all your suggestions (adding docs and raising ValueError), and the PR has been updated. Please take another look. Thank you!


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r381942951
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +68,53 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self, dag_run, execution_date, tasks, session=None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run.
+        If `task_id` is a valid attribute, store the list of skipped task IDs to XCom
+        so that NotPreviouslySkippedDep knows these tasks should be skipped when they
+        are cleared.
+
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+
+        # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
+        try:
+            task_id = self.task_id
+        except AttributeError:
+            task_id = None
+
+        if task_id is not None:
+            from airflow.models.xcom import XCom
+
+            XCom.set(
+                key=XCOM_SKIPMIXIN_KEY,
+                value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
+                task_id=task_id,
+                dag_id=dag_run.dag_id,
+                execution_date=dag_run.execution_date,
 
 Review comment:
   ```suggestion
                   execution_date=dag_run.execution_date,
                   session=session
   ```


[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378807358
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
+                start_date=ti.execution_date,
+                end_date=ti.execution_date,
+                state=State.finished() + [State.UPSTREAM_FAILED],
+                session=session)
+
+        finished_task_ids = {t.task_id for t in finished_tasks}
+
+        for parent in upstream:
+            if isinstance(parent, SkipMixin):
+                xcom_value = ti.xcom_pull(task_ids=parent.task_id)
 
 Review comment:
   > we should use something other than the default key.
   
   Oh, Ash is totally right, @yuqian90. I missed that. We cannot use the default key.
   
   > Waait what. I'm really not sure about this -- I don't see from the problem you've describe why we have to touch XCom at all.
   
   Why don't you want to use xcom for this?


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379326946
 
 

 ##########
 File path: airflow/ti_deps/deps/branch_dep.py
 ##########
 @@ -0,0 +1,63 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
+
+
+class BranchDep(BaseTIDep):
+    """
+    Determines if any of the task's direct upstream relatives have decided this task should
+    be skipped.
+    """
+    NAME = "Branch Rule"
+    IGNORABLE = True
+    IS_TASK_DEP = True
+
+    def _get_dep_statuses(self, ti, session, dep_context=None):
+        from airflow.models.skipmixin import SkipMixin
+        from airflow.utils.state import State
+
+        upstream = ti.task.get_direct_relatives(upstream=True)
+
+        finished_tasks = dep_context.finished_tasks if dep_context else None
+        if finished_tasks is None:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = ti.task.dag.get_task_instances(
 
 Review comment:
   This is now the second or third copy of this block of code. Could you please refactor it into a method on dep_context and use it everywhere this lookup is done? (Please rebase onto master too, as another copy has been added since you opened this PR.)
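
One possible shape for such a refactoring is sketched below. It is a minimal illustration, not the code that was merged: `DepContextSketch`, `ensure_finished_tasks`, and `load_from_db` are hypothetical names, and the loader stands in for the `get_task_instances(...)` query shown in the diff.

```python
# Hypothetical sketch: a context object that loads finished task instances once
# and hands the cached result to every dependency check that asks for it.
from typing import Callable, List, Optional


class DepContextSketch:
    def __init__(self, finished_tasks: Optional[List[str]] = None):
        self.finished_tasks = finished_tasks

    def ensure_finished_tasks(self, load: Callable[[], List[str]]) -> List[str]:
        """Return the cached list, calling ``load`` only if nothing is cached yet."""
        if self.finished_tasks is None:
            self.finished_tasks = load()
        return self.finished_tasks


calls = []


def load_from_db() -> List[str]:
    # Stand-in for the database query; records each call so reuse is visible.
    calls.append(1)
    return ["make_choice", "branch_1"]


ctx = DepContextSketch()
first = ctx.ensure_finished_tasks(load_from_db)   # runs the loader
second = ctx.ensure_finished_tasks(load_from_db)  # served from the cache
```

With the fallback living in one place, each dep only needs a single call instead of repeating the `if finished_tasks is None` block.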


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r379334969
 
 

 ##########
 File path: airflow/models/baseoperator.py
 ##########
 @@ -650,6 +651,7 @@ def deps(self) -> Set[BaseTIDep]:
             NotInRetryPeriodDep(),
             PrevDagrunDep(),
             TriggerRuleDep(),
+            BranchDep(),
 
 Review comment:
   ```suggestion
               NotPreviouslySkipped(),
   ```
   
   There are more ways of skipping than just branches.
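
To illustrate the point, the sketch below keys the check on a shared mixin rather than on a specific branch operator, so any parent that skips tasks is covered (ShortCircuitOperator is one such case in Airflow). All class and function names here are hypothetical stand-ins, not the real Airflow classes.

```python
class SkipMixinSketch:
    """Stand-in for a mixin shared by every operator that can skip downstream tasks."""

    def __init__(self, task_id, skipped):
        self.task_id = task_id
        self.skipped = set(skipped)  # task ids this operator decided to skip


class BranchLike(SkipMixinSketch):
    """Skips the branches it did not choose."""


class ShortCircuitLike(SkipMixinSketch):
    """Skips everything downstream when its condition is false."""


class PlainTask:
    """An ordinary task that never skips anything."""

    def __init__(self, task_id):
        self.task_id = task_id


def skipped_by_any_parent(task_id, parents):
    """True if any parent that is able to skip tasks has skipped ``task_id``."""
    return any(
        isinstance(parent, SkipMixinSketch) and task_id in parent.skipped
        for parent in parents
    )


gate = ShortCircuitLike("gate", skipped=["load"])
choice = BranchLike("make_choice", skipped=["branch_2"])
extract = PlainTask("extract")
```

A check written this way does not need to change when a new skipping operator is added, which is the motivation for the more general name.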


[GitHub] [airflow] feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r378244504
 
 

 ##########
 File path: tests/operators/test_python.py
 ##########
 @@ -541,6 +542,62 @@ def test_xcom_push(self):
                 self.assertEqual(
                     ti.xcom_pull(task_ids='make_choice'), 'branch_1')
 
+    def test_clear_skipped_downstream_task(self):
+        """
+        After a downstream task is skipped by BranchPythonOperator, clearing the skipped task
+        should not cause it to be executed.
+        """
+        branch_op = BranchPythonOperator(task_id='make_choice',
+                                         dag=self.dag,
+                                         python_callable=lambda: 'branch_1')
+        branches = [self.branch_1, self.branch_2]
+        branch_op >> branches
+        self.dag.clear()
+
+        dr = self.dag.create_dagrun(
+            run_id="manual__",
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+        branch_op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        for task in branches:
+            task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        tis = dr.get_task_instances()
+        for ti in tis:
+            if ti.task_id == 'make_choice':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_1':
+                self.assertEqual(ti.state, State.SUCCESS)
+            elif ti.task_id == 'branch_2':
+                self.assertEqual(ti.state, State.SKIPPED)
+            else:
+                raise Exception
 
 Review comment:
   ```suggestion
                   raise ValueError(f'Invalid task id {ti.task_id} found!')
   ```
   
   WDYT?
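
A table-driven variant of the same check is sketched below as one way to keep the descriptive error while dropping the if/elif chain. `expected_states` and `check_states` are hypothetical names, and plain strings stand in for the `State` constants used in the test.

```python
# Expected final state per task id, mirroring the assertions in the test above.
expected_states = {
    "make_choice": "success",
    "branch_1": "success",
    "branch_2": "skipped",
}


def check_states(actual):
    """Compare observed states with the table; reject task ids the test does not know."""
    for task_id, state in actual.items():
        if task_id not in expected_states:
            raise ValueError(f"Invalid task id {task_id} found!")
        assert state == expected_states[task_id], f"{task_id}: unexpected state {state}"


check_states({"make_choice": "success", "branch_1": "success", "branch_2": "skipped"})
```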


[GitHub] [airflow] ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7276: [AIRFLOW-5391] Do not run skipped tasks when they are cleared
URL: https://github.com/apache/airflow/pull/7276#discussion_r380119916
 
 

 ##########
 File path: airflow/models/skipmixin.py
 ##########
 @@ -65,13 +61,44 @@ def skip(self, dag_run, execution_date, tasks, session=None):
                 ti.end_date = now
                 session.merge(ti)
 
-            session.commit()
+    @provide_session
+    def skip(
+        self,
+        dag_run,
+        execution_date,
+        tasks,
+        session=None,
+        ti: Optional[TaskInstance] = None,
+    ):
+        """
+        Sets tasks instances to skipped from the same dag run. If ti is given, store the list of
+        skipped task IDs to XCom so that NotPreviouslySkippedDep knows these tasks should be skipped
+        when they are cleared.
+
+        :param dag_run: the DagRun for which to set the tasks to skipped
+        :param execution_date: execution_date
+        :param tasks: tasks to skip (not task_ids)
+        :param session: db session to use
+        :param ti: The TaskInstance that initiates the skip. Used for storing results to XCom.
+            If not given, skipped task IDs will not be stored to XCom.
+        """
+        if not tasks:
+            return
+
+        self._set_state_to_skipped(dag_run, execution_date, tasks, session)
+        session.commit()
+        if ti:
 
 Review comment:
   We can avoid needing a TI here, and thus avoid changing the function signature, by calling `XCom.set` directly:
   
   ```
   XCom.set(
       key=XCOM_SKIPMIXIN_KEY,
       value={"skipped": [d.task_id for d in tasks]},
       execution_date=execution_date,
       task_id=self.task_id,
       dag_id=dag_run.dag_id,
       session=session
   )
   ```
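
How a dependency check might later read that record can be sketched with a toy store. This is an illustration under stated assumptions: `xcom_store` is a plain dict standing in for the XCom table, the string value of `XCOM_SKIPMIXIN_KEY` is made up, and `record_skipped` and `was_previously_skipped` are hypothetical helpers, not Airflow functions.

```python
# Toy stand-in for the XCom table, keyed the way XCom.set is called above.
XCOM_SKIPMIXIN_KEY = "skipmixin_key"   # assumed value; only the name appears in the PR
XCOM_SKIPMIXIN_SKIPPED = "skipped"

xcom_store = {}


def record_skipped(dag_id, task_id, execution_date, skipped_task_ids):
    """Store the ids skipped by ``task_id`` under a dedicated key."""
    xcom_store[(dag_id, task_id, execution_date, XCOM_SKIPMIXIN_KEY)] = {
        XCOM_SKIPMIXIN_SKIPPED: list(skipped_task_ids)
    }


def was_previously_skipped(dag_id, task_id, execution_date, parent_task_ids):
    """True if any direct parent recorded ``task_id`` as skipped for this run."""
    for parent_id in parent_task_ids:
        value = xcom_store.get((dag_id, parent_id, execution_date, XCOM_SKIPMIXIN_KEY))
        if value and task_id in value[XCOM_SKIPMIXIN_SKIPPED]:
            return True
    return False


record_skipped("example_dag", "make_choice", "2020-01-01", ["branch_2"])
```

Using a dedicated key keeps this record apart from whatever the task pushes under the default key, which is the concern raised earlier in the review.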
