You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:29 UTC
[airflow] 10/16: DebugExecutor use ti.run() instead of ti._run_raw_task (#24357)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b9fb473eb3eb1dffe76e07dd9e96ca72688cb1e5
Author: Niko <on...@amazon.com>
AuthorDate: Mon Jun 13 00:02:56 2022 -0700
DebugExecutor use ti.run() instead of ti._run_raw_task (#24357)
The DebugExecutor previously executed tasks by calling the "private"
ti._run_raw_task(...) method instead of ti.run(...). Only the latter
contains the logic that increments a task instance's try_number when it
runs, so tasks executed with the DebugExecutor never had their
try_number incremented. For rescheduled tasks this led to off-by-one
errors, because the logic that decrements try_number on reschedule
still ran while the matching increment did not.
(cherry picked from commit da7b22be2986fc3217ac4d7fa6c3831d87ccff87)
---
airflow/executors/debug_executor.py | 2 +-
tests/dags/test_sensor.py | 33 +++++++++++++++++++
tests/executors/test_debug_executor.py | 4 +--
tests/jobs/test_backfill_job.py | 58 ++++++++++++++++++++++------------
4 files changed, 73 insertions(+), 24 deletions(-)
diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index 865186dd18..6c8a13b345 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -76,7 +76,7 @@ class DebugExecutor(BaseExecutor):
key = ti.key
try:
params = self.tasks_params.pop(ti.key, {})
- ti._run_raw_task(job_id=ti.job_id, **params)
+ ti.run(job_id=ti.job_id, **params)
self.change_state(key, State.SUCCESS)
ti._run_finished_callback()
return True
diff --git a/tests/dags/test_sensor.py b/tests/dags/test_sensor.py
new file mode 100644
index 0000000000..760300ef00
--- /dev/null
+++ b/tests/dags/test_sensor.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.sensors.date_time import DateTimeSensor
+from airflow.utils import timezone
+
+with DAG(
+ dag_id='test_sensor', start_date=datetime.datetime(2022, 1, 1), catchup=False, schedule_interval='@once'
+) as dag:
+
+ @task
+ def get_date():
+ return str(timezone.utcnow() + datetime.timedelta(seconds=3))
+
+ DateTimeSensor(task_id='dts', target_time=str(get_date()), poke_interval=1, mode='reschedule')
diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py
index dbce6aca40..371fbc5213 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -49,7 +49,7 @@ class TestDebugExecutor:
succeeded = executor._run_task(task_instance_mock)
assert succeeded
- task_instance_mock._run_raw_task.assert_called_once_with(job_id=job_id)
+ task_instance_mock.run.assert_called_once_with(job_id=job_id)
def test_queue_task_instance(self):
key = "ti_key"
@@ -100,7 +100,7 @@ class TestDebugExecutor:
ti1 = MagicMock(key="t1")
ti2 = MagicMock(key="t2")
- ti1._run_raw_task.side_effect = Exception
+ ti1.run.side_effect = Exception
executor.tasks_to_run = [ti1, ti2]
diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py
index e5ab189f35..5222f9cdbe 100644
--- a/tests/jobs/test_backfill_job.py
+++ b/tests/jobs/test_backfill_job.py
@@ -1309,10 +1309,18 @@ class TestBackfillJob:
ti_status = BackfillJob._DagRunTaskStatus()
- # test for success
- ti.set_state(State.SUCCESS, session)
- ti_status.running[ti.key] = ti
- job._update_counters(ti_status=ti_status, session=session)
+ # Test for success
+ # The in-memory task key in ti_status.running contains a try_number
+ # that is always one behind the DB. The _update_counters method however uses
+ # a reduced_key to handle this. To test this, we mark the task as running in-memory
+ # and then increase the try number as it would be before the raw task is executed.
+ # When updating the counters the reduced_key will be used which will match what's
+ # in the in-memory ti_status.running map. This is the same for skipped, failed
+ # and retry states.
+ ti_status.running[ti.key] = ti # Task is queued and marked as running
+ ti._try_number += 1 # Try number is increased during ti.run()
+ ti.set_state(State.SUCCESS, session) # Task finishes with success state
+ job._update_counters(ti_status=ti_status, session=session) # Update counters
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 1
assert len(ti_status.skipped) == 0
@@ -1321,9 +1329,10 @@ class TestBackfillJob:
ti_status.succeeded.clear()
- # test for skipped
- ti.set_state(State.SKIPPED, session)
+ # Test for skipped
ti_status.running[ti.key] = ti
+ ti._try_number += 1
+ ti.set_state(State.SKIPPED, session)
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
@@ -1333,9 +1342,10 @@ class TestBackfillJob:
ti_status.skipped.clear()
- # test for failed
- ti.set_state(State.FAILED, session)
+ # Test for failed
ti_status.running[ti.key] = ti
+ ti._try_number += 1
+ ti.set_state(State.FAILED, session)
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
@@ -1345,9 +1355,10 @@ class TestBackfillJob:
ti_status.failed.clear()
- # test for retry
- ti.set_state(State.UP_FOR_RETRY, session)
+ # Test for retry
ti_status.running[ti.key] = ti
+ ti._try_number += 1
+ ti.set_state(State.UP_FOR_RETRY, session)
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
@@ -1357,13 +1368,18 @@ class TestBackfillJob:
ti_status.to_run.clear()
- # test for reschedule
- # For rescheduled state, tests that reduced_key is not
- # used by upping try_number.
- ti._try_number = 2
- ti.set_state(State.UP_FOR_RESCHEDULE, session)
- assert ti.try_number == 3 # see ti.try_number property in taskinstance module
- ti_status.running[ti.key] = ti
+ # Test for reschedule
+ # Logic in taskinstance reduces the try number for a task that's been
+ # rescheduled (which makes sense because it's the _same_ try, but it's
+ # just being rescheduled to a later time). This now makes the in-memory
+ # and DB representation of the task try_number the _same_, which is unlike
+ # the above cases. But this is okay because the reduced_key is NOT used for
+ # the rescheduled case in _update_counters, for this exact reason.
+ ti_status.running[ti.key] = ti # Task queued and marked as running
+ # Note: Both the increase and decrease are kept here for context
+ ti._try_number += 1 # Try number is increased during ti.run()
+ ti._try_number -= 1 # Task is being rescheduled, decrement try_number
+ ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state
job._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
@@ -1580,10 +1596,10 @@ class TestBackfillJob:
@pytest.mark.long_running
@pytest.mark.parametrize("executor_name", ["SequentialExecutor", "DebugExecutor"])
- @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"])
- def test_mapped_dag(self, dag_id, executor_name, session):
+ @pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow", "test_sensor"])
+ def test_backfilling_dags(self, dag_id, executor_name, session):
"""
- End-to-end test of a simple mapped dag.
+ End-to-end test for backfilling dags with various executors.
We test with multiple executors as they have different "execution environments" -- for instance
DebugExecutor runs a lot more in the same process than other Executors.
@@ -1595,7 +1611,7 @@ class TestBackfillJob:
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f'{dag_id}.py'))
dag = self.dagbag.get_dag(dag_id)
- when = datetime.datetime(2022, 1, 1)
+ when = timezone.datetime(2022, 1, 1)
job = BackfillJob(
dag=dag,