Posted to commits@airflow.apache.org by ka...@apache.org on 2020/06/30 05:45:03 UTC

[airflow] branch master updated: Add log endpoint (#9331)

This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 59035a0  Add log endpoint (#9331)
59035a0 is described below

commit 59035a0b36eec49b5ea53b6d9c0d9009d4d101d3
Author: Ephraim Anierobi <sp...@gmail.com>
AuthorDate: Tue Jun 30 06:44:35 2020 +0100

    Add log endpoint (#9331)
    
    Co-authored-by: Kamil Breguła <ka...@polidea.com>
---
 airflow/api_connexion/endpoints/log_endpoint.py    |  69 +++++-
 .../log_endpoint.py => schemas/log_schema.py}      |  23 +-
 requirements/requirements-python3.6.txt            |  20 +-
 requirements/requirements-python3.7.txt            |  20 +-
 requirements/requirements-python3.8.txt            |  20 +-
 tests/api_connexion/endpoints/test_log_endpoint.py | 253 ++++++++++++++++++++-
 6 files changed, 355 insertions(+), 50 deletions(-)

diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/endpoints/log_endpoint.py
index df66a95..d990b9b 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -15,12 +15,73 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
 
-def get_log():
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    key = current_app.config["SECRET_KEY"]
+    if not token:
+        metadata = {}
+    else:
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please use only the tokens provided by the API.")
+
+    if metadata.get('download_logs'):
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        raise BadRequest("Task log handler does not support read logs.")
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("DAG Run not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        raise BadRequest(detail="Task instance does not exist in the DB")
+
+    dag = current_app.dag_bag.get_dag(dag_id)
+    if dag:
+        ti.task = dag.get_task(ti.task_id)
+
+    return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json'])
+
+    # return_type is one of the two mimetypes above, or None if neither matched
+
+    if return_type == 'application/json' or return_type is None:  # default
+        logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+        logs = logs[0] if task_try_number is not None else logs
+        token = URLSafeSerializer(key).dumps(metadata)
+        return logs_schema.dump(
+            LogResponseObject(continuation_token=token, content=logs)
+        )
+    # text/plain: stream the log content
+    logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+    return Response(
+        logs,
+        headers={"Content-Type": return_type}
+    )
diff --git a/airflow/api_connexion/endpoints/log_endpoint.py b/airflow/api_connexion/schemas/log_schema.py
similarity index 69%
copy from airflow/api_connexion/endpoints/log_endpoint.py
copy to airflow/api_connexion/schemas/log_schema.py
index df66a95..2e48fc7 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/schemas/log_schema.py
@@ -14,13 +14,22 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from typing import NamedTuple
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from marshmallow import Schema, fields
 
 
-def get_log():
-    """
-    Get logs for specific task instance
-    """
-    raise NotImplementedError("Not implemented yet.")
+class LogsSchema(Schema):
+    """ Schema for logs """
+
+    content = fields.Str()
+    continuation_token = fields.Str()
+
+
+class LogResponseObject(NamedTuple):
+    """ Log Response Object """
+    content: str
+    continuation_token: str
+
+
+logs_schema = LogsSchema(strict=True)
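
For illustration, dumping a response object through this schema looks as follows
(a standalone sketch; strict=True implies marshmallow 2.x, where dump() returns a
MarshalResult and the serialized dict lives on its .data attribute):

    from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema

    obj = LogResponseObject(content="Log for testing.", continuation_token="<token>")
    result = logs_schema.dump(obj)
    # marshmallow 2.x: result.data == {
    #     'content': 'Log for testing.',
    #     'continuation_token': '<token>',
    # }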
diff --git a/requirements/requirements-python3.6.txt b/requirements/requirements-python3.6.txt
index 732f997..30a3d01 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
 boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
 bowler==0.8.0
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
 celery==4.4.6
@@ -99,13 +99,13 @@ coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
 curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
 dask==2.19.0
 datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 distributed==2.19.0
 dnspython==1.16.0
 docker-pycreds==0.4.0
@@ -182,16 +182,16 @@ humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
 idna-ssl==1.1.0
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
 immutables==0.14
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 importlib-resources==2.0.1
 inflection==0.5.0
 ipdb==0.13.3
 ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 isort==4.3.21
@@ -307,7 +307,7 @@ python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
 python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -371,7 +371,7 @@ thrift==0.13.0
 toml==0.10.1
 toolz==0.10.0
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
diff --git a/requirements/requirements-python3.7.txt b/requirements/requirements-python3.7.txt
index 14ca634..dcd1017 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
 boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
 bowler==0.8.0
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
 celery==4.4.6
@@ -98,13 +98,13 @@ coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
 curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
 dask==2.19.0
 datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 distributed==2.19.0
 dnspython==1.16.0
 docker-pycreds==0.4.0
@@ -180,14 +180,14 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 inflection==0.5.0
 ipdb==0.13.3
 ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 isort==4.3.21
@@ -302,7 +302,7 @@ python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
 python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -366,7 +366,7 @@ thrift==0.13.0
 toml==0.10.1
 toolz==0.10.0
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
diff --git a/requirements/requirements-python3.8.txt b/requirements/requirements-python3.8.txt
index c708f66..ea020b7 100644
--- a/requirements/requirements-python3.8.txt
+++ b/requirements/requirements-python3.8.txt
@@ -72,12 +72,12 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.11
+boto3==1.14.12
 boto==2.49.0
-botocore==1.17.11
+botocore==1.17.12
 bowler==0.8.0
 cached-property==1.5.1
-cachetools==4.1.0
+cachetools==4.1.1
 cassandra-driver==3.20.2
 cattrs==1.0.0
 celery==4.4.6
@@ -98,13 +98,13 @@ coverage==5.1
 croniter==0.3.34
 cryptography==2.9.2
 curlify==2.2.1
-cx-Oracle==7.3.0
+cx-Oracle==8.0.0
 dask==2.19.0
 datadog==0.37.1
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
-distlib==0.3.0
+distlib==0.3.1
 distributed==2.19.0
 dnspython==1.16.0
 docker-pycreds==0.4.0
@@ -180,14 +180,14 @@ httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
 identify==1.4.20
-idna==2.9
+idna==2.10
 ijson==2.6.1
 imagesize==1.2.0
-importlib-metadata==1.6.1
+importlib-metadata==1.7.0
 inflection==0.5.0
 ipdb==0.13.3
 ipython-genutils==0.2.0
-ipython==7.15.0
+ipython==7.16.1
 iso8601==0.1.12
 isodate==0.6.0
 isort==4.3.21
@@ -301,7 +301,7 @@ python-jenkins==1.7.0
 python-jose==3.1.0
 python-nvd3==0.15.0
 python-slugify==4.0.0
-python3-openid==3.1.0
+python3-openid==3.2.0
 pytz==2020.1
 pytzdata==2019.3
 pywinrm==0.4.1
@@ -365,7 +365,7 @@ thrift==0.13.0
 toml==0.10.1
 toolz==0.10.0
 tornado==5.1.1
-tqdm==4.46.1
+tqdm==4.47.0
 traitlets==4.3.3
 typed-ast==1.4.1
 typing-extensions==3.7.4.2
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py
index cbc564a..5fa274b 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -14,25 +14,260 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            session.merge(self.ti)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
+
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        expected_filename = "{}/{}/{}/{}/1.log".format(
+            self.log_dir,
+            self.DAG_ID,
+            self.TASK_ID,
+            self.default_time.replace(":", ".")
+        )
+        self.assertEqual(
+            response.json['content'],
+            f"*** Reading local file: {expected_filename}\nLog for testing."
+        )
+        info = serializer.loads(response.json['continuation_token'])
+        self.assertEqual(
+            info,
+            {'end_of_log': True}
+        )
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
+        )
+        expected_filename = "{}/{}/{}/{}/1.log".format(
+            self.log_dir,
+            self.DAG_ID,
+            self.TASK_ID,
+            self.default_time.replace(':', '.')
+        )
+        self.assertEqual(200, response.status_code)
+        self.assertEqual(
+            response.data.decode('utf-8'),
+            f"*** Reading local file: {expected_filename}\nLog for testing.\n"
+        )
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertEqual(response.status_code, 400)
+        self.assertEqual(response.json['detail'], "Task instance does not exist in the DB")
+
+    @provide_session
+    def test_get_logs_with_metadata_as_download_large_file(self, session):
+        self._create_dagrun(session)
+        with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock:
+            first_return = (['1st line'], [{}])
+            second_return = (['2nd line'], [{'end_of_log': False}])
+            third_return = (['3rd line'], [{'end_of_log': True}])
+            fourth_return = (['should never be read'], [{'end_of_log': True}])
+            read_mock.side_effect = [first_return, second_return, third_return, fourth_return]
+
+            response = self.client.get(
+                f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+                f"taskInstances/{self.TASK_ID}/logs/1?full_content=True",
+                headers={"Accept": 'text/plain'}
+            )
+
+            self.assertIn('1st line', response.data.decode('utf-8'))
+            self.assertIn('2nd line', response.data.decode('utf-8'))
+            self.assertIn('3rd line', response.data.decode('utf-8'))
+            self.assertNotIn('should never be read', response.data.decode('utf-8'))
+
+    @mock.patch("airflow.api_connexion.endpoints.log_endpoint.TaskLogReader")
+    def test_get_logs_for_handler_without_read_method(self, mock_log_reader):
+        type(mock_log_reader.return_value).is_supported = PropertyMock(return_value=False)
+
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Content-Type': 'application/jso'}  # malformed on purpose, to exercise mimetype guessing
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertEqual(400, response.status_code)
+        self.assertIn(
+            'Task log handler does not support reading logs.',
+            response.data.decode('utf-8'))
+
+    @provide_session
+    def test_bad_signature_raises(self, session):
+        self._create_dagrun(session)
+        token = {"download_logs": False}
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertEqual(
+            response.json,
+            {
+                'detail': None,
+                'status': 400,
+                'title': "Bad Signature. Please use only the tokens provided by the API.",
+                'type': 'about:blank'
+            }
+        )
+
+    def test_raises_404_for_invalid_dag_run_id(self):
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN/"  # invalid dagrun_id
+            f"taskInstances/{self.TASK_ID}/logs/1?",
+            headers=headers
+        )
+        self.assertEqual(
+            response.json,
+            {
+                'detail': None,
+                'status': 404,
+                'title': "DAG Run not found",
+                'type': 'about:blank'
+            }
         )
-        assert response.status_code == 200
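
Taken together, the tests pin down the client-facing contract: a JSON request returns
one log chunk plus a continuation_token, a text/plain request streams the log, and
full_content=True (or a token carrying download_logs) forces a complete download. A
hedged client-side sketch for paging through a long log over JSON (the base URL and
IDs are placeholders; the token is signed server-side, so a client cannot read the
end_of_log flag itself and stops on an empty chunk instead):

    import requests

    BASE_URL = "http://localhost:8080/api/v1"  # placeholder webserver address
    URL = (f"{BASE_URL}/dags/dag_for_testing_log_endpoint/dagRuns/TEST_DAG_RUN_ID"
           f"/taskInstances/task_for_testing_log_endpoint/logs/1")

    token = None
    while True:
        params = {"token": token} if token else {}
        payload = requests.get(
            URL, params=params, headers={"Accept": "application/json"}
        ).json()
        if not payload["content"]:
            break  # heuristic stop condition: server returned an empty chunk
        print(payload["content"], end="")
        token = payload["continuation_token"]  # opaque; pass back verbatim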