Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/24 08:10:14 UTC

[GitHub] [airflow] yuqian90 opened a new pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

yuqian90 opened a new pull request #8992:
URL: https://github.com/apache/airflow/pull/8992


   This PR is backported from https://github.com/apache/airflow/pull/7276. The original commit was merged into master but not released in v1-10-*. This PR fixes a few minor merge conflicts and Python 2.7 compatibility issues and ports the change to v1-10-test.
   
   
   If a task is skipped by BranchPythonOperator, BaseBranchOperator or ShortCircuitOperator and the user later clears the skipped task, it will execute. This is probably not the right behaviour.
   
   This commit changes that so the cleared task is skipped again. The check can be bypassed by running the task again with the "Ignore Task Deps" override.
   
   (cherry picked from commit 1cdab56a6192f69962506b7ff632c986c84eb10d)
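
A rough illustration of the intended behaviour follows. It is a self-contained toy model, not Airflow's implementation: `xcom_store`, `record_skipped` and `should_run` are hypothetical names, and the two key constants only borrow their names from the `XCom.set` call quoted later in this thread (their values here are made up). The idea is that the branching task records which downstream tasks it skipped, and a dependency check consults that record when a cleared task is about to run again.

```python
# Toy model only: none of these names are Airflow APIs.
XCOM_SKIPMIXIN_KEY = "skipmixin_key"    # assumed value, for illustration
XCOM_SKIPMIXIN_SKIPPED = "skipped"      # assumed value, for illustration

# In-memory stand-in for the XCom table: (task_id, key) -> value.
xcom_store = {}


def record_skipped(branch_task_id, skipped_task_ids):
    """Remember which downstream tasks the branching task decided to skip."""
    xcom_store[(branch_task_id, XCOM_SKIPMIXIN_KEY)] = {
        XCOM_SKIPMIXIN_SKIPPED: list(skipped_task_ids)
    }


def should_run(task_id, upstream_task_ids, ignore_task_deps=False):
    """Return False if an upstream task previously recorded task_id as skipped."""
    if ignore_task_deps:
        # Models the "Ignore Task Deps" override: the check is bypassed.
        return True
    for upstream_id in upstream_task_ids:
        record = xcom_store.get((upstream_id, XCOM_SKIPMIXIN_KEY))
        if record and task_id in record[XCOM_SKIPMIXIN_SKIPPED]:
            return False
    return True


# op1 branches to op3, so op2 is recorded as skipped.
record_skipped("op1", ["op2"])

print(should_run("op2", ["op1"]))                         # False
print(should_run("op3", ["op1"]))                         # True
print(should_run("op2", ["op1"], ignore_task_deps=True))  # True
```

Clearing `op2` in this model does not erase the record kept under `op1`, which is why `op2` is skipped again unless the override is used.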
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] yuqian90 commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-657072964


   > Hi @yuqian90, apologies this won't make it to 1.10.11 as the LatestOnlyOperator causes a change in behaviour. Is it possible for you to achieve this without changing the behaviour (talking about the note in Updating.md) or adding a flag to have old behavior vs new behaviour (default would be old behaviour).
   
   Hi @kaxil, sorry for the late response. I have backed out https://github.com/apache/airflow/pull/5970 which, as you have pointed out, slightly changes the behaviour of `LatestOnlyOperator`.
   
   I put the fix inside the test instead (calling `create_dagrun()` before calling `LatestOnlyOperator.execute()`).



[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-650620677


   > > @yuqian90 tests are failing, can you take a look please
   > 
   > Hi @kaxil thank you. I looked into it. I think `tests/operators/test_latest_only_operator.py:test_run` is the only one related to my change.
   > 
   > The reason `test_latest_only_operator.py:test_run` fails in this PR but not in master is that master has another commit #5970. So I cherry-picked #5970 into my PR as well. `test_latest_only_operator.py:test_run` should now pass.
   > 
   > ```
   >             XCom.set(
   >                 key=XCOM_SKIPMIXIN_KEY,
   >                 value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
   >                 task_id=task_id,
   > >               dag_id=dag_run.dag_id,
   >                 execution_date=dag_run.execution_date,
   >                 session=session
   >             )
   > E           AttributeError: 'NoneType' object has no attribute 'dag_id'
   > ```
   
   Thanks. Currently, the CI is failing on v1-10-test too because of k8s tests. I will take a look tomorrow and re-run the tests.



[GitHub] [airflow] kaxil commented on a change in pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#discussion_r443552728



##########
File path: tests/ti_deps/deps/test_trigger_rule_dep.py
##########
@@ -19,13 +19,13 @@
 
 import unittest
 from datetime import datetime
+from unittest.mock import Mock
 
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.db import create_session
 from airflow.utils.state import State

Review comment:
       ```suggestion
   
   from airflow.models import BaseOperator, TaskInstance
   from airflow.utils.trigger_rule import TriggerRule
   from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
   from airflow.utils.db import create_session
   from airflow.utils.state import State
   from tests.compat import Mock
   ```





[GitHub] [airflow] kaxil merged pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #8992:
URL: https://github.com/apache/airflow/pull/8992


   



[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-653022789


   Hi @yuqian90, apologies this won't make it to 1.10.11 as the LatestOnlyOperator causes a change in behaviour. Is it possible for you to achieve this without changing the behaviour (talking about the note in Updating.md) or adding a flag to have old behavior vs new behaviour (default would be old behaviour).
   
   
   
   



[GitHub] [airflow] yuqian90 commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-662364794


   > Can you add a note in Updating.md, as this is still a minor change in behavior for users who were clearing the skipped tasks intentionally to run them.
   
   Hi @kaxil  thanks for the suggestion. I've added a note in UPDATING.md.



[GitHub] [airflow] kaxil commented on a change in pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#discussion_r458137872



##########
File path: tests/ti_deps/deps/test_not_previously_skipped_dep.py
##########
@@ -0,0 +1,133 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pendulum
+
+from airflow.models import DAG, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
+from airflow.ti_deps.dep_context import DepContext
+from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
+from airflow.utils.db import create_session
+from airflow.utils.state import State
+
+
+def test_no_parent():
+    """
+    A simple DAG with a single task. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG("test_test_no_parent_dag", schedule_interval=None, start_date=start_date)
+    op1 = DummyOperator(task_id="op1", dag=dag)
+
+    ti1 = TaskInstance(op1, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti1, session, DepContext()))) == 0
+        assert dep.is_met(ti1, session)
+        assert ti1.state != State.SKIPPED
+
+
+def test_no_skipmixin_parent():
+    """
+    A simple DAG with no branching. Both op1 and op2 are DummyOperator. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_no_skipmixin_parent_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = DummyOperator(task_id="op1", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_follow_branch():
+    """
+    A simple DAG with a BranchPythonOperator that follows op2. NotPreviouslySkippedDep is met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_follow_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op2", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op1 >> op2
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 0
+        assert dep.is_met(ti2, session)
+        assert ti2.state != State.SKIPPED
+
+
+def test_parent_skip_branch():
+    """
+    A simple DAG with a BranchPythonOperator that does not follow op2. NotPreviouslySkippedDep is not met.
+    """
+    start_date = pendulum.datetime(2020, 1, 1)
+    dag = DAG(
+        "test_parent_skip_branch_dag", schedule_interval=None, start_date=start_date
+    )
+    op1 = BranchPythonOperator(task_id="op1", python_callable=lambda: "op3", dag=dag)
+    op2 = DummyOperator(task_id="op2", dag=dag)
+    op3 = DummyOperator(task_id="op3", dag=dag)
+    op1 >> [op2, op3]
+
+    TaskInstance(op1, start_date).run()
+    ti2 = TaskInstance(op2, start_date)
+
+    with create_session() as session:
+        dep = NotPreviouslySkippedDep()
+        assert len(list(dep.get_dep_statuses(ti2, session, DepContext()))) == 1
+        assert not dep.is_met(ti2, session)
+        assert ti2.state == State.SKIPPED

Review comment:
       `NotPreviouslySkippedDep` is a confusing term for this case. The task is about to be skipped in this DagRun but if this task ran in the previous DagRun, `NotPreviouslySkippedDep` should be True based on the name of the dependency. Although it would be `False` based on the implementation.
   
   Is my interpretation correct @yuqian90 ?





[GitHub] [airflow] yuqian90 edited a comment on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 edited a comment on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-657072964


   > Hi @yuqian90, apologies this won't make it to 1.10.11 as the LatestOnlyOperator causes a change in behaviour. Is it possible for you to achieve this without changing the behaviour (talking about the note in Updating.md) or adding a flag to have old behavior vs new behaviour (default would be old behaviour).
   
   Hi @kaxil, sorry for the late response. I have backed out https://github.com/apache/airflow/pull/5970 which, as you have pointed out, slightly changes the behaviour of `LatestOnlyOperator`.
   
   I put the fix inside the test instead (calling `create_dagrun()` before calling `LatestOnlyOperator.execute()`).
   
   Looks like I missed 1.10.11. Hopefully this can make it for 1.10.12. Thanks!



[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-648282668


   Doc tests are failing 🤔 



[GitHub] [airflow] yuqian90 commented on a change in pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#discussion_r453200860



##########
File path: tests/operators/test_latest_only_operator.py
##########
@@ -47,15 +47,40 @@ def get_task_instances(task_id):
 
 
 class LatestOnlyOperatorTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        from tests.compat import MagicMock
+        from airflow.jobs import SchedulerJob
 
-    def setUp(self):
-        super(LatestOnlyOperatorTest, self).setUp()
-        self.dag = DAG(
+        cls.dag = DAG(

Review comment:
       This fix is needed for the test because the original test calls `LatestOnlyOperator.execute()` without first creating a dagrun. Also the `create_dag_run` is moved to `setUpClass` so that it's called only once for the whole class.





[GitHub] [airflow] yuqian90 commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-650531210


   > @yuqian90 tests are failing, can you take a look please
   
   Hi @kaxil  thank you. I looked into it. I think `tests/operators/test_latest_only_operator.py:test_run` is the only one related to my change.
   
   The reason `test_latest_only_operator.py:test_run` fails in this PR but not in master is that master has another commit https://github.com/apache/airflow/pull/5970. So I cherry-picked https://github.com/apache/airflow/pull/5970 into my PR as well. `test_latest_only_operator.py:test_run` should now pass.
   
   ```
               XCom.set(
                   key=XCOM_SKIPMIXIN_KEY,
                   value={XCOM_SKIPMIXIN_SKIPPED: [d.task_id for d in tasks]},
                   task_id=task_id,
   >               dag_id=dag_run.dag_id,
                   execution_date=dag_run.execution_date,
                   session=session
               )
   E           AttributeError: 'NoneType' object has no attribute 'dag_id'
   ```
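
The failure mode in the traceback above can be reproduced in isolation. The sketch below assumes nothing about Airflow internals beyond what the traceback shows (attributes are read off `dag_run`); `DagRun` here is a hypothetical stand-in and `build_skip_record` is an illustrative helper.

```python
from collections import namedtuple

# Hypothetical stand-in for a real dagrun object.
DagRun = namedtuple("DagRun", ["dag_id", "execution_date"])


def build_skip_record(dag_run, task_ids):
    # Like the failing call, this reads attributes directly off dag_run.
    return {
        "dag_id": dag_run.dag_id,
        "execution_date": dag_run.execution_date,
        "skipped": list(task_ids),
    }


# No dagrun was created first, so dag_run is None and attribute access fails.
try:
    build_skip_record(None, ["downstream"])
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

# Creating the dagrun before the call avoids the error.
record = build_skip_record(DagRun("example_dag", "2020-01-01"), ["downstream"])
```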



[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-662427191


   Thanks @yuqian90 



[GitHub] [airflow] yuqian90 commented on a change in pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#discussion_r443886553



##########
File path: tests/ti_deps/deps/test_trigger_rule_dep.py
##########
@@ -19,13 +19,13 @@
 
 import unittest
 from datetime import datetime
+from unittest.mock import Mock
 
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.db import create_session
 from airflow.utils.state import State

Review comment:
       Thanks for fixing this!





[GitHub] [airflow] yuqian90 commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-648540534


   > > Doc tests are failing 🤔
   > 
   > Hi @kaxil I think the doc test is failing because of this error. I rebased the branch and pushed again. Let's see how that goes.
   > 
   > ```
   > https://github.com/kaxil/airflow.git Branch v1-10-test_only_AIRFLOW-5391 does not exist
   > ```
   
   Now this is the error. It doesn't look related to this PR. @kaxil any suggestions on how to get around it?
   
   ```
   Executing cmd:  sphinx-build -b html -d _build/doctrees --color -w /tmp/tmp5nm20i10 . _build/html
   Running Sphinx v3.1.1
   /opt/airflow/docs/exts/exampleinclude.py:254: RemovedInSphinx40Warning: The app.add_stylesheet() is deprecated. Please use app.add_css_file() instead.
     app.add_stylesheet('exampleinclude.css')
   /usr/local/lib/python3.6/importlib/__init__.py:126: RemovedInSphinx40Warning: sphinx.ext.autodoc.importer.mock is deprecated. Check CHANGES for Sphinx API modifications.
     return _bootstrap._gcd_import(name[level:], package, level)
   making output directory... done
   ```



[GitHub] [airflow] yuqian90 commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-648537469


   > Doc tests are failing 🤔
   
   Hi @kaxil I think the doc test is failing because of this error. I rebased the branch and pushed again. Let's see how that goes.
   ```
   https://github.com/kaxil/airflow.git Branch v1-10-test_only_AIRFLOW-5391 does not exist
   ```



[GitHub] [airflow] kaxil commented on a change in pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#discussion_r443553008



##########
File path: tests/ti_deps/deps/test_trigger_rule_dep.py
##########
@@ -19,13 +19,13 @@
 
 import unittest
 from datetime import datetime
+from unittest.mock import Mock
 
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils.db import create_session
 from airflow.utils.state import State

Review comment:
       This is because we still support Python 2 in 1.10.*
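
`unittest.mock` only exists in the Python 3 standard library, so code that must also run on Python 2 usually falls back to the separately installed `mock` backport. A compatibility shim such as `tests.compat` presumably does something along these lines; the snippet is a generic sketch of that pattern, not a copy of the Airflow module.

```python
try:
    # Python 3.3+: mock ships with the standard library.
    from unittest.mock import Mock
except ImportError:
    # Python 2: requires the third-party "mock" backport to be installed.
    from mock import Mock

# Either import yields the same Mock API.
stub = Mock(return_value=42)
value = stub()
stub.assert_called_once_with()
```

Importing `Mock` from one shared module keeps the version check in a single place instead of repeating it in every test file.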





[GitHub] [airflow] yuqian90 closed pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
yuqian90 closed pull request #8992:
URL: https://github.com/apache/airflow/pull/8992


   



[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-650443760


   @yuqian90 tests are failing, can you take a look please

