You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/08/13 19:49:11 UTC

[airflow] 02/08: Fix external elasticsearch logs link (#16357)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5a38d8e119bdb1e51ece2e690e27fe9eddc7ca0c
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Jun 15 09:40:01 2021 -0600

    Fix external elasticsearch logs link (#16357)
    
    During the 2.0 upgrade, the external log link when using elasticsearch
    remote logs was broken. This fixes it, including it only being shown if
    `[elasticsearch] frontend` is set.
    
    (cherry picked from commit e31e515b28a745b7428b42f1559ab456305fb3a0)
---
 .../providers/elasticsearch/log/es_task_handler.py |  9 +++++--
 airflow/utils/log/log_reader.py                    |  7 ++++--
 airflow/utils/log/logging_mixin.py                 |  5 ++++
 .../elasticsearch/log/test_es_task_handler.py      | 10 ++++++++
 tests/utils/log/test_log_reader.py                 | 21 ++++++++++++++++
 tests/www/views/test_views_log.py                  | 16 +++++++++---
 tests/www/views/test_views_tasks.py                | 29 +++++++++++++++++++++-
 7 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index e20038c..76ce946 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -36,13 +36,13 @@ from airflow.utils import timezone
 from airflow.utils.helpers import parse_template_string
 from airflow.utils.log.file_task_handler import FileTaskHandler
 from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
 
 # Elasticsearch hosted log type
 EsLogMsgType = List[Tuple[str, str]]
 
 
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that
     reads logs from Elasticsearch. Note logs are not directly
@@ -350,6 +350,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         url = 'https://' + self.frontend.format(log_id=quote(log_id))
         return url
 
+    @property
+    def supports_external_link(self) -> bool:
+        """Whether we can support external links"""
+        return bool(self.frontend)
+
 
 class _ESJsonLogFmt:
     """Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT"""
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 0e1f691..bebe369 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -98,9 +98,12 @@ class TaskLogReader:
         return hasattr(self.log_handler, 'read')
 
     @property
-    def supports_external_link(self):
+    def supports_external_link(self) -> bool:
         """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc)."""
-        return isinstance(self.log_handler, ExternalLoggingMixin)
+        if not isinstance(self.log_handler, ExternalLoggingMixin):
+            return False
+
+        return self.log_handler.supports_external_link
 
     def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None):
         """
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index e101eb5..1fc0757 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -66,6 +66,11 @@ class ExternalLoggingMixin:
     def get_external_log_url(self, task_instance, try_number) -> str:
         """Return the URL for log visualization in the external service."""
 
+    @property
+    @abc.abstractmethod
+    def supports_external_link(self) -> bool:
+        """Return whether handler is able to support external links."""
+
 
 # TODO: Formally inherit from io.IOBase
 class StreamLogWriter:
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 9a5ec3b..d06e016 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -443,3 +443,13 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
         )
         url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
         assert expected_url == url
+
+    @parameterized.expand(
+        [
+            ('localhost:5601/{log_id}', True),
+            (None, False),
+        ]
+    )
+    def test_supports_external_link(self, frontend, expected):
+        self.es_task_handler.frontend = frontend
+        assert self.es_task_handler.supports_external_link == expected
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index cef374c..301eed7 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -30,6 +30,7 @@ from airflow.models import TaskInstance
 from airflow.operators.dummy import DummyOperator
 from airflow.utils import timezone
 from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
 from airflow.utils.session import create_session
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_runs
@@ -215,3 +216,23 @@ class TestLogView(unittest.TestCase):
             ],
             any_order=False,
         )
+
+    def test_supports_external_link(self):
+        task_log_reader = TaskLogReader()
+
+        # Short circuit if log_handler doesn't include ExternalLoggingMixin
+        task_log_reader.log_handler = mock.MagicMock()
+        mock_prop = mock.PropertyMock()
+        mock_prop.return_value = False
+        type(task_log_reader.log_handler).supports_external_link = mock_prop
+        assert not task_log_reader.supports_external_link
+        mock_prop.assert_not_called()
+
+        # Otherwise, defer to the log_handlers supports_external_link
+        task_log_reader.log_handler = mock.MagicMock(spec=ExternalLoggingMixin)
+        type(task_log_reader.log_handler).supports_external_link = mock_prop
+        assert not task_log_reader.supports_external_link
+        mock_prop.assert_called_once()
+
+        mock_prop.return_value = True
+        assert task_log_reader.supports_external_link
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index a56b0f0..152c43d 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -379,17 +379,25 @@ def test_redirect_to_external_log_with_local_log_handler(log_admin_client, task_
     assert 'http://localhost/home' == response.headers['Location']
 
 
-class ExternalHandler(ExternalLoggingMixin):
+class _ExternalHandler(ExternalLoggingMixin):
     EXTERNAL_URL = 'http://external-service.com'
 
-    def get_external_log_url(self, *args, **kwargs):
+    @property
+    def log_name(self) -> str:
+        return 'ExternalLog'
+
+    def get_external_log_url(self, *args, **kwargs) -> str:
         return self.EXTERNAL_URL
 
+    @property
+    def supports_external_link(self) -> bool:
+        return True
+
 
 @unittest.mock.patch(
     'airflow.utils.log.log_reader.TaskLogReader.log_handler',
     new_callable=unittest.mock.PropertyMock,
-    return_value=ExternalHandler(),
+    return_value=_ExternalHandler(),
 )
 def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client):
     url_template = "redirect_to_external_log?dag_id={}&task_id={}&execution_date={}&try_number={}"
@@ -402,4 +410,4 @@ def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client)
     )
     response = log_admin_client.get(url)
     assert 302 == response.status_code
-    assert ExternalHandler.EXTERNAL_URL == response.headers['Location']
+    assert _ExternalHandler.EXTERNAL_URL == response.headers['Location']
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index b8c2d83..7c94006 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -551,12 +551,20 @@ def test_show_external_log_redirect_link_with_local_log_handler(capture_template
 
 
 class _ExternalHandler(ExternalLoggingMixin):
+    _supports_external_link = True
     LOG_NAME = 'ExternalLog'
 
     @property
-    def log_name(self):
+    def log_name(self) -> str:
         return self.LOG_NAME
 
+    def get_external_log_url(self, *args, **kwargs) -> str:
+        return 'http://external-service.com'
+
+    @property
+    def supports_external_link(self) -> bool:
+        return self._supports_external_link
+
 
 @pytest.mark.parametrize("endpoint", ["graph", "tree"])
 @unittest.mock.patch(
@@ -574,3 +582,22 @@ def test_show_external_log_redirect_link_with_external_log_handler(
         ctx = templates[0].local_context
         assert ctx['show_external_log_redirect']
         assert ctx['external_log_name'] == _ExternalHandler.LOG_NAME
+
+
+@pytest.mark.parametrize("endpoint", ["graph", "tree"])
+@unittest.mock.patch(
+    'airflow.utils.log.log_reader.TaskLogReader.log_handler',
+    new_callable=unittest.mock.PropertyMock,
+    return_value=_ExternalHandler(),
+)
+def test_external_log_redirect_link_with_external_log_handler_not_shown(
+    _external_handler, capture_templates, admin_client, endpoint
+):
+    """Show external links if log handler is external."""
+    _external_handler.return_value._supports_external_link = False
+    url = f'{endpoint}?dag_id=example_bash_operator'
+    with capture_templates() as templates:
+        admin_client.get(url, follow_redirects=True)
+        ctx = templates[0].local_context
+        assert not ctx['show_external_log_redirect']
+        assert ctx['external_log_name'] is None