Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/12 00:59:48 UTC

[airflow] 01/03: Fix bug that log can't be shown when task runs failed (#16768)

This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 26e8d2d77aeb51b8dbfb758c477417ccd0c45063
Author: Zhanfeng Huo <hu...@gmail.com>
AuthorDate: Fri Jul 23 06:13:40 2021 +0800

    Fix bug that log can't be shown when task runs failed (#16768)
    
    The log can't be shown when a task run fails. Users only get unhelpful output like the following (#13692):
    
    <pre>
    *** Log file does not exist: /home/airflow/airflow/logs/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
    *** Fetching from: http://:8793/log/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
    *** Failed to fetch log file from worker. Unsupported URL protocol
    </pre>
    
    The root cause is that the scheduler overwrites the hostname in the task_instance table with an empty string during `_execute_task_callbacks` when a task fails. The webserver then can't determine the correct host for the task, because the hostname stored in task_instance has been lost in the process.
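    
    For illustration only, a minimal toy sketch of the failure mode in plain Python (not the Airflow API; `Row`, `fake_db`, and the `handle_failure_*` helpers are hypothetical stand-ins for the task_instance row, the metadata DB, and the callback path):
    
    <pre>
    from dataclasses import dataclass

    @dataclass
    class Row:
        """Stand-in for a task_instance row in the metadata DB."""
        hostname: str = ""        # a fresh in-memory instance defaults to an empty hostname
        state: str = "running"

    # toy metadata DB keyed by (dag_id, task_id)
    fake_db = {("dag_id", "task_id"): Row(hostname="worker-1")}

    def handle_failure_old(key):
        # Old behaviour: build a fresh instance, copy only a few fields,
        # then persist it -- the blank default hostname clobbers the stored value.
        fake_db[key] = Row(state="failed")

    def handle_failure_fixed(key):
        # Fixed behaviour: reload the stored row first (analogous to refresh_from_db()),
        # then mark it failed, so the worker hostname survives.
        ti = fake_db[key]
        ti.state = "failed"

    handle_failure_old(("dag_id", "task_id"))
    assert fake_db[("dag_id", "task_id")].hostname == ""          # hostname lost, log fetch breaks

    fake_db[("dag_id", "task_id")] = Row(hostname="worker-1")     # reset the toy row
    handle_failure_fixed(("dag_id", "task_id"))
    assert fake_db[("dag_id", "task_id")].hostname == "worker-1"  # hostname preserved
    </pre>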
    
    Co-authored-by: huozhanfeng <hu...@vipkid.cn>
    (cherry picked from commit 34478c26d7de1328797e03bbf96d8261796fccbb)
---
 airflow/jobs/scheduler_job.py          | 11 ++++-------
 tests/dag_processing/test_processor.py | 23 +++++++++++++++++++++++
 2 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 198635c..6d37a2b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -588,14 +588,11 @@ class DagFileProcessor(LoggingMixin):
             if simple_ti.task_id in dag.task_ids:
                 task = dag.get_task(simple_ti.task_id)
                 ti = TI(task, simple_ti.execution_date)
-                # Get properties needed for failure handling from SimpleTaskInstance.
-                ti.start_date = simple_ti.start_date
-                ti.end_date = simple_ti.end_date
-                ti.try_number = simple_ti.try_number
-                ti.state = simple_ti.state
-                ti.test_mode = self.UNIT_TEST_MODE
                 if request.is_failure_callback:
-                    ti.handle_failure_with_callback(error=request.msg, test_mode=ti.test_mode)
+                    ti = TI(task, simple_ti.execution_date)
+                    # TODO: Use simple_ti to improve performance here in the future
+                    ti.refresh_from_db()
+                    ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE)
                     self.log.info('Executed failure callback for %s in state %s', ti, ti.state)
 
     @provide_session
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 243afc7..b6b9589 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -644,6 +644,29 @@ class TestDagFileProcessor(unittest.TestCase):
                 test_mode=conf.getboolean('core', 'unit_test_mode'),
             )
 
+    def test_failure_callbacks_should_not_drop_hostname(self):
+        dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        dag_file_processor.UNIT_TEST_MODE = False
+
+        with create_session() as session:
+            dag = dagbag.get_dag('example_branch_operator')
+            task = dag.get_task(task_id='run_this_first')
+
+            ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+            ti.hostname = "test_hostname"
+            session.add(ti)
+
+        with create_session() as session:
+            requests = [
+                TaskCallbackRequest(
+                    full_filepath="A", simple_task_instance=SimpleTaskInstance(ti), msg="Message"
+                )
+            ]
+            dag_file_processor.execute_callbacks(dagbag, requests)
+            tis = session.query(TaskInstance)
+            assert tis[0].hostname == "test_hostname"
+
     def test_process_file_should_failure_callback(self):
         dag_file = os.path.join(
             os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'