You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/15 01:25:49 UTC
[GitHub] [airflow] dstandish commented on a change in pull request #21316: Fix race condition between triggerer and scheduler
dstandish commented on a change in pull request #21316:
URL: https://github.com/apache/airflow/pull/21316#discussion_r806371277
##########
File path: tests/executors/test_base_executor.py
##########
@@ -66,8 +68,52 @@ def test_try_adopt_task_instances(dag_maker):
BaseOperator(task_id="task_2", start_date=start_date)
BaseOperator(task_id="task_3", start_date=start_date)
- dagrun = dag_maker.create_dagrun(execution_date=date)
- tis = dagrun.task_instances
+ return dag_maker.create_dagrun(execution_date=date)
+
+def test_try_adopt_task_instances(dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ tis = dagrun.task_instances
assert {ti.task_id for ti in tis} == {"task_1", "task_2", "task_3"}
assert BaseExecutor().try_adopt_task_instances(tis) == tis
+
+
+def enqueue_tasks(executor, dagrun):
+ for task_instance in dagrun.task_instances:
+ executor.queue_command(task_instance, ["airflow"])
+
+
+def setup_trigger_tasks(dag_maker):
+ dagrun = setup_dagrun(dag_maker)
+ executor = BaseExecutor()
+ executor.execute_async = mock.Mock()
+ enqueue_tasks(executor, dagrun)
+ return executor, dagrun
+
+
+@mark.parametrize("open_slots", [1, 2, 3])
+def test_trigger_queued_tasks(dag_maker, open_slots):
+ executor, _ = setup_trigger_tasks(dag_maker)
+ executor.trigger_tasks(open_slots)
+ assert len(executor.execute_async.mock_calls) == open_slots
+
+
+@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS))
+def test_trigger_running_tasks(dag_maker, change_state_attempt):
+ executor, dagrun = setup_trigger_tasks(dag_maker)
+ open_slots = triggered = len(dagrun.task_instances)
+ executor.trigger_tasks(open_slots)
+
+ # All the tasks are now running, so while we _can_ enqueue them
+ # again, they won't be triggered during `trigger_tasks` until
+ # the executor has been notified of a state change.
+ enqueue_tasks(executor, dagrun)
+ for attempt in range(QUEUEING_ATTEMPTS):
+ # On the configured attempt, we notify the executor
+ # that the task has succeeded.
+ if attempt == change_state_attempt:
+ executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
+ # We then expect an additional triggered task.
+ triggered += 1
+ executor.trigger_tasks(open_slots)
+ assert len(executor.execute_async.mock_calls) == triggered
Review comment:
```suggestion
@mark.parametrize("change_state_attempt", range(QUEUEING_ATTEMPTS + 2))
def test_trigger_running_tasks(dag_maker, change_state_attempt):
executor, dagrun = setup_trigger_tasks(dag_maker)
open_slots = 100
executor.trigger_tasks(open_slots)
expected_calls = len(dagrun.task_instances) # initially `execute_async` called for each task
assert len(executor.execute_async.mock_calls) == expected_calls
# All the tasks are now "running", so while we enqueue them again here,
# they won't be executed again until the executor has been notified of a state change.
enqueue_tasks(executor, dagrun)
for attempt in range(QUEUEING_ATTEMPTS + 2):
# On the configured attempt, we notify the executor that the task has succeeded.
if attempt == change_state_attempt:
executor.change_state(dagrun.task_instances[0].key, State.SUCCESS)
# If we have not exceeded QUEUEING_ATTEMPTS, we should expect an additional "execute" call
if attempt < QUEUEING_ATTEMPTS:
expected_calls += 1
executor.trigger_tasks(open_slots)
assert len(executor.execute_async.mock_calls) == expected_calls
if change_state_attempt < QUEUEING_ATTEMPTS:
assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances) + 1
else:
assert len(executor.execute_async.mock_calls) == len(dagrun.task_instances)
```
While trying to verify and understand the change, I modified the test to make it a bit stronger.
It now also covers the case where `change_state_attempt` is beyond the maximum number of queueing attempts (`QUEUEING_ATTEMPTS`).
I also removed any possibility of interaction with `open_slots` by setting it to a high number.
Finally, I renamed `triggered` to `expected_calls` to avoid confusion with the Airflow `triggerer` service, since the two are unrelated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org