Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/06/16 08:45:14 UTC

[GitHub] [airflow] yyhecust opened a new pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

yyhecust opened a new pull request #16475:
URL: https://github.com/apache/airflow/pull/16475


   
   ---
   related: #16452
   Rerunning failed task instances was already supported by the original Backfill API; this commit extends the Backfill API to also support rerunning task instances in the succeeded state.
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/main/UPDATING.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r686003451



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -478,7 +478,7 @@ def _per_task_process(key, ti, session=None):
                         ti.set_state(State.SCHEDULED, session=session)
                 elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
                     # Rerun succeeded tasks
-                    self.log.info("Task instance %s with state %s", ti, ti.state)
+                    self.log.info("Task instance %s with state %s, rerunning task", ti, ti.state)

Review comment:
       ```suggestion
                       self.log.info("Task instance %s with state %s, rerunning task" succeeded, ti, ti.state)
   ```
   Would that be better?







[GitHub] [airflow] uranusjr commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r684746710



##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -56,47 +54,48 @@
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-class TestBackfillJob(unittest.TestCase):
-    def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None):
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag)
-
-        dag.clear()
-        return dag
-
-    def _times_called_with(self, method, class_):
-        count = 0
-        for args in method.call_args_list:
-            if isinstance(args[0][0], class_):
-                count += 1
-        return count
-
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag(include_examples=True)
-
+@pytest.fixture(scope="module")
+def dag_bag():
+    return DagBag(include_examples=True)
+  
+  
+class TestBackfillJob:
     @staticmethod
     def clean_db():
         clear_db_dags()
         clear_db_runs()
         clear_db_pools()
 
-    def setUp(self):
+    @pytest.fixture(autouse=True)
+    def set_instance_attrs(self, dag_bag):
         self.clean_db()
         self.parser = cli_parser.get_parser()
+        self.dagbag = dag_bag
+        
+    def _get_dummy_dag(
+        self,
+        dag_maker_fixture,
+        dag_id='test_dag',
+        pool=Pool.DEFAULT_POOL_NAME,
+        task_concurrency=None,
+        task_id='op',
+        **kwargs,
+    ):
+        with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
+            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
 
-    def tearDown(self) -> None:
-        self.clean_db()
+        return dag
 
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
+    def _times_called_with(self, method, class_):
+        count = 0
+        for args in method.call_args_list:
+            if isinstance(args[0][0], class_):
+                count += 1
+        return count

Review comment:
       ```suggestion
           return sum(1 for args in method.call_args_list if isinstance(args[0][0], class_))
   ```
   
   A common trick :)
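The suggested one-liner counts the same thing as the loop: each entry of `call_args_list` is a `call` object whose `[0]` is the tuple of positional arguments, so `args[0][0]` is the first positional argument. A standalone check, using `unittest.mock.Mock` in place of the patched method from the real tests (the `Foo` and `Bar` classes and both helper names are hypothetical):

```python
from unittest import mock


class Foo:
    pass


class Bar:
    pass


def times_called_with_loop(method, class_):
    # Original version: explicit counter over every recorded call.
    count = 0
    for args in method.call_args_list:
        # args[0] is the positional-argument tuple; args[0][0] is its first element.
        if isinstance(args[0][0], class_):
            count += 1
    return count


def times_called_with_sum(method, class_):
    # Suggested version: add 1 for each call whose first positional argument matches.
    return sum(1 for args in method.call_args_list if isinstance(args[0][0], class_))


method = mock.Mock()
method(Foo())
method(Bar())
method(Foo())

print(times_called_with_loop(method, Foo), times_called_with_sum(method, Foo))  # 2 2
```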







[GitHub] [airflow] uranusjr commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r723801595



##########
File path: airflow/cli/cli_parser.py
##########
@@ -909,7 +917,9 @@ class GroupCommand(NamedTuple):
             "Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
             "backfill will first prompt users whether airflow should clear all the previous dag_run and "
             "task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
-            "will auto re-run the previous failed task instances  within the backfill date range"
+            "will auto re-run the previous failed task instances  within the backfill date range. If "

Review comment:
       ```suggestion
               "will auto re-run the previous failed task instances within the backfill date range. If "
   ```

##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -122,13 +123,15 @@ def dag_backfill(args, dag=None):
                 verbose=args.verbose,
                 conf=run_conf,
                 rerun_failed_tasks=args.rerun_failed_tasks,
+                rerun_succeeded_task=args.rerun_succeeded_tasks,
                 run_backwards=args.run_backwards,
             )
         except ValueError as vr:
             print(str(vr))
             sys.exit(1)
 
 
+

Review comment:
       Wouldn't the linter complain about this extra empty line?







[GitHub] [airflow] uranusjr commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-891034412


   It seems the rebase went wrong; please try again.





[GitHub] [airflow] uranusjr commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-937394114


   A couple of nitpicks, but LGTM in general.





[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r661362044



##########
File path: airflow/models/dag.py
##########
@@ -1662,6 +1662,7 @@ def run(
         verbose=False,
         conf=None,
         rerun_failed_tasks=False,
+        rerun_succeeded_tasks=False,

Review comment:
       I have added a test_cli_backfill_rerun_succeeded_tasks function in test_dag_command.py. Does it meet your requirements?
   
    







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-862174530


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it’s a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] uranusjr commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r684746710



##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -56,47 +54,48 @@
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-class TestBackfillJob(unittest.TestCase):
-    def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None):
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag)
-
-        dag.clear()
-        return dag
-
-    def _times_called_with(self, method, class_):
-        count = 0
-        for args in method.call_args_list:
-            if isinstance(args[0][0], class_):
-                count += 1
-        return count
-
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag(include_examples=True)
-
+@pytest.fixture(scope="module")
+def dag_bag():
+    return DagBag(include_examples=True)
+  
+  
+class TestBackfillJob:
     @staticmethod
     def clean_db():
         clear_db_dags()
         clear_db_runs()
         clear_db_pools()
 
-    def setUp(self):
+    @pytest.fixture(autouse=True)
+    def set_instance_attrs(self, dag_bag):
         self.clean_db()
         self.parser = cli_parser.get_parser()
+        self.dagbag = dag_bag
+        
+    def _get_dummy_dag(
+        self,
+        dag_maker_fixture,
+        dag_id='test_dag',
+        pool=Pool.DEFAULT_POOL_NAME,
+        task_concurrency=None,
+        task_id='op',
+        **kwargs,
+    ):
+        with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
+            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
 
-    def tearDown(self) -> None:
-        self.clean_db()
+        return dag
 
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
+    def _times_called_with(self, method, class_):
+        count = 0
+        for args in method.call_args_list:
+            if isinstance(args[0][0], class_):
+                count += 1
+        return count

Review comment:
       ```suggestion
           return sum(1 for args, _ in method.call_args_list if isinstance(args[0], class_))
   ```
   
   A common trick :)
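The revised suggestion relies on each `call` object unpacking into `(args, kwargs)`, so `args[0]` is the first positional argument directly and the keyword arguments are discarded into `_`. A standalone sketch with `unittest.mock.Mock` (like the original helper, it assumes every recorded call has at least one positional argument):

```python
from unittest import mock


def times_called_with(method, class_):
    # Each recorded call unpacks into (positional_args, keyword_args);
    # only the first positional argument is inspected.
    return sum(1 for args, _ in method.call_args_list if isinstance(args[0], class_))


method = mock.Mock()
method(1, flag=True)  # keyword arguments are ignored by the helper
method("a")
method(2)

print(times_called_with(method, int))  # 2
print(times_called_with(method, str))  # 1
```

Since `isinstance` returns a bool and `True == 1`, `sum(isinstance(args[0], class_) for args, _ in ...)` would also work; the explicit `1 for ... if ...` form is arguably easier to read.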







[GitHub] [airflow] yyhecust commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-863713860


   > Add tests
   
   ok





[GitHub] [airflow] uranusjr commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r723801733



##########
File path: airflow/cli/commands/dag_command.py
##########
@@ -122,13 +123,15 @@ def dag_backfill(args, dag=None):
                 verbose=args.verbose,
                 conf=run_conf,
                 rerun_failed_tasks=args.rerun_failed_tasks,
+                rerun_succeeded_task=args.rerun_succeeded_tasks,
                 run_backwards=args.run_backwards,
             )
         except ValueError as vr:
             print(str(vr))
             sys.exit(1)
 
 
+

Review comment:
       Wouldn't the linter complain about this extra empty line?







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685942656



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,13 @@ def _per_task_process(key, ti, session=None):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
+                    # Rerun succeeded tasks
+                    self.log.info("Task instance {ti} with state {state}".format(ti=ti, state=ti.state))

Review comment:
       I have updated the log message in a recent commit.
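For readers following the `backfill_job.py` hunks, the intended branching can be sketched as a pure function. This is a simplified illustration, not the PR's code: plain strings stand in for Airflow's `State` constants, `next_state` is hypothetical, the two flags are checked independently, the existing branch is assumed to cover failed and upstream-failed tasks, and the succeeded branch is assumed to reset the task to scheduled in the same way (the hunks quoted here are cut off before that line).

```python
# Hypothetical string stand-ins for a few task-instance states.
SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
SCHEDULED = "scheduled"


def next_state(state, rerun_failed_tasks=False, rerun_succeeded_tasks=False):
    """Return the state a backfill would reset a task instance to, or None to leave it."""
    if rerun_failed_tasks and state in (FAILED, UPSTREAM_FAILED):
        # Existing behaviour: failed tasks are reset to scheduled and rerun.
        return SCHEDULED
    if rerun_succeeded_tasks and state == SUCCESS:
        # Behaviour proposed in this PR (assumed to mirror the failed branch).
        return SCHEDULED
    # Without the matching flag, the task instance keeps its state.
    return None


print(next_state(SUCCESS, rerun_succeeded_tasks=True))  # scheduled
print(next_state(SUCCESS))  # None
```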







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685802843



##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -929,8 +960,11 @@ def run_backfill(cond):
                     dag_maker.create_dagrun(
                         # Existing dagrun that is not within the backfill range
                         run_id=run_id,
+                        state=State.RUNNING,
                         execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+                        start_date=DEFAULT_DATE,

Review comment:
       Yes, I will resolve these problems, thank you.







[GitHub] [airflow] ephraimbuddy commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-896018462


   @ash, @kaxil, what do you think about this PR?





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r652529503



##########
File path: airflow/cli/cli_parser.py
##########
@@ -884,7 +893,9 @@ class GroupCommand(NamedTuple):
             "Run subsections of a DAG for a specified date range. If reset_dag_run option is used, "
             "backfill will first prompt users whether airflow should clear all the previous dag_run and "
             "task_instances within the backfill date range. If rerun_failed_tasks is used, backfill "
-            "will auto re-run the previous failed task instances  within the backfill date range"
+            "will auto re-run the previous failed task instances  within the backfill date range. If "
+            "rerun_succeeded_tasks is used, backfill will auto re-run the previous failed task instances "

Review comment:
       ```suggestion
               "rerun_succeeded_tasks is used, backfill will auto re-run the previous succeeded task instances "
   ```
   I think that's what you want to do, right?







[GitHub] [airflow] github-actions[bot] commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-980472115


   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.





[GitHub] [airflow] github-actions[bot] closed pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #16475:
URL: https://github.com/apache/airflow/pull/16475


   





[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685648816



##########
File path: airflow/cli/cli_parser.py
##########
@@ -277,6 +277,15 @@ def _check(value):
     ),
     action="store_true",
 )
+ARG_RERUN_SUCCEED_TASKS = Arg(
+    ("--rerun-succeeded-tasks",),
+    help=(
+        "if set, the backfill will auto-rerun "
+        "all the succeeded tasks for the backfill date range "
+        "instead of throwing exceptions"

Review comment:
       And I can remove it, thank you.







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685648318



##########
File path: airflow/cli/cli_parser.py
##########
@@ -277,6 +277,15 @@ def _check(value):
     ),
     action="store_true",
 )
+ARG_RERUN_SUCCEED_TASKS = Arg(
+    ("--rerun-succeeded-tasks",),
+    help=(
+        "if set, the backfill will auto-rerun "
+        "all the succeeded tasks for the backfill date range "
+        "instead of throwing exceptions"

Review comment:
       This was written to mirror the existing rerun_failed_tasks option.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r652529783



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -164,6 +165,9 @@ def __init__(  # pylint: disable=too-many-arguments
         :param rerun_failed_tasks: flag to whether to
                                    auto rerun the failed task in backfill
         :type rerun_failed_tasks: bool
+        :param rerun_succeeded_tasks: flag to whether to
+                                   auto rerun the failed task in backfill

Review comment:
       ```suggestion
                                      auto rerun the succeeded task in backfill
   ```









[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r655186577



##########
File path: airflow/models/dag.py
##########
@@ -1662,6 +1662,7 @@ def run(
         verbose=False,
         conf=None,
         rerun_failed_tasks=False,
+        rerun_succeeded_tasks=False,

Review comment:
       You should add a test for this change.
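One way to test this kind of change is to check that the CLI forwards the flag to `DAG.run` under the right keyword. The sketch below is not Airflow's test code: `run` is a hypothetical stand-in with a trimmed-down signature, and `forward_backfill_args` is a hypothetical mirror of the `dag_backfill` call site. `unittest.mock.create_autospec` makes the mock enforce the stand-in's signature, so a misspelled keyword raises `TypeError` instead of being silently accepted. That matters here: the `dag_command.py` hunk quoted in this thread passes `rerun_succeeded_task=` (singular), while the `DAG.run` hunk declares `rerun_succeeded_tasks`.

```python
import argparse
from unittest import mock


def run(start_date=None, end_date=None, rerun_failed_tasks=False, rerun_succeeded_tasks=False):
    """Hypothetical stand-in for DAG.run; only its signature matters here."""


def forward_backfill_args(args, dag_run):
    # Hypothetical mirror of the call site in dag_backfill.
    dag_run(
        rerun_failed_tasks=args.rerun_failed_tasks,
        rerun_succeeded_tasks=args.rerun_succeeded_tasks,
    )


# create_autospec makes the mock check calls against run()'s signature.
mock_run = mock.create_autospec(run)
args = argparse.Namespace(rerun_failed_tasks=False, rerun_succeeded_tasks=True)
forward_backfill_args(args, mock_run)
mock_run.assert_called_once_with(rerun_failed_tasks=False, rerun_succeeded_tasks=True)

# A misspelled keyword is rejected rather than silently accepted.
try:
    mock_run(rerun_succeeded_task=True)
    misspelling_rejected = False
except TypeError:
    misspelling_rejected = True

print(misspelling_rejected)  # True
```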







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r666758233



##########
File path: airflow/cli/cli_parser.py
##########
@@ -277,6 +277,15 @@ def _check(value):
     ),
     action="store_true",
 )
+ARG_RERUN_SUCCEED_TASKS = Arg(
+    ("--rerun-succeeded-tasks",),
+    help=(
+        "if set, the backfill will auto-rerun "
+        "all the succeeded tasks for the backfill date range "
+        "instead of throwing exceptions"

Review comment:
       If it is not set, no exception is thrown; rerun_succeeded_tasks simply defaults to False.
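Airflow's CLI `Arg` definitions wrap `argparse` arguments. Assuming the new flag uses `action="store_true"` like the argument just above it in the hunk, omitting it yields `False` rather than an error, which is why the "instead of throwing exceptions" wording is misleading. A minimal standalone sketch with plain `argparse` (the parser and its `prog` name are hypothetical, not Airflow's parser):

```python
import argparse

# Hypothetical parser with only the two backfill flags discussed in this thread.
parser = argparse.ArgumentParser(prog="backfill-sketch")
parser.add_argument("--rerun-failed-tasks", action="store_true")
parser.add_argument("--rerun-succeeded-tasks", action="store_true")

default_args = parser.parse_args([])
flagged_args = parser.parse_args(["--rerun-succeeded-tasks"])

# store_true flags default to False when omitted; no exception is raised.
print(default_args.rerun_succeeded_tasks)  # False
print(flagged_args.rerun_succeeded_tasks)  # True
```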







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r680964580



##########
File path: airflow/cli/cli_parser.py
##########
@@ -277,6 +277,15 @@ def _check(value):
     ),
     action="store_true",
 )
+ARG_RERUN_SUCCEED_TASKS = Arg(
+    ("--rerun-succeeded-tasks",),
+    help=(
+        "if set, the backfill will auto-rerun "
+        "all the succeeded tasks for the backfill date range "
+        "instead of throwing exceptions"

Review comment:
       So the `instead of throwing exceptions` part should be removed.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685764959



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,14 @@ def _per_task_process(key, ti, session=None):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
+                    # Rerun succeeded tasks
+                    self.log.error("Task instance {ti} with state {state}".format(ti=ti,
+                                                                   state=ti.state))

Review comment:
       ```suggestion
                       self.log.info("Task instance %s with state %s succeeded", ti, ti.state)
   ```
   This should not be an error log, right?

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -612,6 +640,7 @@ def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
             op1.set_upstream(op2)
         dag_maker.create_dagrun()
 
+        dag.clear()

Review comment:
       ```suggestion
   ```
   ??

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -719,7 +748,7 @@ def test_backfill_retry_always_failed_task(self, dag_maker):
             start_date=DEFAULT_DATE,
             end_date=DEFAULT_DATE,
         )
-        with pytest.raises(BackfillUnfinished):
+        with self.assertRaises(BackfillUnfinished):

Review comment:
       ```suggestion
           with pytest.raises(BackfillUnfinished):
   ```
   We no longer use the unittest model in these tests, so this won't work.
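   Once the test class no longer inherits from `unittest.TestCase`, `self.assertRaises` does not exist, and `pytest.raises` is the usual replacement. A minimal sketch; `BackfillUnfinished` is redefined locally as a hypothetical stand-in and `run_backfill_that_never_finishes` is a hypothetical helper:

```python
import pytest


class BackfillUnfinished(Exception):
    """Hypothetical stand-in for airflow.exceptions.BackfillUnfinished."""


def run_backfill_that_never_finishes():
    # Hypothetical helper: simulates a backfill whose task keeps failing.
    raise BackfillUnfinished("Some task instances failed")


class TestBackfillSketch:
    # Plain class (no unittest.TestCase base), so there is no self.assertRaises.
    def test_retry_always_failed_task(self):
        with pytest.raises(BackfillUnfinished):
            run_backfill_that_never_finishes()
```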

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -740,6 +769,8 @@ def test_backfill_ordered_concurrent_execute(self, dag_maker):
             op3.set_downstream(op4)
         dag_maker.create_dagrun()
 
+        dag.clear()
+

Review comment:
       ```suggestion
   ```

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -477,6 +471,8 @@ def test_backfill_pool_not_found(self, dag_maker):
         except AirflowException:
             return
 
+        self.fail()

Review comment:
       ```suggestion
   ```
   Any reason for adding this?
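   On a plain pytest class `self.fail()` is also unavailable. The try/except/return pattern plus an explicit failure can usually collapse into `pytest.raises`, which fails the test by itself when nothing is raised; if an explicit failure is still wanted, `pytest.fail()` is the pytest counterpart. A sketch with a hypothetical stand-in exception and helper:

```python
import pytest


class AirflowException(Exception):
    """Hypothetical stand-in for airflow.exceptions.AirflowException."""


def run_backfill_with_missing_pool():
    # Hypothetical helper: pretend the backfill references a pool that does not exist.
    raise AirflowException("pool not found")


def test_backfill_pool_not_found():
    # pytest.raises fails the test if no AirflowException is raised,
    # so no try/except/return or trailing fail() call is needed.
    with pytest.raises(AirflowException):
        run_backfill_with_missing_pool()
```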

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -287,10 +284,7 @@ def test_backfill_conf(self, dag_maker):
         )
         job.run()
 
-        # We ignore the first dag_run created by fixture
-        dr = DagRun.find(
-            dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1)
-        )
+        dr = DagRun.find(dag_id='test_backfill_conf')

Review comment:
       ```suggestion
           # We ignore the first dag_run created by fixture
           dr = DagRun.find(
               dag_id='test_backfill_conf', execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1)
           )
   ```
   Does this affect your change?

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -929,8 +960,11 @@ def run_backfill(cond):
                     dag_maker.create_dagrun(
                         # Existing dagrun that is not within the backfill range
                         run_id=run_id,
+                        state=State.RUNNING,
                         execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+                        start_date=DEFAULT_DATE,

Review comment:
       `dag_maker` has a default `state=State.RUNNING` and `start_date=DEFAULT_DATE`. Why are we adding this? Less code is better...
   I don't think you should change this test.

##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -37,7 +37,7 @@
     TaskConcurrencyLimitReached,
 )
 from airflow.jobs.backfill_job import BackfillJob
-from airflow.models import DagBag, Pool, TaskInstance as TI
+from airflow.models import DAG, DagBag, Pool, TaskInstance as TI

Review comment:
       ```suggestion
   from airflow.models import DagBag, Pool, TaskInstance as TI
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r666085492



##########
File path: airflow/models/dag.py
##########
@@ -1662,6 +1662,7 @@ def run(
         verbose=False,
         conf=None,
         rerun_failed_tasks=False,
+        rerun_succeeded_tasks=False,

Review comment:
       Good. You also need to add a test like this one:
   https://github.com/apache/airflow/blob/1e1b212417e089b43474bc19eec05b654afe98fb/tests/jobs/test_backfill_job.py#L574-L576
   for rerunning succeeded tasks.
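   The real test would build a DAG with `dag_maker` and run a `BackfillJob`; that is not reproduced here. As a dependency-free illustration of what such a test might assert, here is a hypothetical model of the new `elif` branch (all class and method names are invented for the sketch, and states are plain strings):

```python
from dataclasses import dataclass, field

# Hypothetical plain-string states mirroring the names used in the diff.
SUCCESS = "success"
SCHEDULED = "scheduled"


@dataclass
class FakeTaskInstance:
    key: str
    state: str


@dataclass
class FakeBackfill:
    rerun_succeeded_tasks: bool = False
    running: dict = field(default_factory=dict)

    def maybe_rerun(self, ti):
        # Mirrors the new branch: act only when the flag is on
        # and the task instance already succeeded.
        if self.rerun_succeeded_tasks and ti.state == SUCCESS:
            self.running.pop(ti.key, None)  # same as "if key in running: pop(key)"
            ti.state = SCHEDULED            # reset so the backfill runs it again
            return True
        return False


def test_rerun_succeeded_tasks():
    ti = FakeTaskInstance(key="op", state=SUCCESS)
    job = FakeBackfill(rerun_succeeded_tasks=True, running={"op": ti})
    assert job.maybe_rerun(ti)
    assert ti.state == SCHEDULED
    assert "op" not in job.running


def test_succeeded_tasks_kept_by_default():
    ti = FakeTaskInstance(key="op", state=SUCCESS)
    job = FakeBackfill(running={"op": ti})
    assert not job.maybe_rerun(ti)
    assert ti.state == SUCCESS
```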







[GitHub] [airflow] yyhecust commented on pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#issuecomment-899363427


   @ash, @kaxil, I am looking forward to your reply.





[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r666776087



##########
File path: airflow/models/dag.py
##########
@@ -1662,6 +1662,7 @@ def run(
         verbose=False,
         conf=None,
         rerun_failed_tasks=False,
+        rerun_succeeded_tasks=False,

Review comment:
       OK, I have added the test in recent commit (a85e664)







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685929982



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,13 @@ def _per_task_process(key, ti, session=None):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
+                    # Rerun succeeded tasks
+                    self.log.info("Task instance {ti} with state {state}".format(ti=ti, state=ti.state))

Review comment:
       Yes, it will make the feature clearer to users.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r652533594



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,16 @@ def _per_task_process(key, ti, session=None):  # pylint: disable=too-many-return
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks:
+                    # Rerun succeeded tasks
+                    if ti.state in (State.SUCCESS):
+                        self.log.error("Task instance {ti} "
+                                       "with state {state}".format(ti=ti,
+                                                                   state=ti.state))
+                        if key in ti_status.running:
+                            ti_status.running.pop(key)
+                        # Reset the succeeded task in backfill to scheduled state
+                        ti.set_state(State.SCHEDULED, session=session)

Review comment:
       ```suggestion
                   elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
                       # Rerun succeeded tasks
                       self.log.error("Task instance {ti} with state {state}".format(ti=ti,
                                                                      state=ti.state))
                       if key in ti_status.running:
                           ti_status.running.pop(key)
                       # Reset the succeeded task in backfill to scheduled state
                       ti.set_state(State.SCHEDULED, session=session)
   ```
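   One reason the suggested `ti.state == State.SUCCESS` is safer than the original `ti.state in (State.SUCCESS)`: parentheses alone do not make a tuple, so the original is a substring test against a string. A sketch assuming the state constant behaves like the string `"success"` (Airflow's state values are strings or str-based enums):

```python
# Hypothetical stand-in for State.SUCCESS.
SUCCESS = "success"

not_a_tuple = (SUCCESS)  # parentheses alone do not create a tuple
one_tuple = (SUCCESS,)   # the trailing comma does

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple

# Against a plain string, `in` is a substring check, so partial matches pass:
print("succ" in (SUCCESS))   # True
print("succ" in (SUCCESS,))  # False
print("succ" == SUCCESS)     # False

# If a None state ever reached this check, the substring test would raise:
try:
    _ = None in (SUCCESS)
except TypeError as exc:
    print("TypeError:", exc)
print(None == SUCCESS)       # False, equality simply does not match
```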







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r655186577



##########
File path: airflow/models/dag.py
##########
@@ -1662,6 +1662,7 @@ def run(
         verbose=False,
         conf=None,
         rerun_failed_tasks=False,
+        rerun_succeeded_tasks=False,

Review comment:
       You should add a test for the change here.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685879899



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -471,6 +476,13 @@ def _per_task_process(key, ti, session=None):
                             ti_status.running.pop(key)
                         # Reset the failed task in backfill to scheduled state
                         ti.set_state(State.SCHEDULED, session=session)
+                elif self.rerun_succeeded_tasks and ti.state == State.SUCCESS:
+                    # Rerun succeeded tasks
+                    self.log.info("Task instance {ti} with state {state}".format(ti=ti, state=ti.state))

Review comment:
       ```suggestion
                       self.log.info("Task instance %s with state %s", ti, ti.state)
   ```
   For a few reasons, we don't use `format` in log messages; we prefer `%s` placeholders. Also, should we make the log convey a clearer message, like "Task instance ... with state ... succeeded, rerunning task"? What do you think?
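   With `%s` placeholders the arguments travel separately on the log record and are merged into the message only if the record is actually emitted, whereas `str.format` builds the string eagerly. A standard-library sketch; the logger name, the task-instance repr, and the exact message wording are hypothetical:

```python
import logging

logging.basicConfig(format="%(levelname)s %(message)s")
log = logging.getLogger("backfill_sketch")  # hypothetical logger name
log.setLevel(logging.INFO)

ti = "<TaskInstance: example_dag.op 2016-01-01 [success]>"  # hypothetical repr
state = "success"

# Preferred: lazy %-style interpolation; args are kept on the LogRecord.
log.info("Task instance %s with state %s succeeded, rerunning task", ti, state)

# Discouraged: the message is formatted even when INFO would be filtered out.
log.info("Task instance {ti} with state {state}".format(ti=ti, state=state))
```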







[GitHub] [airflow] yyhecust commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
yyhecust commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r685652825



##########
File path: tests/jobs/test_backfill_job.py
##########
@@ -56,47 +54,48 @@
 DEFAULT_DATE = timezone.datetime(2016, 1, 1)
 
 
-class TestBackfillJob(unittest.TestCase):
-    def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None):
-        dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily')
-
-        with dag:
-            DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag)
-
-        dag.clear()
-        return dag
-
-    def _times_called_with(self, method, class_):
-        count = 0
-        for args in method.call_args_list:
-            if isinstance(args[0][0], class_):
-                count += 1
-        return count
-
-    @classmethod
-    def setUpClass(cls):
-        cls.dagbag = DagBag(include_examples=True)
-
+@pytest.fixture(scope="module")
+def dag_bag():
+    return DagBag(include_examples=True)
+  
+  
+class TestBackfillJob:
     @staticmethod
     def clean_db():
         clear_db_dags()
         clear_db_runs()
         clear_db_pools()
 
-    def setUp(self):
+    @pytest.fixture(autouse=True)
+    def set_instance_attrs(self, dag_bag):
         self.clean_db()
         self.parser = cli_parser.get_parser()
+        self.dagbag = dag_bag
+        
+    def _get_dummy_dag(
+        self,
+        dag_maker_fixture,
+        dag_id='test_dag',
+        pool=Pool.DEFAULT_POOL_NAME,
+        task_concurrency=None,
+        task_id='op',
+        **kwargs,
+    ):
+        with dag_maker_fixture(dag_id=dag_id, schedule_interval='@daily', **kwargs) as dag:
+            DummyOperator(task_id=task_id, pool=pool, task_concurrency=task_concurrency)
 
-    def tearDown(self) -> None:
-        self.clean_db()
+        return dag
 
-    def test_unfinished_dag_runs_set_to_failed(self):
-        dag = self._get_dummy_dag('dummy_dag')
+    def _times_called_with(self, method, class_):
+        count = 0
+        for args in method.call_args_list:
+            if isinstance(args[0][0], class_):
+                count += 1
+        return count

Review comment:
       This is a good trick; I will accept it.
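   The diff also keeps the `_times_called_with` helper on the new pytest-style class. It counts how many recorded calls on a mock received an instance of a given class as their first positional argument. A self-contained sketch of the same logic with `unittest.mock`, written as a free function; the mock logger and the local exception class are hypothetical stand-ins, and the guard for calls without positional arguments is an addition not present in the diff:

```python
from unittest import mock


class TaskConcurrencyLimitReached(Exception):
    """Hypothetical stand-in for the Airflow exception of the same name."""


def times_called_with(method, class_):
    """Count calls to a mock whose first positional argument is an instance of class_."""
    count = 0
    for args in method.call_args_list:
        positional = args[0]  # each recorded call unpacks as (args, kwargs)
        if positional and isinstance(positional[0], class_):
            count += 1
    return count


mock_log = mock.MagicMock()
mock_log.debug(TaskConcurrencyLimitReached("limit reached"))
mock_log.debug("unrelated message")
mock_log.debug(TaskConcurrencyLimitReached("limit reached again"))
mock_log.debug()  # no positional args; the unguarded helper would raise IndexError

print(times_called_with(mock_log.debug, TaskConcurrencyLimitReached))  # 2
```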







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #16475: Add the ability to rerun success state task_instance in the Backfill API(#16452)

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #16475:
URL: https://github.com/apache/airflow/pull/16475#discussion_r666078573



##########
File path: airflow/cli/cli_parser.py
##########
@@ -277,6 +277,15 @@ def _check(value):
     ),
     action="store_true",
 )
+ARG_RERUN_SUCCEED_TASKS = Arg(
+    ("--rerun-succeeded-tasks",),
+    help=(
+        "if set, the backfill will auto-rerun "
+        "all the succeeded tasks for the backfill date range "
+        "instead of throwing exceptions"

Review comment:
       Is this likely to throw exceptions if not set?



