Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/16 13:51:17 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #9331: add log endpoint

ephraimbuddy opened a new pull request #9331:
URL: https://github.com/apache/airflow/pull/9331


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441365898



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    logger = logging.getLogger('airflow.task')
+    task_log_reader = conf.get('logging', 'task_log_reader')
+    handler = next((handler for handler in logger.handlers
+                    if handler.name == task_log_reader), None)
+
+    ti = dag_run.get_task_instance(task_id, session)
+    try:
+        if ti is None:
+            logs = ["*** Task instance did not exist in the DB\n"]
+            metadata['end_of_log'] = True
+        else:
+            dag = dagbag.get_dag(dag_id)
+            ti.task = dag.get_task(ti.task_id)

Review comment:
       ```suggestion
               if dag:
                    with suppress(TaskNotFound):
                       ti.task = dag.get_task(ti.task_id)
   ```
    Stackdriver does not require access to the DAG. The DAG can be deleted and its logs will still be available.
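
    For reference, a minimal sketch of that suppression pattern with the imports it needs (assuming `TaskNotFound` comes from `airflow.exceptions` and that `contextlib.suppress` is the intended context manager):

    ```python
    from contextlib import suppress

    from airflow.exceptions import TaskNotFound

    dag = dagbag.get_dag(dag_id)
    if dag:
        # If the task was removed from the DAG file, leave ti.task unset;
        # a reader such as Stackdriver can still serve the logs without it.
        with suppress(TaskNotFound):
            ti.task = dag.get_task(ti.task_id)
    ```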




----------------------------------------------------------------



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442118043



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)

Review comment:
       ```suggestion
           if not full_content:  # and text/plain as content_type
               logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
               logs = logs[0] if task_try_number is not None else logs
   
               return logs_schema.dump(dict(continuation_token=str(metadata),
                                            content=logs)
                                       )
           logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
   ```
   What do you think @mik-laj?
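
    Either way, the `text/plain` branch relies on Flask streaming a generator; a minimal standalone sketch of that pattern (the names here are illustrative stand-ins, not the PR's code):

    ```python
    from flask import Flask, Response

    app = Flask(__name__)

    def fake_log_stream():
        # Stands in for task_log_reader.read_log_stream(ti, try_number, metadata),
        # which yields the log text chunk by chunk.
        yield from ("line 1\n", "line 2\n", "line 3\n")

    @app.route("/log")
    def log():
        # Flask sends each yielded chunk to the client as it is produced,
        # so a large log never has to fit in memory at once.
        return Response(fake_log_stream(), mimetype="text/plain")
    ```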




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r446841962



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,83 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
 
-def get_log():
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        raise BadRequest("Task log handler does not support read logs.")
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("DAG Run not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        raise BadRequest(detail="Task instance did not exist in the DB")
+
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json'])
+
+        # return_type would be either the above two or None
+
+        if return_type == 'application/json' or return_type is None:  # default
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+            return logs_schema.dump(LogResponseObject(continuation_token=str(metadata),
+                                                      content=logs)
+                                    )
+        # text/plain. Stream
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+        return Response(
+            logs,
+            headers={"Content-Type": return_type}
+        )
+
+    except AttributeError as err:
+        error_message = [
+            f"Task log handler {task_log_reader} does not support read logs.\n{str(err)}\n"
+        ]
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            LogResponseObject(continuation_token=str(metadata),
+                              content=str(error_message))
+        )

Review comment:
    This situation will never happen; the unsupported-handler case was already handled earlier. Errors should be returned as a response with an error status code, not embedded in the log content sent to the client.
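
    A sketch of the status-code approach (this mirrors the check already made earlier in this diff; the placement is illustrative):

    ```python
    from airflow.api_connexion.exceptions import BadRequest
    from airflow.utils.log.log_reader import TaskLogReader

    task_log_reader = TaskLogReader()
    if not task_log_reader.is_supported:
        # Fail fast with an HTTP 400 instead of smuggling the error into
        # the log body, where clients cannot detect it reliably.
        raise BadRequest("Task log handler does not support read logs.")
    ```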




----------------------------------------------------------------



[GitHub] [airflow] randr97 commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
randr97 commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441738149



##########
File path: airflow/www/views.py
##########
@@ -679,62 +680,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                message="Task log handler does not support read logs.",
+                error=True,
+                metadata={
+                    "end_of_log": True
+                }
+            )
 
         ti = session.query(models.TaskInstance).filter(
             models.TaskInstance.dag_id == dag_id,
             models.TaskInstance.task_id == task_id,
             models.TaskInstance.execution_date == dttm).first()
 
-        def _get_logs_with_metadata(try_number, metadata):
-            if ti is None:
-                logs = ["*** Task instance did not exist in the DB\n"]
-                metadata['end_of_log'] = True
-            else:
-                logs, metadatas = handler.read(ti, try_number, metadata=metadata)
-                metadata = metadatas[0]
-            return logs, metadata
+        if ti is None:
+            return jsonify(
+                message="*** Task instance did not exist in the DB\n",
+                error=True,
+                metadata={
+                    "end_of_log": True
+                }
+            )
 
         try:
-            if ti is not None:
-                dag = dagbag.get_dag(dag_id)
-                if dag:
-                    ti.task = dag.get_task(ti.task_id)
+            dag = dagbag.get_dag(dag_id)
+            if dag:
+                ti.task = dag.get_task(ti.task_id)
+
             if response_format == 'json':
-                logs, metadata = _get_logs_with_metadata(try_number, metadata)
+                logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
                 message = logs[0] if try_number is not None else logs
                 return jsonify(message=message, metadata=metadata)
 
-            filename_template = conf.get('logging', 'LOG_FILENAME_TEMPLATE')
-            attachment_filename = render_log_filename(
-                ti=ti,
-                try_number="all" if try_number is None else try_number,
-                filename_template=filename_template)
             metadata['download_logs'] = True
-
-            def _generate_log_stream(try_number, metadata):
-                if try_number is None and ti is not None:
-                    next_try = ti.next_try_number
-                    try_numbers = list(range(1, next_try))
-                else:
-                    try_numbers = [try_number]
-                for try_number in try_numbers:
-                    metadata.pop('end_of_log', None)
-                    metadata.pop('max_offset', None)
-                    metadata.pop('offset', None)
-                    while 'end_of_log' not in metadata or not metadata['end_of_log']:
-                        logs, metadata = _get_logs_with_metadata(try_number, metadata)
-                        yield "\n".join(logs) + "\n"
-            return Response(_generate_log_stream(try_number, metadata),
-                            mimetype="text/plain",
-                            headers={"Content-Disposition": "attachment; filename={}".format(
-                                attachment_filename)})
+            attachment_filename = task_log_reader.render_log_filename(ti, try_number)
+            log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
+            return Response(
+                response=log_stream,
+                mimetype="text/plain",
+                headers={
+                    "Content-Disposition": f"attachment; filename={attachment_filename}"
+                })
         except AttributeError as e:
-            error_message = ["Task log handler {} does not support read logs.\n{}\n"
-                             .format(task_log_reader, str(e))]
+            error_message = [
+                "Task log handler does not support read logs.\n{}\n".format(str(e))

Review comment:
       ```suggestion
                   f"Task log handler does not support read logs.\n{str(e)}\n"
   ```
    Just my thought: f-strings are a little more optimized.
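
    For what it's worth, a quick way to compare the two forms (an illustrative micro-benchmark; absolute numbers vary by interpreter and version):

    ```python
    import timeit

    err = "boom"
    fmt = timeit.timeit(
        '"Task log handler does not support read logs.\\n{}\\n".format(err)',
        globals=globals(), number=1_000_000)
    fstr = timeit.timeit(
        'f"Task log handler does not support read logs.\\n{err}\\n"',
        globals=globals(), number=1_000_000)
    print(f".format(): {fmt:.3f}s, f-string: {fstr:.3f}s")
    ```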




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443290698



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content="[*** Task instance did not exist in the DB\n]",
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+        attachment_filename = task_log_reader.render_log_filename(ti, task_try_number)
+
+        return Response(
+            logs,
+            mimetype="text/plain",
+            headers={
+                "Content-Disposition": f"attachment; filename={attachment_filename}"

Review comment:
    You can check the interaction with the browser by doing manual tests. :-D
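
    A quick scripted check of the same behaviour (hypothetical local instance and IDs; only the response headers matter here):

    ```python
    import requests

    resp = requests.get(
        "http://localhost:8080/api/v1/dags/example_dag/dagRuns/example_run"
        "/taskInstances/example_task/logs/1",
        headers={"Accept": "text/plain"},
    )
    # A browser offers a download when this header is present; expected shape:
    #   attachment; filename=<rendered log filename>
    print(resp.headers.get("Content-Disposition"))
    ```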




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r446839313



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,259 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            session.merge(self.ti)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
+
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        expected_filename = "{}/{}/{}/{}/1.log".format(
+            self.log_dir,
+            self.DAG_ID,
+            self.TASK_ID,
+            self.default_time.replace(":", ".")
+        )
+        self.assertEqual(
+            response.json['content'],
+            f"*** Reading local file: {expected_filename}\nLog for testing."
+        )
+        self.assertIn(
+            response.json['continuation_token'],
+            "{'end_of_log': True}"
+        )
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
+        )
+        expected_filename = "{}/{}/{}/{}/1.log".format(
+            self.log_dir,
+            self.DAG_ID,
+            self.TASK_ID,
+            self.default_time.replace(':', '.')
+        )
+        self.assertEqual(200, response.status_code)
+        self.assertEqual(
+            response.data.decode('utf-8'),
+            f"*** Reading local file: {expected_filename}\nLog for testing.\n"
+        )
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertEqual(response.status_code, 400)
+        self.assertEqual(response.json['detail'], "Task instance did not exist in the DB")
+
+    @provide_session
+    def test_get_logs_with_metadata_as_download_large_file(self, session):
+        self._create_dagrun(session)
+        with mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.read") as read_mock:
+            first_return = (['1st line'], [{}])
+            second_return = (['2nd line'], [{'end_of_log': False}])
+            third_return = (['3rd line'], [{'end_of_log': True}])
+            fourth_return = (['should never be read'], [{'end_of_log': True}])
+            read_mock.side_effect = [first_return, second_return, third_return, fourth_return]
+
+            response = self.client.get(
+                f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+                f"taskInstances/{self.TASK_ID}/logs/1?full_content=True",
+                headers={"Accept": 'text/plain'}
+            )
+
+            self.assertIn('1st line', response.data.decode('utf-8'))
+            self.assertIn('2nd line', response.data.decode('utf-8'))
+            self.assertIn('3rd line', response.data.decode('utf-8'))
+            self.assertNotIn('should never be read', response.data.decode('utf-8'))
+
+    @mock.patch("airflow.api_connexion.endpoints.log_endpoint.TaskLogReader")
+    def test_get_logs_for_handler_without_read_method(self, mock_log_reader):
+        type(mock_log_reader.return_value).is_supported = PropertyMock(return_value=False)
+
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Content-Type': 'application/jso'}  # deliberately malformed, to exercise content-type guessing
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertEqual(400, response.status_code)
+        self.assertIn(
+            'Task log handler does not support read logs.',
+            response.data.decode('utf-8'))
+
+    @provide_session
+    def test_bad_signature_raises(self, session):
+        self._create_dagrun(session)
+        token = {"download_logs": False}
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertEqual(
+            response.json,
+            {
+                'detail': None,
+                'status': 400,
+                'title': "Bad Signature. Please sign your token with URLSafeSerializer",

Review comment:
       ```suggestion
                   'title': "Bad Signature. Please use only the tokens provided by the API.",
   ```
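
    For background, the signing round-trip these tests exercise (the key below is illustrative; the real one is the webserver's `SECRET_KEY`):

    ```python
    from itsdangerous.exc import BadSignature
    from itsdangerous.url_safe import URLSafeSerializer

    serializer = URLSafeSerializer("illustrative-secret-key")
    token = serializer.dumps({"download_logs": False})
    print(serializer.loads(token))  # {'download_logs': False}

    try:
        serializer.loads(token + "tampered")  # any modification breaks the signature
    except BadSignature:
        print("tampering detected on loads()")
    ```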




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444799752



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertIn('Log for testing.', response.json.get('content'))
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
         )
         assert response.status_code == 200
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertEqual("[*** Task instance did not exist in the DB\n]",

Review comment:
       I think any errors should be reported as exceptions. We should not report errors otherwise, as this will make it difficult for the client to handle them. 
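
    A sketch of the exception-based variant (whether `NotFound` or `BadRequest` fits this case best is a separate question):

    ```python
    from airflow.api_connexion.exceptions import NotFound

    ti = dag_run.get_task_instance(task_id, session)
    if ti is None:
        # Let the API error handler turn this into a proper HTTP error
        # response instead of returning a fake log body with a 200.
        raise NotFound("Task instance did not exist in the DB")
    ```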




----------------------------------------------------------------



[GitHub] [airflow] zikun commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
zikun commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r445544477



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,85 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from accept_types import get_best_match

Review comment:
       FYI there is a built-in function from flask:
   https://github.com/apache/airflow/blob/8446526882a5798d838224b7566d5349111fd0d0/airflow/api_connexion/endpoints/config_endpoint.py#L30
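
    In use it looks roughly like this (inside a request context, mirroring `config_endpoint.py`):

    ```python
    from flask import request

    # Pick the best response type among the ones this endpoint supports.
    return_type = request.accept_mimetypes.best_match(
        ['text/plain', 'application/json']
    )
    ```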




----------------------------------------------------------------



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443193917



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content="[*** Task instance did not exist in the DB\n]",
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':

Review comment:
    The Content-Type header describes the body of a message, so on a GET request it tells us nothing about the desired response format. We should look at the [Accept](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Accept) header instead. However, that header has a complex format (media ranges with quality values), so we'll probably have to use a library to process it.
   
   We have another endpoint that supports different types of responses. See:
   https://github.com/apache/airflow/pull/9322/files
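
    For illustration, werkzeug (which Flask is built on) already handles the quality-value parsing; a minimal sketch:

    ```python
    from werkzeug.datastructures import MIMEAccept
    from werkzeug.http import parse_accept_header

    # A typical browser Accept header with media ranges and q-values.
    accept = parse_accept_header(
        "text/html,application/json;q=0.9,*/*;q=0.8", MIMEAccept
    )
    print(accept.best_match(["text/plain", "application/json"]))  # application/json
    ```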




----------------------------------------------------------------



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442112186



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )

Review comment:
       If `full_content` is True, `metadata['download_logs']` is True too. So a continuation token is sent to the user, which they can use to keep fetching logs until `'end_of_log'` appears in the metadata.
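
    A rough sketch of the client flow this enables (the base URL and identifiers are hypothetical, and the secret must match the webserver's SECRET_KEY, as in the tests below):

    ```python
    import requests
    from itsdangerous.url_safe import URLSafeSerializer

    SECRET_KEY = "webserver-secret"  # assumption: the webserver's key
    URL = ("http://localhost:8080/api/v1/dags/my_dag/dagRuns/my_run/"
           "taskInstances/my_task/logs/1")

    # Ask for the full log; the endpoint keeps download_logs=True in the
    # metadata it echoes back, so follow-up requests continue the download.
    token = URLSafeSerializer(SECRET_KEY).dumps({"download_logs": True})
    body = requests.get(URL, params={"token": token},
                        headers={"Accept": "application/json"}).json()
    print(body["content"])
    # body["continuation_token"] is str(metadata); an 'end_of_log': True
    # entry inside it signals that there is nothing left to fetch.
    ```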







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443194486



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")

Review comment:
       ```suggestion
           raise NotFound("DAG Run not found")
   ```
    It was too verbose and confusing.







[GitHub] [airflow] ephraimbuddy commented on pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#issuecomment-646067046


   Depends on #9380





[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441362916



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    logger = logging.getLogger('airflow.task')
+    task_log_reader = conf.get('logging', 'task_log_reader')
+    handler = next((handler for handler in logger.handlers
+                    if handler.name == task_log_reader), None)
+
+    ti = dag_run.get_task_instance(task_id, session)
+    try:
+        if ti is None:
+            logs = ["*** Task instance did not exist in the DB\n"]
+            metadata['end_of_log'] = True
+        else:
+            dag = dagbag.get_dag(dag_id)
+            ti.task = dag.get_task(ti.task_id)
+            logs, metadatas = handler.read(ti, task_try_number, metadata=metadata)
+            metadata = metadatas[0]
+
+        if full_content:
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs_, metadatas = handler.read(ti, task_try_number, metadata)
+                metadata = metadatas[0]
+                logs.append("\n".join(logs_) + "\n")
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            return dict(continuation_token=str(metadata),
+                        content=str(logs))
+
+        file_obj = BytesIO(b'\n'.join(
+            log.encode('utf-8') for log in logs
+        ))
+        filename_template = conf.get('logging', 'LOG_FILENAME_TEMPLATE')
+        attachment_filename = render_log_filename(
+            ti=ti,
+            try_number="all" if task_try_number is None else task_try_number,
+            filename_template=filename_template)
+
+        return Response(file_obj, mimetype="text/plain",
+                        headers={"Content-Disposition": "attachment; filename={}".format(
+                            attachment_filename)})
+
+    except AttributeError as err:
+        error_message = [
+            "Task log handler {} does not support read logs.\n{}\n"
+            .format(task_log_reader, str(err))
+        ]
+        metadata['end_of_log'] = True
+        return dict(continuation_token=str(metadata),

Review comment:
       A schema would be helpful here when we work on HATEOAS.
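
    As a sketch of what such a schema could look like (a later revision adds `airflow.api_connexion.schemas.log_schema.logs_schema`; the field types here are assumptions):

    ```python
    from marshmallow import Schema, fields

    class LogsSchema(Schema):
        # Shape of the JSON response returned by the log endpoint.
        content = fields.Str()
        continuation_token = fields.Str()

    logs_schema = LogsSchema()

    # logs_schema.dump({"content": "...", "continuation_token": "..."}) gives
    # the response a stable shape that HATEOAS links can be attached to later.
    ```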







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444296241



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertIn('Log for testing.', response.json.get('content'))
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
         )
         assert response.status_code == 200
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))

Review comment:
       ```suggestion
           self.assertEqual('Log for testing.', response.data.decode('utf-8'))
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443284187



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content="[*** Task instance did not exist in the DB\n]",
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+        attachment_filename = task_log_reader.render_log_filename(ti, task_try_number)
+
+        return Response(
+            logs,
+            mimetype="text/plain",
+            headers={
+                "Content-Disposition": f"attachment; filename={attachment_filename}"

Review comment:
       I'm not sure how I can test that it opens in the browser, but take a look at how I did it.
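
    One thing a unit test can assert is the header itself (a sketch reusing the test client and URL pattern from this PR's tests):

    ```python
    response = self.client.get(
        f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
        f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
        headers={'Accept': 'text/plain'},
    )
    # Whether a browser displays or downloads the body is driven by this
    # header, so asserting it is as close as a unit test can get.
    self.assertIn('attachment', response.headers.get('Content-Disposition', ''))
    ```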







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441354164



##########
File path: tests/api_connexion/endpoints/test_logs/dag_for_testing_log_endpoint/task_for_testing_log_endpoint/2020-06-10T20.00.00+00.00/1.log
##########
@@ -0,0 +1 @@
+Log for testing.

Review comment:
       No. It is used by the tests.







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441368237



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    logger = logging.getLogger('airflow.task')
+    task_log_reader = conf.get('logging', 'task_log_reader')
+    handler = next((handler for handler in logger.handlers
+                    if handler.name == task_log_reader), None)
+
+    ti = dag_run.get_task_instance(task_id, session)
+    try:
+        if ti is None:
+            logs = ["*** Task instance did not exist in the DB\n"]
+            metadata['end_of_log'] = True
+        else:
+            dag = dagbag.get_dag(dag_id)
+            ti.task = dag.get_task(ti.task_id)
+            logs, metadatas = handler.read(ti, task_try_number, metadata=metadata)
+            metadata = metadatas[0]
+
+        if full_content:

Review comment:
       ```suggestion
           if full_content or request.headers.get('Content-Type', None) == 'text/plain':
   ```
    We return the full log for a text response. In this format, we don't have an easy way to return a continuation token.







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443194198



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain

Review comment:
       Yes. We return a full log for the text response.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441379271



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)

Review comment:
       Should I use the `dagbag` created in airflow/www/views.py?







[GitHub] [airflow] OmairK commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441357390



##########
File path: tests/api_connexion/endpoints/test_logs/dag_for_testing_log_endpoint/task_for_testing_log_endpoint/2020-06-10T20.00.00+00.00/1.log
##########
@@ -0,0 +1 @@
+Log for testing.

Review comment:
       This is a temporary file, right? IMO it should be created at the start of the test and unlinked during `tearDown`. What do you think?
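
    A minimal sketch of that pattern (names are illustrative; the updated tests in this PR do essentially this with `tempfile.mkdtemp` and `shutil.rmtree`):

    ```python
    import os
    import tempfile
    import unittest

    class TestGetLog(unittest.TestCase):
        def setUp(self):
            # Create the fixture log on the fly instead of committing it.
            self.log_dir = tempfile.mkdtemp()
            self.log_file = os.path.join(self.log_dir, "1.log")
            with open(self.log_file, "w") as handle:
                handle.write("Log for testing.")

        def tearDown(self):
            # Unlink the fixture so test runs stay independent.
            os.unlink(self.log_file)
            os.rmdir(self.log_dir)
    ```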
   







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r445547815



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,85 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from accept_types import get_best_match

Review comment:
       Thanks for this!







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444296587



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertIn('Log for testing.', response.json.get('content'))
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
         )
         assert response.status_code == 200
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertEqual("[*** Task instance did not exist in the DB\n]",

Review comment:
       Should we return 404 here?







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444295989



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)

Review comment:
       We don't have a test for the schema, so... can you add an assertion for the full response?
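
    For example (a sketch; both literal values are placeholders, since the exact content and the stringified metadata depend on the log handler):

    ```python
    self.assertEqual(
        {
            'content': 'Log for testing.',
            'continuation_token': "{'end_of_log': True}",
        },
        response.json,
    )
    ```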







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441930394



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),

Review comment:
       ```suggestion
                   content="[*** Task instance did not exist in the DB\n]",
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r446843529



##########
File path: requirements/requirements-python3.8.txt
##########
@@ -45,7 +45,7 @@ apispec==1.3.3
 appdirs==1.4.4
 argcomplete==1.11.1
 asn1crypto==1.3.0
-astroid==2.3.3
+astroid==2.4.2

Review comment:
       Have you encountered any problems with this version of the library?







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441356636



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)

Review comment:
       We need to come up with a way to ensure the DagBag is created only once for the webserver. The web UI and the API should share one instance.
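
    One possible shape (a sketch, not the final implementation; later revisions in this thread expose the bag as `app.dag_bag` and read it via `current_app.dag_bag`):

    ```python
    from flask import Flask

    from airflow import models, settings
    from airflow.settings import STORE_SERIALIZED_DAGS

    def create_app(testing=False):
        flask_app = Flask(__name__)
        # Build the DagBag once and attach it to the app object so the web
        # UI views and the REST API endpoints share a single instance.
        flask_app.dag_bag = models.DagBag(
            settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
        return flask_app

    # Later, inside any request context:
    #     dag = current_app.dag_bag.get_dag(dag_id)
    ```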







[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r446838469



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,259 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            session.merge(self.ti)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
+
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        expected_filename = "{}/{}/{}/{}/1.log".format(
+            self.log_dir,
+            self.DAG_ID,
+            self.TASK_ID,
+            self.default_time.replace(":", ".")
+        )
+        self.assertEqual(
+            response.json['content'],
+            f"*** Reading local file: {expected_filename}\nLog for testing."
+        )
+        self.assertIn(
+            response.json['continuation_token'],
+            "{'end_of_log': True}"

Review comment:
       The token is signed metadata, so the user cannot manipulate it to affect its content.
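
    A minimal sketch of what that signing looks like with itsdangerous (the key and payload here are placeholders; in the endpoint the key is the app's SECRET_KEY):
    ```python
    from itsdangerous.exc import BadSignature
    from itsdangerous.url_safe import URLSafeSerializer

    serializer = URLSafeSerializer("placeholder-secret-key")

    # The payload is signed, not encrypted: the client can read it, but any
    # modification invalidates the signature on the next loads() call.
    token = serializer.dumps({"download_logs": False})

    try:
        metadata = serializer.loads(token)   # round-trips the original dict
    except BadSignature:
        metadata = None                      # tampered or foreign token
    ```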




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442125343



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':

Review comment:
       ```suggestion
           if not full_content or request.headers.get('Content-Type', None) == 'application/json':
   ```
   Send json if content requested is not full content. Disregard text/plain content type if metadata['download_logs'] is false. This way we conform to the spec. Any thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442178632



##########
File path: airflow/api_connexion/schemas/log_schema.py
##########
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from marshmallow import Schema, fields
+
+
+class LogsSchema(Schema):
+    """ Schema for logs """
+
+    content = fields.Str()
+    continuation_token = fields.Str()
+
+
+logs_schema = LogsSchema()

Review comment:
       ```suggestion
   logs_schema = LogsSchema(strict=True)
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442115447



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain

Review comment:
       From the spec, the text/plain response returns a string without a continuation_token. So I think it's OK to default to streaming the log and sending it as an attachment?
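
    A sketch of that default (the `read_log_stream` call is the reader API from this PR; the helper name and the filename argument are illustrative):
    ```python
    from flask import Response


    def log_as_attachment(task_log_reader, ti, try_number, metadata, filename):
        # read_log_stream() yields the log in chunks, so the response is
        # streamed instead of materializing the whole file in memory.
        logs = task_log_reader.read_log_stream(ti, try_number, metadata)
        return Response(
            logs,
            mimetype="text/plain",
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )
    ```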




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443194486



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")

Review comment:
       ```suggestion
           raise NotFound("DagRun not found")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444481257



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertIn('Log for testing.', response.json.get('content'))
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
         )
         assert response.status_code == 200
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertEqual("[*** Task instance did not exist in the DB\n]",

Review comment:
       Since we are returning logs, I think returning it as log content is fine. What do you suggest?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#issuecomment-646267705


   Depends on #9391 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441456879



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)

Review comment:
       I was rather thinking of adding a new field to the Flask app, or adding it to the application context. This is a slightly larger and more problematic change.
   https://flask.palletsprojects.com/en/1.1.x/appcontext/
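
    A rough sketch of the application-context variant from those docs, adapted to a DagBag (the helper name and lazy construction are assumptions for the example):
    ```python
    from flask import g

    from airflow import models, settings


    def get_dag_bag() -> models.DagBag:
        # Same pattern as the appcontext docs: cache the resource on `g`,
        # building it lazily on first use within the current app context.
        if "dag_bag" not in g:
            g.dag_bag = models.DagBag(settings.DAGS_FOLDER)
        return g.dag_bag
    ```
    Note that `g` only lives for one application context, so a truly process-wide shared instance would still have to live on the app object itself.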




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] zikun commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
zikun commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r445544477



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,85 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from accept_types import get_best_match

Review comment:
       FYI there is a built-in function from Flask:
   https://github.com/apache/airflow/blob/8446526882a5798d838224b7566d5349111fd0d0/airflow/api_connexion/endpoints/config_endpoint.py#L30
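
    For reference, a minimal sketch of that negotiation with the built-in (the two mimetypes are the ones this endpoint serves):
    ```python
    from flask import Response, jsonify, request


    def negotiated(payload):
        # Werkzeug parses the Accept header into request.accept_mimetypes;
        # best_match() returns the best type we can serve, or None.
        return_type = request.accept_mimetypes.best_match(
            ["text/plain", "application/json"]
        )
        if return_type == "application/json":
            return jsonify(payload)
        return Response(str(payload), mimetype="text/plain")
    ```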




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r446864065



##########
File path: requirements/requirements-python3.8.txt
##########
@@ -45,7 +45,7 @@ apispec==1.3.3
 appdirs==1.4.4
 argcomplete==1.11.1
 asn1crypto==1.3.0
-astroid==2.3.3
+astroid==2.4.2

Review comment:
       No. The file was generated when I added accept_types as a dependency. Now that I have removed it, I will regenerate the requirements.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442110256



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)

Review comment:
       I am looking at this now. I think a separate PR is necessary?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443193574



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(

Review comment:
       Can you use a NamedTuple instead of a dict? This will allow us to preserve types, so mypy can validate them.
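
    A sketch of what that could look like here, with field names taken from the LogsSchema in this PR (the class name is illustrative):
    ```python
    from typing import NamedTuple


    class LogResponseObject(NamedTuple):
        # Mirrors LogsSchema: both fields are typed strings, so mypy can
        # check every site that constructs a response object.
        content: str
        continuation_token: str


    # marshmallow reads attributes, so dump() accepts the NamedTuple:
    # logs_schema.dump(LogResponseObject(content=logs,
    #                                    continuation_token=str(metadata)))
    ```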




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443193917



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content="[*** Task instance did not exist in the DB\n]",
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':

Review comment:
       The Content-Type header describes a message body, so on this request it can only apply to the response. We should look at the Accept header instead.
    
    We have another endpoint that supports different types of responses. See:
    https://github.com/apache/airflow/pull/9322/files
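
    To make the distinction concrete, a tiny request-side sketch (header handling as in the endpoint linked above):
    ```python
    from flask import request

    # Content-Type describes a message body, and a GET request has none; the
    # client's desired response format lives in the Accept header instead.
    wants_json = "application/json" in request.accept_mimetypes
    ```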




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441365898



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -14,13 +14,99 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import logging
+import os
+from io import BytesIO
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.configuration import conf
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.helpers import render_log_filename
+from airflow.utils.session import provide_session
 
-def get_log():
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    logger = logging.getLogger('airflow.task')
+    task_log_reader = conf.get('logging', 'task_log_reader')
+    handler = next((handler for handler in logger.handlers
+                    if handler.name == task_log_reader), None)
+
+    ti = dag_run.get_task_instance(task_id, session)
+    try:
+        if ti is None:
+            logs = ["*** Task instance did not exist in the DB\n"]
+            metadata['end_of_log'] = True
+        else:
+            dag = dagbag.get_dag(dag_id)
+            ti.task = dag.get_task(ti.task_id)

Review comment:
       ```suggestion
               if dag:
                with suppress(TaskNotFound):
                       ti.task = dag.get_task(ti.task_id)
   ```
    Stackdriver does not require access to the DAG. The DAG can be deleted and the logs will still be available. (Here `suppress` is `contextlib.suppress` and `TaskNotFound` comes from `airflow.exceptions`.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] randr97 commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
randr97 commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441736516



##########
File path: airflow/www/views.py
##########
@@ -679,62 +680,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                messaege="Task log handler does not support read logs.",
+                errur=True,

Review comment:
       ```suggestion
                   error=True,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442118043



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)

Review comment:
       ```suggestion
           if not full_content:  # disregard content_type
               logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
               logs = logs[0] if task_try_number is not None else logs
   
               return logs_schema.dump(dict(continuation_token=str(metadata),
                                            content=logs)
                                       )
           logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
   ```
   What do you think @mik-laj?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444904773



##########
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##########
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+    DAG_ID = 'dag_for_testing_log_endpoint'
+    DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+    TASK_ID = 'task_for_testing_log_endpoint'
+    TRY_NUMBER = 1
+
     @classmethod
-    def setUpClass(cls) -> None:
-        super().setUpClass()
-        cls.app = app.create_app(testing=True)  # type:ignore
+    def setUpClass(cls):
+        settings.configure_orm()
+        cls.session = settings.Session
+        cls.app = app.create_app(testing=True)
 
     def setUp(self) -> None:
-        self.client = self.app.test_client()  # type:ignore
+        self.default_time = "2020-06-10T20:00:00+00:00"
+        self.client = self.app.test_client()
+        self.log_dir = tempfile.mkdtemp()
+        # Make sure that the configure_logging is not cached
+        self.old_modules = dict(sys.modules)
+        self._prepare_log_files()
+        self._configure_loggers()
+        self._prepare_db()
+
+    def _create_dagrun(self, session):
+        dagrun_model = DagRun(
+            dag_id=self.DAG_ID,
+            run_id='TEST_DAG_RUN_ID',
+            run_type=DagRunType.MANUAL.value,
+            execution_date=timezone.parse(self.default_time),
+            start_date=timezone.parse(self.default_time),
+            external_trigger=True,
+        )
+        session.add(dagrun_model)
+        session.commit()
+
+    def _configure_loggers(self):
+        # Create a custom logging configuration
+        logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+        logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+        logging_config['handlers']['task']['filename_template'] = \
+            '{{ ti.dag_id }}/{{ ti.task_id }}/' \
+            '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+        # Write the custom logging configuration to a file
+        self.settings_folder = tempfile.mkdtemp()
+        settings_file = os.path.join(self.settings_folder, "airflow_local_settings.py")
+        new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+        with open(settings_file, 'w') as handle:
+            handle.writelines(new_logging_file)
+        sys.path.append(self.settings_folder)
+
+        with conf_vars({('logging', 'logging_config_class'): 'airflow_local_settings.LOGGING_CONFIG'}):
+            self.app = app.create_app(testing=True)
+            self.client = self.app.test_client()
+            settings.configure_logging()
+
+    def _prepare_db(self):
+        dagbag = self.app.dag_bag  # pylint: disable=no-member
+        dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+        dag.sync_to_db()
+        dag_removed = DAG(self.DAG_ID_REMOVED, start_date=timezone.parse(self.default_time))
+        dag_removed.sync_to_db()
+        dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+        with create_session() as session:
+            self.ti = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti.try_number = 1
+            self.ti_removed_dag = TaskInstance(
+                task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+                execution_date=timezone.parse(self.default_time)
+            )
+            self.ti_removed_dag.try_number = 1
+
+            session.merge(self.ti)
+            session.merge(self.ti_removed_dag)
+
+    def _prepare_log_files(self):
+        dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+                   f"{self.default_time.replace(':', '.')}/"
+        os.makedirs(dir_path)
+        with open(f"{dir_path}/1.log", "w+") as file:
+            file.write("Log for testing.")
+            file.flush()
 
-    @pytest.mark.skip(reason="Not implemented yet")
-    def test_should_response_200(self):
+    def tearDown(self):
+        logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
+        clear_db_runs()
+
+        # Remove any new modules imported during the test run. This lets us
+        # import the same source files for more than one test.
+        for mod in [m for m in sys.modules if m not in self.old_modules]:
+            del sys.modules[mod]
+
+        sys.path.remove(self.settings_folder)
+        shutil.rmtree(self.settings_folder)
+        shutil.rmtree(self.log_dir)
+
+        super().tearDown()
+
+    @provide_session
+    def test_should_response_200_json(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": False})
+        headers = {'Accept': 'application/json'}
         response = self.client.get(
-            "/dags/TEST_DG_ID/dagRuns/TEST_DAG_RUN_ID/taskInstances/TEST_TASK_ID/logs/3"
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers=headers
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertIn('Log for testing.', response.json.get('content'))
+        self.assertEqual(200, response.status_code)
+
+    @provide_session
+    def test_should_response_200_text_plain(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/{self.TASK_ID}/logs/1?token={token}",
+            headers={'Accept': 'text/plain'}
         )
         assert response.status_code == 200
+        self.assertEqual(200, response.status_code)
+        self.assertIn('Log for testing.', response.data.decode('utf-8'))
+
+    @provide_session
+    def test_get_logs_response_with_ti_equal_to_none(self, session):
+        self._create_dagrun(session)
+        key = self.app.config["SECRET_KEY"]
+        serializer = URLSafeSerializer(key)
+        token = serializer.dumps({"download_logs": True})
+
+        response = self.client.get(
+            f"api/v1/dags/{self.DAG_ID}/dagRuns/TEST_DAG_RUN_ID/"
+            f"taskInstances/Invalid-Task-ID/logs/1?token={token}",
+        )
+        self.assertIn('content', response.json)
+        self.assertIn('continuation_token', response.json)
+        self.assertEqual("[*** Task instance did not exist in the DB\n]",

Review comment:
       Fixed. Please take a look: https://github.com/apache/airflow/pull/9331/commits/e858bf2fb850c8626574d7182709636f59965d77




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
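
For context on the test setup in the diff above: the custom filename_template set in _configure_loggers is what ties the files written by _prepare_log_files to the paths the task log handler reads from. A minimal standalone sketch of how that Jinja template resolves to a concrete path — the FakeTI stand-in is hypothetical, and the IDs and timestamp are taken from the fixture path that comes up later in this thread:

    from jinja2 import Template

    # The template used in the test's custom logging config.
    filename_template = (
        '{{ ti.dag_id }}/{{ ti.task_id }}/'
        '{{ ts | replace(":", ".") }}/{{ try_number }}.log'
    )

    class FakeTI:
        # Hypothetical stand-in for the attributes the log handler
        # reads off the real TaskInstance.
        dag_id = 'dag_for_testing_log_endpoint'
        task_id = 'task_for_testing_log_endpoint'

    print(Template(filename_template).render(
        ti=FakeTI(), ts='2020-06-10T20:00:00+00:00', try_number=1))
    # -> dag_for_testing_log_endpoint/task_for_testing_log_endpoint/
    #    2020-06-10T20.00.00+00.00/1.log

This is the same directory layout _prepare_log_files builds by hand, which is why the endpoint under test can find the log file written there.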



[GitHub] [airflow] OmairK commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
OmairK commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441351201



##########
File path: tests/api_connexion/endpoints/test_logs/dag_for_testing_log_endpoint/task_for_testing_log_endpoint/2020-06-10T20.00.00+00.00/1.log
##########
@@ -0,0 +1 @@
+Log for testing.

Review comment:
       I think this is left here by mistake.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] mik-laj merged pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #9331:
URL: https://github.com/apache/airflow/pull/9331


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r441377869



##########
File path: tests/api_connexion/endpoints/test_logs/dag_for_testing_log_endpoint/task_for_testing_log_endpoint/2020-06-10T20.00.00+00.00/1.log
##########
@@ -0,0 +1 @@
+Log for testing.

Review comment:
       I'm thinking it's going to add a lot of code to the setUp method, which is already bloated. What do you think? The file is just for testing. @mik-laj




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
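
The thread above is weighing a committed fixture file against generating it during setup. The test class shown earlier already does the latter in _prepare_log_files; a parametrized variant — the helper name here is hypothetical — would cover additional DAGs or tasks without committing fixture files:

    import os

    def write_test_log(log_dir, dag_id, task_id, ts, try_number=1,
                       content="Log for testing."):
        # Mirror the handler's filename_template:
        # {dag_id}/{task_id}/{ts with ':' replaced by '.'}/{try_number}.log
        dir_path = os.path.join(log_dir, dag_id, task_id, ts.replace(':', '.'))
        os.makedirs(dir_path, exist_ok=True)
        with open(os.path.join(dir_path, f"{try_number}.log"), "w") as file:
            file.write(content)

tearDown's shutil.rmtree(self.log_dir) already removes anything written this way, so no extra cleanup would be needed.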



[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r443194307



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,93 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content="[*** Task instance did not exist in the DB\n]",
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = current_app.dag_bag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':
+            logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+            logs = logs[0] if task_try_number is not None else logs
+
+            return logs_schema.dump(dict(continuation_token=str(metadata),
+                                         content=logs)
+                                    )
+
+        # Defaulting to content type of text/plain
+        logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
+
+        attachment_filename = task_log_reader.render_log_filename(ti, task_try_number)
+
+        return Response(
+            logs,
+            mimetype="text/plain",
+            headers={
+                "Content-Disposition": f"attachment; filename={attachment_filename}"

Review comment:
       It should open in the browser to make it easier to view. The Kubernetes API works in a similar way.
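
A sketch of the distinction being suggested — "inline" lets the browser render the log, while "attachment" forces a download. This is a hypothetical helper around the same Flask Response call, not the code as merged:

    from flask import Response

    def log_response(logs, filename, as_attachment=False):
        # Content-Disposition: inline  -> browser displays the text;
        # Content-Disposition: attachment -> browser downloads it as `filename`.
        disposition = "attachment" if as_attachment else "inline"
        return Response(
            logs,
            mimetype="text/plain",
            headers={"Content-Disposition": f"{disposition}; filename={filename}"},
        )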




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
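
For completeness, the client side of the token handling in the diff above: the endpoint only accepts tokens signed with the app's SECRET_KEY, matching the URLSafeSerializer usage in the tests. A sketch — the key value here is a placeholder:

    from itsdangerous.url_safe import URLSafeSerializer

    serializer = URLSafeSerializer("replace-with-the-apps-SECRET_KEY")

    # Initial request: ask for chunked (non-download) reading, then pass the
    # result as ?token=... on the logs endpoint.
    token = serializer.dumps({"download_logs": False})

    # A token signed with a different key fails server-side with
    # BadRequest("Bad Signature. Please sign your token with URLSafeSerializer").

Note that the endpoint returns continuation_token as str(metadata) rather than a re-signed token, so a client resuming a read would presumably need to re-serialize and sign it before sending it back.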