You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/12 00:59:48 UTC
[airflow] 01/03: Fix bug that log can't be shown when task runs
failed (#16768)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 26e8d2d77aeb51b8dbfb758c477417ccd0c45063
Author: Zhanfeng Huo <hu...@gmail.com>
AuthorDate: Fri Jul 23 06:13:40 2021 +0800
Fix bug that log can't be shown when task runs failed (#16768)
Task logs cannot be displayed when a task fails. Instead, users see only the following unhelpful output (see #13692):
<pre>
*** Log file does not exist: /home/airflow/airflow/logs/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Fetching from: http://:8793/log/dag_id/task_id/2021-06-28T00:00:00+08:00/28.log
*** Failed to fetch log file from worker. Unsupported URL protocol
</pre>
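The root cause is in `_execute_task_callbacks`: when a task fails, the scheduler overwrites the hostname stored in the `task_instance` table with an empty string. This happens because the failure callback rebuilt the TaskInstance from a `SimpleTaskInstance` that copied only `start_date`, `end_date`, `try_number` and `state`, leaving the hostname blank when the instance was saved. With the hostname lost, the webserver cannot determine which host ran the task, so it cannot fetch the log from the worker (hence the empty host in `http://:8793/...` above). This change reloads the task instance from the database with `refresh_from_db()` before handling the failure, so the stored hostname is preserved.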
Co-authored-by: huozhanfeng <hu...@vipkid.cn>
(cherry picked from commit 34478c26d7de1328797e03bbf96d8261796fccbb)
---
airflow/jobs/scheduler_job.py | 11 ++++-------
tests/dag_processing/test_processor.py | 23 +++++++++++++++++++++++
2 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 198635c..6d37a2b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -588,14 +588,11 @@ class DagFileProcessor(LoggingMixin):
if simple_ti.task_id in dag.task_ids:
task = dag.get_task(simple_ti.task_id)
ti = TI(task, simple_ti.execution_date)
- # Get properties needed for failure handling from SimpleTaskInstance.
- ti.start_date = simple_ti.start_date
- ti.end_date = simple_ti.end_date
- ti.try_number = simple_ti.try_number
- ti.state = simple_ti.state
- ti.test_mode = self.UNIT_TEST_MODE
if request.is_failure_callback:
- ti.handle_failure_with_callback(error=request.msg, test_mode=ti.test_mode)
+ ti = TI(task, simple_ti.execution_date)
+ # TODO: Use simple_ti to improve performance here in the future
+ ti.refresh_from_db()
+ ti.handle_failure_with_callback(error=request.msg, test_mode=self.UNIT_TEST_MODE)
self.log.info('Executed failure callback for %s in state %s', ti, ti.state)
@provide_session
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index 243afc7..b6b9589 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -644,6 +644,29 @@ class TestDagFileProcessor(unittest.TestCase):
test_mode=conf.getboolean('core', 'unit_test_mode'),
)
+ def test_failure_callbacks_should_not_drop_hostname(self):
+ dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ dag_file_processor.UNIT_TEST_MODE = False
+
+ with create_session() as session:
+ dag = dagbag.get_dag('example_branch_operator')
+ task = dag.get_task(task_id='run_this_first')
+
+ ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+ ti.hostname = "test_hostname"
+ session.add(ti)
+
+ with create_session() as session:
+ requests = [
+ TaskCallbackRequest(
+ full_filepath="A", simple_task_instance=SimpleTaskInstance(ti), msg="Message"
+ )
+ ]
+ dag_file_processor.execute_callbacks(dagbag, requests)
+ tis = session.query(TaskInstance)
+ assert tis[0].hostname == "test_hostname"
+
def test_process_file_should_failure_callback(self):
dag_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'