You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 05:45:03 UTC
[airflow] branch master updated: Add log endpoint (#9331)
This is an automated email from the ASF dual-hosted git repository.
kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 59035a0 Add log endpoint (#9331)
59035a0 is described below
commit 59035a0b36eec49b5ea53b6d9c0d9009d4d101d3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jun 30 06:44:35 2020 +0100
Add log endpoint (#9331)
Co-authored-by: Kamil Breguła <ka...@polidea.com>
---
airflow/api_connexion/endpoints/log_endpoint.py | 69 +++++-
.../log_endpoint.py => schemas/log_schema.py} | 23 +-
requirements/requirements-python3.6.txt | 20 +-
requirements/requirements-python3.7.txt | 20 +-
requirements/requirements-python3.8.txt | 20 +-
tests/api_connexion/endpoints/test_log_endpoint.py | 253 ++++++++++++++++++++-
6 files changed, 355 insertions(+), 50 deletions(-)
diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py
index df66a95..d990b9b 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -15,12 +15,73 @@
# specific language governing permissions and limitations
# under the License.
-# TODO(mik-laj): We have to implement it.
-# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
-def get_log():
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+ full_content=False, token=None):
"""
Get logs for specific task instance
"""
- raise NotImplementedError("Not implemented yet.")
+ key = current_app.config["SECRET_KEY"]
+ if not token:
+ metadata = {}
+ else:
+ try:
+ metadata = URLSafeSerializer(key).loads(token)
+ except BadSignature:
+ raise BadRequest("Bad Signature. Please use only the tokens provided by the API.")
+
+ if metadata.get('download_logs', None) and metadata['download_logs']:
+ full_content = True
+
+ if full_content:
+ metadata['download_logs'] = True
+ else:
+ metadata['download_logs'] = False
+
+ task_log_reader = TaskLogReader()
+ if not task_log_reader.is_supported:
+ raise BadRequest("Task log handler does not support read logs.")
+
+ query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+ dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+ if not dag_run:
+ raise NotFound("DAG Run not found")
+
+ ti = dag_run.get_task_instance(task_id, session)
+ if ti is None:
+ metadata['end_of_log'] = True
+ raise BadRequest(detail="Task instance did not exist in the DB")
+
+ dag = current_app.dag_bag.get_dag(dag_id)
+ if dag:
+ ti.task = dag.get_task(ti.task_id)
+
+ return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json'])
+
+ # return_type would be either the above two or None
+
+ if return_type == 'application/json' or return_type is None: # default
+ logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+ logs = logs[0] if task_try_number is not None else logs
+ token = URLSafeSerializer(key).dumps(metadata)
+ return logs_schema.dump(LogResponseObject(continuation_token=token,
+ content=logs)
+ )
+ # text/plain. Stream
+ logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+ return Response(
+ logs,
+ headers={"Content-Type": return_type}
+ )
diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/schemas/log_schema.py
similarity index 69%
copy from airflow/api_connexion/endpoints/log_endpoint.py
copy to airflow/api_connexion/schemas/log_schema.py
index df66a95..2e48fc7 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/schemas/log_schema.py
@@ -14,13 +14,22 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import NamedTuple
-# TODO(mik-laj): We have to implement it.
-# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from marshmallow import Schema, fields
-def get_log():
- """
- Get logs for specific task instance
- """
- raise NotImplementedError("Not implemented yet.")
+class LogsSchema(Schema):
+ """ Schema for logs """
+
+ content = fields.Str()
+ continuation_token = fields.Str()
+
+
+class LogResponseObject(NamedTuple):
+ """ Log Response Object """
+ content: str
+ continuation_token: str
+
+
+logs_schema = LogsSchema(strict=True)
diff --git a/requirements/requirements-python3.6.txt b/requirements/requirements-python3.6.txt
index 732f997..30a3d01 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
bowler==0.8.0
cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
cassandra-driver==3.20.2
cattrs==1.0.0
celery==4.4.6
@@ -99,13 +99,13 @@ coverage==5.1
croniter==0.3.34
cryptography==2.9.2
curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
dask==2.19.0
datadog==0.37.1
decorator==4.4.2
defusedxml==0.6.0
dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
distributed==2.19.0
dnspython==1.16.0
docker-pycreds==0.4.0
@@ -182,16 +182,16 @@ humanize==0.5.1
hvac==0.10.4
identify==1.4.20
idna-ssl==1.1.0
-idna==2.9
+idna==2.10
ijson==2.6.1
imagesize==1.2.0
immutables==0.14
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
importlib-resources==2.0.1
inflection==0.5.0
ipdb==0.13.3
ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
iso8601==0.1.12
isodate==0.6.0
isort==4.3.21
@@ -307,7 +307,7 @@ python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3
pywinrm==0.4.1
@@ -371,7 +371,7 @@ thrift==0.13.0
toml==0.10.1
toolz==0.10.0
tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
traitlets==4.3.3
typed-ast==1.4.1
typing-extensions==3.7.4.2
diff --git a/requirements/requirements-python3.7.txt b/requirements/requirements-python3.7.txt
index 14ca634..dcd1017 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
bowler==0.8.0
cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
cassandra-driver==3.20.2
cattrs==1.0.0
celery==4.4.6
@@ -98,13 +98,13 @@ coverage==5.1
croniter==0.3.34
cryptography==2.9.2
curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
dask==2.19.0
datadog==0.37.1
decorator==4.4.2
defusedxml==0.6.0
dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
distributed==2.19.0
dnspython==1.16.0
docker-pycreds==0.4.0
@@ -180,14 +180,14 @@ httplib2==0.18.1
humanize==0.5.1
hvac==0.10.4
identify==1.4.20
-idna==2.9
+idna==2.10
ijson==2.6.1
imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
inflection==0.5.0
ipdb==0.13.3
ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
iso8601==0.1.12
isodate==0.6.0
isort==4.3.21
@@ -302,7 +302,7 @@ python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3
pywinrm==0.4.1
@@ -366,7 +366,7 @@ thrift==0.13.0
toml==0.10.1
toolz==0.10.0
tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
traitlets==4.3.3
typed-ast==1.4.1
typing-extensions==3.7.4.2
diff --git a/requirements/requirements-python3.8.txt b/requirements/requirements-python3.8.txt
index c708f66..ea020b7 100644
--- a/requirements/requirements-python3.8.txt
+++ b/requirements/requirements-python3.8.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
billiard==3.6.3.0
black==19.10b0
blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
bowler==0.8.0
cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
cassandra-driver==3.20.2
cattrs==1.0.0
celery==4.4.6
@@ -98,13 +98,13 @@ coverage==5.1
croniter==0.3.34
cryptography==2.9.2
curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
dask==2.19.0
datadog==0.37.1
decorator==4.4.2
defusedxml==0.6.0
dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
distributed==2.19.0
dnspython==1.16.0
docker-pycreds==0.4.0
@@ -180,14 +180,14 @@ httplib2==0.18.1
humanize==0.5.1
hvac==0.10.4
identify==1.4.20
-idna==2.9
+idna==2.10
ijson==2.6.1
imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
inflection==0.5.0
ipdb==0.13.3
ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
iso8601==0.1.12
isodate==0.6.0
isort==4.3.21
@@ -301,7 +301,7 @@ python-jenkins==1.7.0
python-jose==3.1.0
python-nvd3==0.15.0
python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
pytz==2020.1
pytzdata==2019.3
pywinrm==0.4.1
@@ -365,7 +365,7 @@ thrift==0.13.0
toml==0.10.1
toolz==0.10.0
tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
traitlets==4.3.3
typed-ast==1.4.1
typing-extensions==3.7.4.2
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index cbc564a..5fa274b 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -14,25 +14,260 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
class TestGetLog(unittest.TestCase):
+ DAG_ID = 'dag_for_testing_log_endpoint'
+ TASK_ID = 'task_for_testing_log_endpoint'
+ TRY_NUMBER = 1
+
@classmethod
- def setUpClass(cls) -> None:
- super().setUpClass()
- cls.app = app.create_app(testing=True) # type:ignore
+ def setUpClass(cls):
+ settings.configure_orm()
+ cls.session = settings.Session
+ cls.app = app.create_app(testing=True)
def setUp(self) -> None:
- self.client = self.app.test_client() # type:ignore
+ self.default_time = "2020-06-10T20:00:00+00:00"
+ self.client = self.app.test_client()
+ self.log_dir = tempfile.mkdtemp()
+ # Make sure that the configure_logging is not cached
+ self.old_modules = dict(sys.modules)
+ self._prepare_log_files()
+ self._configure_loggers()
+ self._prepare_db()
+
+ def _create_dagrun(self, session):
+ dagrun_model = DagRun(
+ dag_id=self.DAG_ID,
+ run_id='TEST_DAG_RUN_ID',
+ run_type=DagRunType.MANUAL.value,
+ execution_date=timezone.parse(self.default_time),
+ start_date=timezone.parse(self.default_time),
+ external_trigger=True,
+ )
+ session.add(dagrun_model)
+ session.commit()
+
+ def _configure_loggers(self):
+ # Create a custom logging configuration
+ logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+ logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+ logging_config['handlers']['task']['filename_template'] = \
+ '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+ '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+ # Write the custom logging configuration to a file
+ self.settings_folder = tempfile.mkdtemp()
+ settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+ new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+ with open(settings_file, 'w') as handle:
+ handle.writelines(new_logging_file)
+ sys.path.append(self.settings_folder)
+
+ with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+ self.app = app.create_app(testing=True)
+ self.client = self.app.test_client()
+ settings.configure_logging()
+
+ def _prepare_db(self):
+ dagbag = self.app.dag_bag # pylint: disable=no-member
+ dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+ dag.sync_to_db()
+ dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+ with create_session() as session:
+ self.ti = TaskInstance(
+ task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+ execution_date=timezone.parse(self.default_time)
+ )
+ self.ti.try_number = 1
+ session.merge(self.ti)
+
+ def _prepare_log_files(self):
+ dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+ f"{self.default_time.replace(':', '.')}/"
+ os.makedirs(dir_path)
+ with open(f"{dir_path}/1.log", "w+") as file:
+ file.write("Log for testing.")
+ file.flush()
+
+ def tearDown(self):
+ logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+ clear_db_runs()
+
+ # Remove any new modules imported during the test run. This lets us
+ # import the same source files for more than one test.
+ for mod in [m for m in sys.modules if m not in self.old_modules]:
+ del sys.modules[mod]
+
+ sys.path.remove(self.settings_folder)
+ shutil.rmtree(self.settings_folder)
+ shutil.rmtree(self.log_dir)
+
+ super().tearDown()
+
+ @provide_session
+ def test_should_response_200_json(self, session):
+ self._create_dagrun(session)
+ key = self.app.config["SECRET_KEY"]
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": False})
+ headers = {'Accept': 'application/json'}
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+ headers=headers
+ )
+ expected_filename = "{}/{}/{}/{}/1.log".format(
+ self.log_dir,
+ self.DAG_ID,
+ self.TASK_ID,
+ self.default_time.replace(":", ".")
+ )
+ self.assertEqual(
+ response.json['content'],
+ f"*** Reading local file: {expected_filename}\nLog for testing."
+ )
+ info = serializer.loads(response.json['continuation_token'])
+ self.assertEqual(
+ info,
+ {'end_of_log': True}
+ )
+ self.assertEqual(200, response.status_code)
+
+ @provide_session
+ def test_should_response_200_text_plain(self, session):
+ self._create_dagrun(session)
+ key = self.app.config["SECRET_KEY"]
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": True})
- @pytest.mark.skip(reason="Not implemented yet")
- def test_should_response_200(self):
response = self.client.get(
- "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+ headers={'Accept': 'text/plain'}
+ )
+ expected_filename = "{}/{}/{}/{}/1.log".format(
+ self.log_dir,
+ self.DAG_ID,
+ self.TASK_ID,
+ self.default_time.replace(':', '.')
+ )
+ self.assertEqual(200, response.status_code)
+ self.assertEqual(
+ response.data.decode('utf-8'),
+ f"*** Reading local file: {expected_filename}\nLog for testing.\n"
+ )
+
+ @provide_session
+ def test_get_logs_response_with_ti_equal_to_none(self, session):
+ self._create_dagrun(session)
+ key = self.app.config["SECRET_KEY"]
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": True})
+
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+ )
+ self.assertEqual(response.status_code, 400)
+ self.assertEqual(response.json['detail'], "Task instance did not exist in the DB")
+
+ @provide_session
+ def test_get_logs_with_metadata_as_download_large_file(self, session):
+ self._create_dagrun(session)
+ with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock:
+ first_return = (['1st line'], [{}])
+ second_return = (['2nd line'], [{'end_of_log': False}])
+ third_return = (['3rd line'], [{'end_of_log': True}])
+ fourth_return = (['should never be read'], [{'end_of_log': True}])
+ read_mock.side_effect = [first_return, second_return, third_return, fourth_return]
+
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/{self.TASK_ID}/logs/1?full_content=True",
+ headers={"Accept": 'text/plain'}
+ )
+
+ self.assertIn('1st line', response.data.decode('utf-8'))
+ self.assertIn('2nd line', response.data.decode('utf-8'))
+ self.assertIn('3rd line', response.data.decode('utf-8'))
+ self.assertNotIn('should never be read', response.data.decode('utf-8'))
+
+ @mock.patch("airflow.api_connexion.endpoints.log_endpoint.TaskLogReader")
+ def test_get_logs_for_handler_without_read_method(self, mock_log_reader):
+ type(mock_log_reader.return_value).is_supported = PropertyMock(return_value=False)
+
+ key = self.app.config["SECRET_KEY"]
+ serializer = URLSafeSerializer(key)
+ token = serializer.dumps({"download_logs": False})
+ headers = {'Content-Type': 'application/jso'} # check guessing
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+ headers=headers
+ )
+ self.assertEqual(400, response.status_code)
+ self.assertIn(
+ 'Task log handler does not support read logs.',
+ response.data.decode('utf-8'))
+
+ @provide_session
+ def test_bad_signature_raises(self, session):
+ self._create_dagrun(session)
+ token = {"download_logs": False}
+ headers = {'Accept': 'application/json'}
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+ f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+ headers=headers
+ )
+ self.assertEqual(
+ response.json,
+ {
+ 'detail': None,
+ 'status': 400,
+ 'title': "Bad Signature. Please use only the tokens provided by the API.",
+ 'type': 'about:blank'
+ }
+ )
+
+ def test_raises_404_for_invalid_dag_run_id(self):
+ headers = {'Accept': 'application/json'}
+ response = self.client.get(
+ f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN/" # invalid dagrun_id
+ f"taskInstances/{self.TASK_ID}/logs/1?",
+ headers=headers
+ )
+ self.assertEqual(
+ response.json,
+ {
+ 'detail': None,
+ 'status': 404,
+ 'title': "DAG Run not found",
+ 'type': 'about:blank'
+ }
)
- assert response.status_code == 200