You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:20 UTC
[airflow] 22/39: Fix: ``TaskInstance`` does not show
``queued_by_job_id`` & ``external_executor_id`` (#17179)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1b4af0baf752b54497caeff964633421d69cf426
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Fri Jul 23 00:25:08 2021 +0200
Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
**Problem discovery:**
I was debugging a bug involving Airflow's `external_executor_id` when this UI bug caught my eye and started to annoy me. I decided to fix it first so that my other testing could go a bit more smoothly :)
**Description of the problem:**
There is currently a bug in the Task Instance details (/task) view.
It loads the TaskInstance by calling `TI(task, execution_date)` and then uses `refresh_from_db()` to fill in the many fields that are not populated yet.
However, this assumes that `refresh_from_db()` refreshes all values, which it does not.
`external_executor_id` and `queued_by_job_id` are not updated at all, and `executor_config` is only set by the original `TI(task, execution_date)` call and is likewise not updated in `refresh_from_db()`.
This is visible in the UI, where these values always show as None, even though the TaskInstance view shows that they are not None.
**The changes in the PR:**
1. Changes to the `refresh_from_db()` method to include the three missing values.
2. A new test that checks that we really update ALL values in `refresh_from_db()`.
3. Removal of an incorrect comment, since we do need the `execution_date` for that view.
(cherry picked from commit 759c76d7a5d23cc6f6ef4f724a1a322d2445bbd2)
---
airflow/models/taskinstance.py | 3 +++
airflow/www/views.py | 2 --
tests/models/test_taskinstance.py | 54 +++++++++++++++++++++++++++++++++++++++
3 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 46ea7f9..ea5a72e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -644,7 +644,10 @@ class TaskInstance(Base, LoggingMixin):
self.priority_weight = ti.priority_weight
self.operator = ti.operator
self.queued_dttm = ti.queued_dttm
+ self.queued_by_job_id = ti.queued_by_job_id
self.pid = ti.pid
+ self.executor_config = ti.executor_config
+ self.external_executor_id = ti.external_executor_id
else:
self.state = None
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 590de7d..740a767 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1229,8 +1229,6 @@ class Airflow(AirflowBaseView):
"""Retrieve task."""
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
- # Carrying execution_date through, even though it's irrelevant for
- # this context
execution_date = request.args.get('execution_date')
dttm = timezone.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4b84d10..021809b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2078,6 +2078,60 @@ class TestTaskInstance(unittest.TestCase):
assert ti.start_date < ti.end_date
assert ti.duration > 0
+ def test_refresh_from_db(self):
+ run_date = timezone.utcnow()
+
+ expected_values = {
+ "task_id": "test_refresh_from_db_task",
+ "dag_id": "test_refresh_from_db_dag",
+ "execution_date": run_date,
+ "start_date": run_date + datetime.timedelta(days=1),
+ "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
+ "duration": 1.234,
+ "state": State.SUCCESS,
+ "_try_number": 1,
+ "max_tries": 1,
+ "hostname": "some_unique_hostname",
+ "unixname": "some_unique_unixname",
+ "job_id": 1234,
+ "pool": "some_fake_pool_id",
+ "pool_slots": 25,
+ "queue": "some_queue_id",
+ "priority_weight": 123,
+ "operator": "some_custom_operator",
+ "queued_dttm": run_date + datetime.timedelta(hours=1),
+ "queued_by_job_id": 321,
+ "pid": 123,
+ "executor_config": {"Some": {"extra": "information"}},
+ "external_executor_id": "some_executor_id",
+ }
+ # Make sure we aren't missing any new value in our expected_values list.
+ expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values.keys()}
+ assert {str(c) for c in TI.__table__.columns} == expected_keys, (
+ "Please add all non-foreign values of TaskInstance to this list. "
+ "This prevents refresh_from_db() from missing a field."
+ )
+
+ operator = DummyOperator(task_id=expected_values['task_id'])
+ ti = TI(task=operator, execution_date=expected_values['execution_date'])
+ for key, expected_value in expected_values.items():
+ setattr(ti, key, expected_value)
+ with create_session() as session:
+ session.merge(ti)
+ session.commit()
+
+ mock_task = mock.MagicMock()
+ mock_task.task_id = expected_values["task_id"]
+ mock_task.dag_id = expected_values["dag_id"]
+
+ ti = TI(task=mock_task, execution_date=run_date)
+ ti.refresh_from_db()
+ for key, expected_value in expected_values.items():
+ assert hasattr(ti, key), f"Key {key} is missing in the TaskInstance."
+ assert (
+ getattr(ti, key) == expected_value
+ ), f"Key: {key} had different values. Make sure it loads it in the refresh refresh_from_db()"
+
@pytest.mark.parametrize("pool_override", [None, "test_pool2"])
def test_refresh_from_task(pool_override):