You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by jh...@apache.org on 2021/08/12 01:41:13 UTC
[airflow] 02/03: Fix external elasticsearch logs link (#16357)
This is an automated email from the ASF dual-hosted git repository.
jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5ea02ccf460dd3fcd0f85cd2d51e83d451f87a04
Author: Jed Cunningham <66...@users.noreply.github.com>
AuthorDate: Tue Jun 15 09:40:01 2021 -0600
Fix external elasticsearch logs link (#16357)
During the 2.0 upgrade, the external log link shown when using Elasticsearch
remote logging was broken. This fixes it, and also ensures the link is only
shown when `[elasticsearch] frontend` is set.
(cherry picked from commit e31e515b28a745b7428b42f1559ab456305fb3a0)
---
.../providers/elasticsearch/log/es_task_handler.py | 9 +-
airflow/utils/log/log_reader.py | 7 +-
airflow/utils/log/logging_mixin.py | 5 +
.../elasticsearch/log/test_es_task_handler.py | 10 ++
tests/utils/log/test_log_reader.py | 21 ++++
tests/www/views/test_views_log.py | 16 ++-
tests/www/views/test_views_tasks.py | 128 ++++++++++++++++++++-
7 files changed, 187 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index e20038c..76ce946 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -36,13 +36,13 @@ from airflow.utils import timezone
from airflow.utils.helpers import parse_template_string
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.json_formatter import JSONFormatter
-from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin
# Elasticsearch hosted log type
EsLogMsgType = List[Tuple[str, str]]
-class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
+class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
"""
ElasticsearchTaskHandler is a python log handler that
reads logs from Elasticsearch. Note logs are not directly
@@ -350,6 +350,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
url = 'https://' + self.frontend.format(log_id=quote(log_id))
return url
+ @property
+ def supports_external_link(self) -> bool:
+ """Whether we can support external links"""
+ return bool(self.frontend)
+
class _ESJsonLogFmt:
"""Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT"""
diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py
index 0e1f691..bebe369 100644
--- a/airflow/utils/log/log_reader.py
+++ b/airflow/utils/log/log_reader.py
@@ -98,9 +98,12 @@ class TaskLogReader:
return hasattr(self.log_handler, 'read')
@property
- def supports_external_link(self):
+ def supports_external_link(self) -> bool:
"""Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc)."""
- return isinstance(self.log_handler, ExternalLoggingMixin)
+ if not isinstance(self.log_handler, ExternalLoggingMixin):
+ return False
+
+ return self.log_handler.supports_external_link
def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None):
"""
diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py
index e101eb5..1fc0757 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -66,6 +66,11 @@ class ExternalLoggingMixin:
def get_external_log_url(self, task_instance, try_number) -> str:
"""Return the URL for log visualization in the external service."""
+ @property
+ @abc.abstractmethod
+ def supports_external_link(self) -> bool:
+ """Return whether handler is able to support external links."""
+
# TODO: Formally inherit from io.IOBase
class StreamLogWriter:
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 9a5ec3b..d06e016 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -443,3 +443,13 @@ class TestElasticsearchTaskHandler(unittest.TestCase):
)
url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
assert expected_url == url
+
+ @parameterized.expand(
+ [
+ ('localhost:5601/{log_id}', True),
+ (None, False),
+ ]
+ )
+ def test_supports_external_link(self, frontend, expected):
+ self.es_task_handler.frontend = frontend
+ assert self.es_task_handler.supports_external_link == expected
diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py
index cef374c..301eed7 100644
--- a/tests/utils/log/test_log_reader.py
+++ b/tests/utils/log/test_log_reader.py
@@ -30,6 +30,7 @@ from airflow.models import TaskInstance
from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.log.logging_mixin import ExternalLoggingMixin
from airflow.utils.session import create_session
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_runs
@@ -215,3 +216,23 @@ class TestLogView(unittest.TestCase):
],
any_order=False,
)
+
+ def test_supports_external_link(self):
+ task_log_reader = TaskLogReader()
+
+ # Short circuit if log_handler doesn't include ExternalLoggingMixin
+ task_log_reader.log_handler = mock.MagicMock()
+ mock_prop = mock.PropertyMock()
+ mock_prop.return_value = False
+ type(task_log_reader.log_handler).supports_external_link = mock_prop
+ assert not task_log_reader.supports_external_link
+ mock_prop.assert_not_called()
+
+ # Otherwise, defer to the log_handlers supports_external_link
+ task_log_reader.log_handler = mock.MagicMock(spec=ExternalLoggingMixin)
+ type(task_log_reader.log_handler).supports_external_link = mock_prop
+ assert not task_log_reader.supports_external_link
+ mock_prop.assert_called_once()
+
+ mock_prop.return_value = True
+ assert task_log_reader.supports_external_link
diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py
index a56b0f0..152c43d 100644
--- a/tests/www/views/test_views_log.py
+++ b/tests/www/views/test_views_log.py
@@ -379,17 +379,25 @@ def test_redirect_to_external_log_with_local_log_handler(log_admin_client, task_
assert 'http://localhost/home' == response.headers['Location']
-class ExternalHandler(ExternalLoggingMixin):
+class _ExternalHandler(ExternalLoggingMixin):
EXTERNAL_URL = 'http://external-service.com'
- def get_external_log_url(self, *args, **kwargs):
+ @property
+ def log_name(self) -> str:
+ return 'ExternalLog'
+
+ def get_external_log_url(self, *args, **kwargs) -> str:
return self.EXTERNAL_URL
+ @property
+ def supports_external_link(self) -> bool:
+ return True
+
@unittest.mock.patch(
'airflow.utils.log.log_reader.TaskLogReader.log_handler',
new_callable=unittest.mock.PropertyMock,
- return_value=ExternalHandler(),
+ return_value=_ExternalHandler(),
)
def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client):
url_template = "redirect_to_external_log?dag_id={}&task_id={}&execution_date={}&try_number={}"
@@ -402,4 +410,4 @@ def test_redirect_to_external_log_with_external_log_handler(_, log_admin_client)
)
response = log_admin_client.get(url)
assert 302 == response.status_code
- assert ExternalHandler.EXTERNAL_URL == response.headers['Location']
+ assert _ExternalHandler.EXTERNAL_URL == response.headers['Location']
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index b8c2d83..88f881c 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -551,12 +551,20 @@ def test_show_external_log_redirect_link_with_local_log_handler(capture_template
class _ExternalHandler(ExternalLoggingMixin):
+ _supports_external_link = True
LOG_NAME = 'ExternalLog'
@property
- def log_name(self):
+ def log_name(self) -> str:
return self.LOG_NAME
+ def get_external_log_url(self, *args, **kwargs) -> str:
+ return 'http://external-service.com'
+
+ @property
+ def supports_external_link(self) -> bool:
+ return self._supports_external_link
+
@pytest.mark.parametrize("endpoint", ["graph", "tree"])
@unittest.mock.patch(
@@ -574,3 +582,121 @@ def test_show_external_log_redirect_link_with_external_log_handler(
ctx = templates[0].local_context
assert ctx['show_external_log_redirect']
assert ctx['external_log_name'] == _ExternalHandler.LOG_NAME
+
+
+@pytest.mark.parametrize("endpoint", ["graph", "tree"])
+@unittest.mock.patch(
+ 'airflow.utils.log.log_reader.TaskLogReader.log_handler',
+ new_callable=unittest.mock.PropertyMock,
+ return_value=_ExternalHandler(),
+)
+def test_external_log_redirect_link_with_external_log_handler_not_shown(
+ _external_handler, capture_templates, admin_client, endpoint
+):
+ """Show external links if log handler is external."""
+ _external_handler.return_value._supports_external_link = False
+ url = f'{endpoint}?dag_id=example_bash_operator'
+ with capture_templates() as templates:
+ admin_client.get(url, follow_redirects=True)
+ ctx = templates[0].local_context
+ assert not ctx['show_external_log_redirect']
+ assert ctx['external_log_name'] is None
+
+
+def _get_appbuilder_pk_string(model_view_cls, instance) -> str:
+ """Utility to get Flask-Appbuilder's string format "pk" for an object.
+
+ Used to generate requests to FAB action views without *too* much difficulty.
+ The implementation relies on FAB internals, but unfortunately I don't see
+ a better way around it.
+
+ Example usage::
+
+ >>> from airflow.www.views import TaskInstanceModelView
+ >>> ti = session.Query(TaskInstance).filter(...).one()
+ >>> pk = _get_appbuilder_pk_string(TaskInstanceModelView, ti)
+ >>> client.post("...", data={"action": "...", "rowid": pk})
+ """
+ pk_value = model_view_cls.datamodel.get_pk_value(instance)
+ return model_view_cls._serialize_pk_if_composite(model_view_cls, pk_value)
+
+
+def test_task_instance_clear(session, admin_client):
+ task_id = "runme_0"
+
+ # Set the state to success for clearing.
+ ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)
+ ti_q.update({"state": State.SUCCESS})
+ session.commit()
+
+ # Send a request to clear.
+ rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one())
+ resp = admin_client.post(
+ "/taskinstance/action_post",
+ data={"action": "clear", "rowid": rowid},
+ follow_redirects=True,
+ )
+ assert resp.status_code == 200
+
+ # Now the state should be None.
+ state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar()
+ assert state == State.NONE
+
+
+def test_task_instance_clear_failure(admin_client):
+ rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid.
+ resp = admin_client.post(
+ "/taskinstance/action_post",
+ data={"action": "clear", "rowid": rowid},
+ follow_redirects=True,
+ )
+ assert resp.status_code == 200
+ check_content_in_response("Failed to clear task instances:", resp)
+
+
+@pytest.mark.parametrize(
+ "action, expected_state",
+ [
+ ("set_running", State.RUNNING),
+ ("set_failed", State.FAILED),
+ ("set_success", State.SUCCESS),
+ ("set_retry", State.UP_FOR_RETRY),
+ ],
+ ids=["running", "failed", "success", "retry"],
+)
+def test_task_instance_set_state(session, admin_client, action, expected_state):
+ task_id = "runme_0"
+
+ # Send a request to clear.
+ ti_q = session.query(TaskInstance).filter(TaskInstance.task_id == task_id)
+ rowid = _get_appbuilder_pk_string(TaskInstanceModelView, ti_q.one())
+ resp = admin_client.post(
+ "/taskinstance/action_post",
+ data={"action": action, "rowid": rowid},
+ follow_redirects=True,
+ )
+ assert resp.status_code == 200
+
+ # Now the state should be modified.
+ state = session.query(TaskInstance.state).filter(TaskInstance.task_id == task_id).scalar()
+ assert state == expected_state
+
+
+@pytest.mark.parametrize(
+ "action",
+ [
+ "set_running",
+ "set_failed",
+ "set_success",
+ "set_retry",
+ ],
+)
+def test_task_instance_set_state_failure(admin_client, action):
+ rowid = '["12345"]' # F.A.B. crashes if the rowid is *too* invalid.
+ resp = admin_client.post(
+ "/taskinstance/action_post",
+ data={"action": action, "rowid": rowid},
+ follow_redirects=True,
+ )
+ assert resp.status_code == 200
+ check_content_in_response("Failed to set state", resp)