You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/15 01:25:49 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #21316: Fix race condition between triggerer and scheduler

dstandish commented on a change in pull request #21316:
URL: https://github.com/apache/airflow/pull/21316#discussion_r806371277



##########
File path: tests/executors/test_base_executor.py
##########
@@ -66,8 +68,52 @@ def test_try_adopt_task_instances(dag_maker):
         BaseOperator(task_id="task_2", start_date=start_date)
         BaseOperator(task_id="task_3", start_date=start_date)
 
-    dagrun = dag_maker.create_dagrun(execution_date=date)
-    tis = dagrun.task_instances
+    return dag_maker.create_dagrun(execution_date=date)
+
 
+def test_try_adopt_task_instances(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    tis = dagrun.task_instances
     assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
     assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+    for task_instance in dagrun.task_instances:
+        executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+    dagrun = setup_dagrun(dag_maker)
+    executor = BaseExecutor()
+    executor.execute_async = mock.Mock()
+    enqueue_tasks(executor, dagrun)
+    return executor, dagrun
+
+
+@mark.parametrize("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+    executor, _ = setup_trigger_tasks(dag_maker)
+    executor.trigger_tasks(open_slots)
+    assert len(executor.execute_async.mock_calls) == open_slots
+
+
+@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+    executor, dagrun = setup_trigger_tasks(dag_maker)
+    open_slots = triggered = len(dagrun.task_instances)
+    executor.trigger_tasks(open_slots)
+
+    # All the tasks are now running, so while we _can_ enqueue them
+    # again, they won't be triggered during `trigger_tasks` until
+    # the executor has been notified of a state change.
+    enqueue_tasks(executor, dagrun)
+    for attempt in range(QUEUEING_ATTEMPTS):
+        # On the configured attempt, we notify the executor
+        # that the task has succeeded.
+        if attempt == change_state_attempt:
+            executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+            # We then expect an additional triggered task.
+            triggered += 1
+        executor.trigger_tasks(open_slots)
+        assert len(executor.execute_async.mock_calls) == triggered

Review comment:
       ```suggestion
   @mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
   def test_trigger_running_tasks(dag_maker, change_state_attempt):
       executor, dagrun = setup_trigger_tasks(dag_maker)
       open_slots = 100
       executor.trigger_tasks(open_slots)
       expected_calls = len(dagrun.task_instances)  # initially `execute_async` called for each task
       assert len(executor.execute_async.mock_calls) == expected_calls
   
       # All the tasks are now "running", so while we enqueue them again here,
       # they won't be executed again until the executor has been notified of a state change.
       enqueue_tasks(executor, dagrun)
   
       for attempt in range(QUEUEING_ATTEMPTS + 2):
           # On the configured attempt, we notify the executor that the task has succeeded.
           if attempt == change_state_attempt:
               executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
               # If we have not exceeded QUEUEING_ATTEMPTS, we should expect an additional "execute" call
               if attempt < QUEUEING_ATTEMPTS:
                   expected_calls += 1
           executor.trigger_tasks(open_slots)
           assert len(executor.execute_async.mock_calls) == expected_calls
       if change_state_attempt < QUEUEING_ATTEMPTS:
           assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + 1
       else:
           assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances)
   ```
   
   While trying to verify and understand the change, I modified the test to make it a bit stronger.
   We can include the case where `change_state_attempt` is beyond the maximum number of queueing attempts (`QUEUEING_ATTEMPTS`).
   Also, I removed any possibility of interaction with `open_slots` by setting it to a high number.
   And I renamed `triggered` to `expected_calls` to reduce the possibility of confusion with the Airflow service `triggerer`, since the two are unrelated.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org