Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/06/18 19:34:18 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #9391: Extract TaskLogReader from views for uniform use in API & webserver and improve tests

ephraimbuddy opened a new pull request #9391:
URL: https://github.com/apache/airflow/pull/9391


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442738286



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Reads chunks of Task Instance logs.
+
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: If provided, logs for the given try will be returned.
+            Otherwise, logs from all attempts are returned.
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Tuple[List[str], Dict[str, Any]]
+
+        The following is an example of how to use this method to read log:
+
+        .. code-block:: python
+
+            logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
+            logs = logs[0] if try_number is not None else logs
+
+        where task_log_reader is an instance of TaskLogReader. The metadata will always
+        contain information about the task log which can enable you read logs to the
+        end.
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata: dict) -> Iterator[str]:
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Iterator[str]
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"

Review comment:
       The key may not exist, and if it exists, it is a boolean.







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442512310



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Reads chunks of Task Instance logs.
+

Review comment:
       Could you describe here how to use this function? I would be happy to add a code snippet to explain how to use metadata.
   ```
               while 'end_of_log' not in metadata or not metadata['end_of_log']:
                   logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
                   for log in logs:
                       print(log)
   ```
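
The loop requested above can be sketched in a self-contained form. `FakeTaskLogReader`, its canned chunks, and `read_all` are hypothetical stand-ins so that the example runs without Airflow; only the `end_of_log` key and the `(logs, metadata)` return shape are taken from the diff.

```python
from typing import Any, Dict, List, Tuple


class FakeTaskLogReader:
    """Hypothetical stand-in for TaskLogReader; serves canned chunks."""

    def __init__(self, chunks: List[str]) -> None:
        self._chunks = chunks

    def read_log_chunks(self, ti: Any, try_number: int,
                        metadata: Dict[str, Any]) -> Tuple[List[str], Dict[str, Any]]:
        # 'offset' records how many chunks have already been returned.
        offset = metadata.get('offset', 0)
        logs = self._chunks[offset:offset + 1]
        new_offset = offset + len(logs)
        return logs, {'offset': new_offset,
                      'end_of_log': new_offset >= len(self._chunks)}


def read_all(reader: FakeTaskLogReader, ti: Any, try_number: int) -> List[str]:
    """Feed the returned metadata back into each call until end_of_log is set."""
    metadata: Dict[str, Any] = {}
    collected: List[str] = []
    while 'end_of_log' not in metadata or not metadata['end_of_log']:
        logs, metadata = reader.read_log_chunks(ti, try_number, metadata)
        collected.extend(logs)
    return collected
```

The point of the sketch is that the caller never inspects `offset` itself; it only passes the metadata from one call into the next.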







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442520906



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int], metadata: dict):

Review comment:
       Fixed https://github.com/apache/airflow/pull/9391/commits/f8298101d216b62a3c598f444e5939f220c813b5







[GitHub] [airflow] kaxil commented on pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#issuecomment-646323685


   If we checkmark the box, let's actually follow the guidelines:
   
   ![image](https://user-images.githubusercontent.com/8811558/85075467-24ee9080-b1b6-11ea-87a9-88519d759caf.png)
   





[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442503167



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number

Review comment:
       ```suggestion
           :param try_number: If provided, logs for the given try will be returned. Otherwise, logs from all attempts are returned.
   ```
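
The behaviour described in this suggestion matches the branch on `try_number` in `read_log_stream`. A minimal sketch of that branch follows; `resolve_try_numbers` is a hypothetical helper name, and `next_try_number` stands in for `ti.next_try_number` from the diff.

```python
from typing import List, Optional


def resolve_try_numbers(try_number: Optional[int], next_try_number: int) -> List[int]:
    """Return the attempts to read: the given one, or every attempt so far."""
    if try_number is None:
        # Attempts are numbered from 1; next_try_number itself has not run yet.
        return list(range(1, next_try_number))
    return [try_number]
```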







[GitHub] [airflow] kaxil commented on pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#issuecomment-646323421


   Please add a description to the PR





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442750599



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:

Review comment:
       Fixed. See https://github.com/apache/airflow/pull/9391/commits/e3bc183a1183e9fe0290199b19751a9b4173f25c







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views for uniform use in API & webserver and improve tests

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442497333



##########
File path: airflow/www/views.py
##########
@@ -679,62 +679,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                message="Task log handler does not support read logs.",

Review comment:
       Does the JavaScript need to be updated?







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442504211



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int], metadata: dict):
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"
+
+    @cached_property
+    def log_handler(self):
+        """The log handler"""
+
+        logger = logging.getLogger('airflow.task')
+        task_log_reader = conf.get('logging', 'task_log_reader')
+        handler = next((handler for handler in logger.handlers if handler.name == task_log_reader), None)
+        return handler
+
+    @property
+    def is_supported(self):
+        """ Checks if read method is supported by a given log handler"""

Review comment:
       ```suggestion
           """Checks if a read operation is supported by a current log handler."""
   ```
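
The handler lookup in `log_handler` can be tried with the standard `logging` module alone. In the sketch below, `ReadableHandler`, `find_handler`, and `supports_read` are hypothetical names, and the `hasattr(handler, 'read')` test is an assumption about what "supports a read operation" means; the body of `is_supported` is not shown in this hunk.

```python
import logging
from typing import Optional


class ReadableHandler(logging.Handler):
    """Hypothetical handler that exposes a read() method."""

    def emit(self, record: logging.LogRecord) -> None:
        pass  # Records are discarded; only read() matters for this sketch.

    def read(self, ti, try_number=None, metadata=None):
        return [], [{'end_of_log': True}]


def find_handler(logger: logging.Logger, name: str) -> Optional[logging.Handler]:
    """Return the first handler on the logger whose name matches, else None."""
    return next((h for h in logger.handlers if h.name == name), None)


def supports_read(handler: Optional[logging.Handler]) -> bool:
    """Assumed check: the handler is usable for reading if it has a read attribute."""
    return hasattr(handler, 'read')
```

A missing handler yields `None` from `find_handler`, so `supports_read` also returns `False` when the configured name matches nothing.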







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442738286



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Reads chunks of Task Instance logs.
+
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: If provided, logs for the given try will be returned.
+            Otherwise, logs from all attempts are returned.
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Tuple[List[str], Dict[str, Any]]
+
+        The following is an example of how to use this method to read log:
+
+        .. code-block:: python
+
+            logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
+            logs = logs[0] if try_number is not None else logs
+
+        where task_log_reader is an instance of TaskLogReader. The metadata will always
+        contain information about the task log which can enable you read logs to the
+        end.
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata: dict) -> Iterator[str]:
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Iterator[str]
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"

Review comment:
       The key may not exist, and if it exists, it is a boolean. There are also a lot of local variables in the function already.
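
The loop condition discussed here can be examined in isolation. The sketch below is only an illustration with hypothetical helper names; it shows that the explicit two-part test and a `dict.get` lookup agree when the key is absent or holds a boolean.

```python
from typing import Any, Dict


def should_continue(metadata: Dict[str, Any]) -> bool:
    """Mirror the loop condition used in read_log_stream."""
    return 'end_of_log' not in metadata or not metadata['end_of_log']


def should_continue_get(metadata: Dict[str, Any]) -> bool:
    """Shorter form: a missing key yields None, which is falsy like False."""
    return not metadata.get('end_of_log')
```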







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442502120



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks

Review comment:
       ```suggestion
           Reads chunks of Task Instance logs.
   
   ```







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442728887



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:

Review comment:
       Sorry, I messed up the rebase. I'm sorting it out now.







[GitHub] [airflow] mik-laj merged pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #9391:
URL: https://github.com/apache/airflow/pull/9391


   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442738286



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Reads chunks of Task Instance logs.
+
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: If provided, logs for the given try will be returned.
+            Otherwise, logs from all attempts are returned.
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Tuple[List[str], Dict[str, Any]]
+
+        The following is an example of how to use this method to read log:
+
+        .. code-block:: python
+
+            logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
+            logs = logs[0] if try_number is not None else logs
+
+        where task_log_reader is an instance of TaskLogReader. The metadata will always
+        contain information about the task log which can enable you read logs to the
+        end.
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata: dict) -> Iterator[str]:
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Iterator[str]
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"

Review comment:
       The key may not exist, and if it exists, it is a boolean. There are also a lot of local variables in the function already.
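
For context, here is a minimal standalone sketch of the loop under discussion. It does not import Airflow: `FakeReader` and its canned chunks are hypothetical stand-ins for the real handler, and the `offset` bookkeeping is an assumption made only so the example terminates. It shows why the loop condition has to cope with a missing `end_of_log` key: the key only appears once the last chunk has been served.

```python
from typing import Any, Dict, Iterator, List, Tuple


class FakeReader:
    """Hypothetical stand-in for TaskLogReader that serves canned chunks."""

    def __init__(self, chunks: List[str]) -> None:
        self._chunks = chunks

    def read_log_chunks(self, metadata: Dict[str, Any]) -> Tuple[List[str], Dict[str, Any]]:
        offset = metadata.get("offset", 0)
        new_metadata: Dict[str, Any] = {"offset": offset + 1}
        # The key is only set once the last chunk has been served.
        if offset + 1 >= len(self._chunks):
            new_metadata["end_of_log"] = True
        return [self._chunks[offset]], new_metadata

    def read_log_stream(self, metadata: Dict[str, Any]) -> Iterator[str]:
        # Reset the read position, as the loop in the diff does per try.
        metadata.pop("end_of_log", None)
        metadata.pop("offset", None)
        while "end_of_log" not in metadata or not metadata["end_of_log"]:
            logs, metadata = self.read_log_chunks(metadata)
            yield "\n".join(logs) + "\n"


stream = list(FakeReader(["first", "second", "third"]).read_log_stream({}))
```

With three canned chunks the generator yields three strings and then stops, because the third call is the first one whose metadata carries `end_of_log`.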







[GitHub] [airflow] mik-laj commented on pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#issuecomment-646304842


   @KevinYang21 Could you look at it? We need to extract this logic to be able to reuse it in the API.





[GitHub] [airflow] ephraimbuddy commented on pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#issuecomment-646353622


   Hi @kaxil, please take a look. I have updated the description. Thanks for pointing this out.





[GitHub] [airflow] mik-laj commented on pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#issuecomment-646518708


   @ephraimbuddy Can you do a rebase?





[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442503987



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int], metadata: dict):
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"
+
+    @cached_property
+    def log_handler(self):
+        """The log handler"""

Review comment:
       ```suggestion
           """Log handler, which is configured to read logs."""
   ```
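
As a rough illustration of what such a cached handler lookup can look like, the sketch below reuses the handler search that the PR removes from `views.py`, wrapped in a cached property. It is not the PR's implementation: `HandlerLookup`, the `demo.task` logger name and the body of `is_supported` are assumptions for the example, and it uses the standard-library `functools.cached_property` (Python 3.8+) rather than the `cached_property` package imported in the diff.

```python
import logging
from functools import cached_property  # stdlib equivalent, Python 3.8+


class HandlerLookup:
    """Hypothetical helper; names and constructor are illustrative only."""

    def __init__(self, logger_name: str, handler_name: str) -> None:
        self.logger_name = logger_name
        self.handler_name = handler_name

    @cached_property
    def log_handler(self):
        """Log handler, which is configured to read logs."""
        logger = logging.getLogger(self.logger_name)
        # Same search as the removed views.py code: first handler whose
        # name matches, or None when nothing matches.
        return next(
            (h for h in logger.handlers if h.name == self.handler_name), None
        )

    @property
    def is_supported(self) -> bool:
        # Assumption: "supported" means the handler exposes a read() method.
        return hasattr(self.log_handler, "read")


demo_handler = logging.NullHandler()
demo_handler.set_name("task")
logging.getLogger("demo.task").addHandler(demo_handler)

lookup = HandlerLookup("demo.task", "task")
```

Because the property is cached, the logger's handler list is searched only on first access. `NullHandler` has no `read` method, so `lookup.is_supported` is false here.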







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442538254



##########
File path: airflow/www/views.py
##########
@@ -679,62 +679,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                message="Task log handler does not support read logs.",

Review comment:
       Everything is fine with the logging on the website

##########
File path: airflow/www/views.py
##########
@@ -679,62 +679,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                message="Task log handler does not support read logs.",

Review comment:
       Checking...







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442503491



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int], metadata: dict):

Review comment:
       Can you add return type?
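
A minimal sketch of the kind of annotation being asked for: a function containing `yield` returns a generator, which can be annotated as `Iterator[str]` when callers only iterate over it. The function name `stream_chunks` and its input are hypothetical and not part of the PR.

```python
from collections.abc import Iterator as AbcIterator
from typing import Iterator, List


def stream_chunks(chunks: List[List[str]]) -> Iterator[str]:
    """Yield each chunk of log lines joined into one newline-terminated string."""
    for lines in chunks:
        yield "\n".join(lines) + "\n"


joined = list(stream_chunks([["a", "b"], ["c"]]))
is_iterator = isinstance(stream_chunks([]), AbcIterator)
```

`Generator[str, None, None]` would also be accurate. `Iterator[str]` is the shorter choice when the send and return types are unused.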







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442535463



##########
File path: airflow/www/views.py
##########
@@ -679,62 +679,53 @@ def get_logs_with_metadata(self, session=None):
 
             return response
 
-        logger = logging.getLogger('airflow.task')
-        task_log_reader = conf.get('logging', 'task_log_reader')
-        handler = next((handler for handler in logger.handlers
-                        if handler.name == task_log_reader), None)
+        task_log_reader = TaskLogReader()
+        if not task_log_reader.is_supported:
+            return jsonify(
+                message="Task log handler does not support read logs.",

Review comment:
       Checking...







[GitHub] [airflow] mik-laj commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442503313



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+         Reads logs in chunks
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: The taskInstance try_number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task
+        :type metadata: dict
+

Review comment:
       Can you add return type?







[GitHub] [airflow] turbaszek commented on a change in pull request #9391: Extract TaskLogReader from views.py

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9391:
URL: https://github.com/apache/airflow/pull/9391#discussion_r442721441



##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:
+    """ Task log reader"""
+
+    def read_log_chunks(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata) -> Tuple[List[str], Dict[str, Any]]:
+        """
+        Reads chunks of Task Instance logs.
+
+        :param ti: The taskInstance
+        :type ti: TaskInstance
+        :param try_number: If provided, logs for the given try will be returned.
+            Otherwise, logs from all attempts are returned.
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Tuple[List[str], Dict[str, Any]]
+
+        The following is an example of how to use this method to read log:
+
+        .. code-block:: python
+
+            logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
+            logs = logs[0] if try_number is not None else logs
+
+        where task_log_reader is an instance of TaskLogReader. The metadata will always
+        contain information about the task log which can enable you read logs to the
+        end.
+        """
+
+        logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
+        metadata = metadatas[0]
+        return logs, metadata
+
+    def read_log_stream(self, ti: TaskInstance, try_number: Optional[int],
+                        metadata: dict) -> Iterator[str]:
+        """
+        Used to continuously read log to the end
+
+        :param ti: The Task Instance
+        :type ti: TaskInstance
+        :param try_number: the task try number
+        :type try_number: Optional[int]
+        :param metadata: A dictionary containing information about how to read the task log
+        :type metadata: dict
+        :rtype: Iterator[str]
+        """
+
+        if try_number is None:
+            next_try = ti.next_try_number
+            try_numbers = list(range(1, next_try))
+        else:
+            try_numbers = [try_number]
+        for current_try_number in try_numbers:
+            metadata.pop('end_of_log', None)
+            metadata.pop('max_offset', None)
+            metadata.pop('offset', None)
+            while 'end_of_log' not in metadata or not metadata['end_of_log']:
+                logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
+                yield "\n".join(logs) + "\n"

Review comment:
       What would you say to using a variable?
   ```
   end_of_log = metadata.get("end_of_log")
   ```
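
To make the trade-off concrete, the sketch below compares the loop condition from the diff with a `dict.get`-based check over the cases that matter: key absent, key `False`, key `True`. The helper names `original_check` and `suggested_check` are hypothetical, and the sample dictionaries are made up for illustration.

```python
from typing import Any, Dict, List, Tuple


def original_check(metadata: Dict[str, Any]) -> bool:
    """Condition as written in the diff: keep reading while this is True."""
    return "end_of_log" not in metadata or not metadata["end_of_log"]


def suggested_check(metadata: Dict[str, Any]) -> bool:
    """Same decision via dict.get, which returns None for a missing key."""
    return not metadata.get("end_of_log")


cases: List[Dict[str, Any]] = [
    {},                     # key missing
    {"end_of_log": False},  # key present, log not finished
    {"end_of_log": True},   # key present, log finished
    {"offset": 10},         # unrelated key only
]
results: List[Tuple[bool, bool]] = [
    (original_check(m), suggested_check(m)) for m in cases
]
```

Both checks agree on every case, so the choice between them is one of readability rather than behaviour.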

##########
File path: airflow/utils/log/log_reader.py
##########
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.models import TaskInstance
+from airflow.utils.helpers import render_log_filename
+
+
+class TaskLogReader:

Review comment:
       Am I missing something, or is this class only used in tests?



