You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/11 03:43:51 UTC

[GitHub] [airflow] uranusjr commented on a diff in pull request #23829: Add disable retry flag on backfill

uranusjr commented on code in PR #23829:
URL: https://github.com/apache/airflow/pull/23829#discussion_r991786988


##########
tests/jobs/test_backfill_job.py:
##########
@@ -1812,3 +1812,41 @@ def test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_mak
             states = [ti.state for _, ti in tasks_to_run.items()]
             assert TaskInstanceState.SCHEDULED in states
             assert State.NONE in states
+
+    def test_backfill_disable_retry(self, dag_maker):
+        with dag_maker(
+            dag_id='test_disable_retry',
+            schedule_interval="@daily",
+            default_args={
+                'retries': 3,
+                'retry_delay': datetime.timedelta(seconds=3),
+            },
+        ) as dag:
+            task1 = EmptyOperator(task_id="task1")
+        dag_run = dag_maker.create_dagrun(state=None)
+
+        executor = MockExecutor(parallelism=16)
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=1)
+        ] = State.UP_FOR_RETRY
+        executor.mock_task_results[
+            TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=2)
+        ] = State.UP_FOR_RETRY
+        job = BackfillJob(
+            dag=dag,
+            executor=executor,
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            disable_retry=True,
+        )
+        with pytest.raises(BackfillUnfinished):
+            job.run()
+        ti = dag_run.get_task_instance(task_id=task1.task_id)
+
+        assert ti._try_number == 1
+
+        dag_run.refresh_from_db()
+
+        assert State.FAILED == dag_run.state

Review Comment:
   You put `assert` at the wrong place 😉 
   
   Also it’s better to use `DagRunState.FAILED` here since `State.FAILED` is `TaskInstanceState.FAILED` (they both equal to the string `"failed"` but it’s still beneficial to use the correct state class)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org