You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/09 22:52:20 UTC

[airflow] 22/39: Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1b4af0baf752b54497caeff964633421d69cf426
Author: Jorrick Sleijster <jo...@gmail.com>
AuthorDate: Fri Jul 23 00:25:08 2021 +0200

    Fix: ``TaskInstance`` does not show ``queued_by_job_id`` & ``external_executor_id`` (#17179)
    
    **Problem discovery:**
    I was debugging a bug with the `external_executor_id` in Airflow when this UI bug caught my eye and started to annoy me. I decided to fix this one first so my other testing can go a bit smoother :)
    
    **Description of the problem:**
    Currently there is a BUG inside the Task Instance details (/task) view.
    It loads the TaskInstance by calling `TI(task, execution_date)` and then uses `refresh_from_db()` to refresh many fields that are not filled in yet.
    However, the assumption is made in that case that it refreshes all values, which it does not.
    `external_executor_id` and `queued_by_job_id` are not updated at all, and `executor_config` is only instantiated by the original `TI(task, execution_date)` call and is likewise not updated in `refresh_from_db()`.
    This also shows in the UI where these values are always showing None, while the TaskInstance view shows you these values are not None.
    
    **The changes in the PR:**
    1. Changes to the `refresh_from_db()` method to include the missing three values.
    2. A new test that checks we are really updating ALL values in `refresh_from_db()`.
    3. Removal of an incorrect comment as we do need the `execution_date` for that view.
    
    (cherry picked from commit 759c76d7a5d23cc6f6ef4f724a1a322d2445bbd2)
---
 airflow/models/taskinstance.py    |  3 +++
 airflow/www/views.py              |  2 --
 tests/models/test_taskinstance.py | 54 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 46ea7f9..ea5a72e 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -644,7 +644,10 @@ class TaskInstance(Base, LoggingMixin):
             self.priority_weight = ti.priority_weight
             self.operator = ti.operator
             self.queued_dttm = ti.queued_dttm
+            self.queued_by_job_id = ti.queued_by_job_id
             self.pid = ti.pid
+            self.executor_config = ti.executor_config
+            self.external_executor_id = ti.external_executor_id
         else:
             self.state = None
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 590de7d..740a767 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1229,8 +1229,6 @@ class Airflow(AirflowBaseView):
         """Retrieve task."""
         dag_id = request.args.get('dag_id')
         task_id = request.args.get('task_id')
-        # Carrying execution_date through, even though it's irrelevant for
-        # this context
         execution_date = request.args.get('execution_date')
         dttm = timezone.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 4b84d10..021809b 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2078,6 +2078,60 @@ class TestTaskInstance(unittest.TestCase):
         assert ti.start_date < ti.end_date
         assert ti.duration > 0
 
+    def test_refresh_from_db(self):
+        run_date = timezone.utcnow()
+
+        expected_values = {
+            "task_id": "test_refresh_from_db_task",
+            "dag_id": "test_refresh_from_db_dag",
+            "execution_date": run_date,
+            "start_date": run_date + datetime.timedelta(days=1),
+            "end_date": run_date + datetime.timedelta(days=1, seconds=1, milliseconds=234),
+            "duration": 1.234,
+            "state": State.SUCCESS,
+            "_try_number": 1,
+            "max_tries": 1,
+            "hostname": "some_unique_hostname",
+            "unixname": "some_unique_unixname",
+            "job_id": 1234,
+            "pool": "some_fake_pool_id",
+            "pool_slots": 25,
+            "queue": "some_queue_id",
+            "priority_weight": 123,
+            "operator": "some_custom_operator",
+            "queued_dttm": run_date + datetime.timedelta(hours=1),
+            "queued_by_job_id": 321,
+            "pid": 123,
+            "executor_config": {"Some": {"extra": "information"}},
+            "external_executor_id": "some_executor_id",
+        }
+        # Make sure we aren't missing any new value in our expected_values list.
+        expected_keys = {f"task_instance.{key.lstrip('_')}" for key in expected_values.keys()}
+        assert {str(c) for c in TI.__table__.columns} == expected_keys, (
+            "Please add all non-foreign values of TaskInstance to this list. "
+            "This prevents refresh_from_db() from missing a field."
+        )
+
+        operator = DummyOperator(task_id=expected_values['task_id'])
+        ti = TI(task=operator, execution_date=expected_values['execution_date'])
+        for key, expected_value in expected_values.items():
+            setattr(ti, key, expected_value)
+        with create_session() as session:
+            session.merge(ti)
+            session.commit()
+
+        mock_task = mock.MagicMock()
+        mock_task.task_id = expected_values["task_id"]
+        mock_task.dag_id = expected_values["dag_id"]
+
+        ti = TI(task=mock_task, execution_date=run_date)
+        ti.refresh_from_db()
+        for key, expected_value in expected_values.items():
+            assert hasattr(ti, key), f"Key {key} is missing in the TaskInstance."
+            assert (
+                getattr(ti, key) == expected_value
+            ), f"Key: {key} had different values. Make sure it loads it in the refresh refresh_from_db()"
+
 
 @pytest.mark.parametrize("pool_override", [None, "test_pool2"])
 def test_refresh_from_task(pool_override):