Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/27 07:59:48 UTC

[GitHub] [airflow] whatnick opened a new pull request, #26004: [Loki log handler] - Integration with Grafana Loki

whatnick opened a new pull request, #26004:
URL: https://github.com/apache/airflow/pull/26004

   This Pull request will make the following changes:
   
   Create a log handler that pushes task logs to Loki with labels and, using the same labels, pulls logs back into the Airflow Web UI. This is a bump of the previous PR by @sshah90; it attempts to cherry-pick the changes for review and to integrate maintainer feedback.
   
   related: 
   #12920
   #12925 
   
   Note:
   
   I had added documentation to the [logging-tasks.rst](https://github.com/apache/airflow/blob/master/docs/apache-airflow/logging-monitoring/logging-tasks.rst) file, but it looks like all log handler docs have been moved to the providers folders, so I will make the change accordingly and update this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] github-actions[bot] commented on pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #26004:
URL: https://github.com/apache/airflow/pull/26004#issuecomment-1382598915

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] hterik commented on a diff in pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
hterik commented on code in PR #26004:
URL: https://github.com/apache/airflow/pull/26004#discussion_r980779167


##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """
+        query += f"""airflow_try_number="{try_number}"}} != "WARNING" """
+
+        return {
+            "direction": "forward",
+            "start": start_time,
+            "end": end_time,
+            "query": query,
+            "limit": 5000,
+        }
+
+    def _read(self, task_instance, try_number, metadata=None) -> Tuple[str, Dict[str, bool]]:
+        """
+        Return list of logs string for each task try number by streaming it from Loki
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        logs = ""
+        if self.is_loki_alive() == 200:
+            query_parameters = self.get_read_parameters(task_instance, try_number)
+            logs += f"*** Loki Query:{query_parameters['query']} \n"
+            logs += (
+                f"*** Epoch start_time: {query_parameters['start']} "
+                f"and end_time {query_parameters['end']}\n"
+            )
+            try:
+                response = self.session.get(
+                    f"{self.get_conn.host}/loki/api/v1/query_range",
+                    params=query_parameters,
+                ).json()
+                results = response["data"]["result"]
+                logs_list = [value for result in results for value in result["values"]]
+                # sorting with list of values with timestamp.

Review Comment:
   In case of multiple streams, I believe this sorting can be done more efficiently as well, since each individual stream is already sorted.
   Instead of putting all values together into one logs_list and then sorting the whole list, you can iterate one item at a time from each of the value lists and append only the one with the earliest timestamp to the final log list. For a fixed number of streams this should be O(N) rather than O(N log N), with one less list-comprehension-in-list-comprehension.
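
A minimal sketch of such a merge, assuming each stream's `values` list holds `[timestamp, line]` pairs already in ascending timestamp order (which is what `direction: forward` requests). `merge_streams` is a hypothetical helper name, not part of the PR. It relies on the standard library's `heapq.merge`, which costs O(N log k) for k streams and never re-sorts the combined list.

```python
import heapq
from typing import Iterable, List, Tuple


def merge_streams(results: Iterable[dict]) -> List[Tuple[int, str]]:
    """Merge per-stream value lists that are each already sorted by timestamp."""
    # Loki returns timestamps as nanosecond strings; convert them so the
    # comparison is numeric rather than lexicographic.
    per_stream = [
        [(int(ts), line) for ts, line in result["values"]]
        for result in results
    ]
    # heapq.merge walks the sorted inputs lazily and always yields the
    # smallest remaining head, so the merged output stays sorted.
    return list(heapq.merge(*per_stream, key=lambda entry: entry[0]))


if __name__ == "__main__":
    sample = [
        {"values": [["1", "first"], ["4", "fourth"]]},
        {"values": [["2", "second"], ["3", "third"]]},
    ]
    for timestamp, line in merge_streams(sample):
        print(timestamp, line)
```

With a single stream the function simply returns that stream's entries unchanged, so the common case pays almost nothing extra.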





[GitHub] [airflow] hterik commented on a diff in pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
hterik commented on code in PR #26004:
URL: https://github.com/apache/airflow/pull/26004#discussion_r980750848


##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)

Review Comment:
   Should the end of the range here be `datetime.now` instead? Otherwise it will not be possible to see logs for in-progress tasks that have been running for over 24 hours. Maybe use `task_instance.execution_timeout` if set?
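
One way this could look, as a sketch only: `query_end_epoch` and its parameters are hypothetical names, and the datetimes are assumed to be timezone-aware. For a task that has no end date yet, the window ends at the current time, capped by start plus the execution timeout when one is given.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def query_end_epoch(
    start: datetime,
    end: Optional[datetime],
    execution_timeout: Optional[timedelta] = None,
    now: Optional[datetime] = None,
) -> int:
    """Pick the end of the Loki query window, in epoch seconds."""
    padding = timedelta(minutes=2)  # same 2-minute slack the diff uses for finished tasks
    if end is not None:
        # Finished task: the real end date bounds the window.
        return int((end + padding).timestamp())
    # Running task: use the current time instead of a fixed 24-hour window.
    upper = now or datetime.now(timezone.utc)
    if execution_timeout is not None:
        # Assumption: a task cannot log past start + timeout, so cap there.
        upper = min(upper, start + execution_timeout)
    return int((upper + padding).timestamp())
```

The `now` argument exists only so the function can be tested deterministically; callers would normally leave it unset.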



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """
+        query += f"""airflow_try_number="{try_number}"}} != "WARNING" """
+
+        return {
+            "direction": "forward",
+            "start": start_time,
+            "end": end_time,
+            "query": query,
+            "limit": 5000,
+        }
+
+    def _read(self, task_instance, try_number, metadata=None) -> Tuple[str, Dict[str, bool]]:
+        """
+        Return list of logs string for each task try number by streaming it from Loki
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        logs = ""
+        if self.is_loki_alive() == 200:

Review Comment:
   IMO this kind of is-alive pre-check is redundant; it is better to try the real query right away and fall back only on exceptions.
   
   Checking is_alive first:
   * Introduces unnecessary delay.
   * Introduces a potential race condition: the alive check succeeds, but 1 ms later the server goes down and the query fails, so you have to deal with the query-failure scenario anyway.
   * Introduces one more source of error, in case the is_alive check fails because of intermittent network flakiness.
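
The pattern above can be sketched roughly as follows. `read_with_fallback`, `query_remote`, and `read_local` are hypothetical names standing in for the Loki query and the file-based reader; the broad `except` is for illustration, and real code would likely narrow it (for example to `requests.exceptions.RequestException`).

```python
from typing import Callable, Tuple


def read_with_fallback(
    query_remote: Callable[[], str],
    read_local: Callable[[], str],
) -> Tuple[str, bool]:
    """Try the real query first; use the local reader only if it raises.

    Returns the log text and a flag telling whether it came from the remote.
    """
    try:
        # No readiness probe: the query itself is the health check.
        return query_remote(), True
    except Exception as error:  # illustration only; narrow this in real code
        header = f"*** Failed to read from Loki ({error}); falling back to local logs\n"
        return header + read_local(), False


if __name__ == "__main__":
    def failing_query() -> str:
        raise ConnectionError("connection refused")

    text, from_remote = read_with_fallback(failing_query, lambda: "local log line")
    print(from_remote)
    print(text)
```

Because the failure path has to exist anyway, this removes one HTTP round trip per read without losing any error handling.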



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).

Review Comment:
   (I would advise you to always write logs locally as files and use a daemon like Promtail to ship them to remote storage. 
   This decouples logging from the application and improves the reliability of both.)
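
   A rough sketch of the file-first half of that setup, using only the standard library. The helper name `make_local_task_logger` and the `<dag_id>/<task_id>/<try_number>.log` layout are assumptions made for illustration; a shipper such as Promtail would be configured separately to tail these files.

```python
import logging
import os
from typing import Tuple


def make_local_task_logger(
    base_log_folder: str, dag_id: str, task_id: str, try_number: int
) -> Tuple[logging.Logger, str]:
    """Return a logger that only writes to a local file, plus the file path."""
    log_dir = os.path.join(base_log_folder, dag_id, task_id)
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, f"{try_number}.log")

    logger = logging.getLogger(f"sketch.{dag_id}.{task_id}.{try_number}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the sketch isolated from root handlers
    if not logger.handlers:  # avoid attaching duplicate handlers on reuse
        handler = logging.FileHandler(path, encoding="utf-8")
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger, path
```

   The application never talks to Loki in this arrangement, so a Loki outage cannot delay or fail a task.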



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """
+        query += f"""airflow_try_number="{try_number}"}} != "WARNING" """
+
+        return {
+            "direction": "forward",
+            "start": start_time,
+            "end": end_time,
+            "query": query,
+            "limit": 5000,
+        }
+
+    def _read(self, task_instance, try_number, metadata=None) -> Tuple[str, Dict[str, bool]]:
+        """
+        Return list of logs string for each task try number by streaming it from Loki
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        logs = ""
+        if self.is_loki_alive() == 200:
+            query_parameters = self.get_read_parameters(task_instance, try_number)
+            logs += f"*** Loki Query:{query_parameters['query']} \n"
+            logs += (
+                f"*** Epoch start_time: {query_parameters['start']} "
+                f"and end_time {query_parameters['end']}\n"
+            )
+            try:
+                response = self.session.get(
+                    f"{self.get_conn.host}/loki/api/v1/query_range",
+                    params=query_parameters,
+                ).json()
+                results = response["data"]["result"]
+                logs_list = [value for result in results for value in result["values"]]
+                # sorting with list of values with timestamp.

Review Comment:
   Why is this sorting needed? Is it in case multiple streams are returned? Within a single stream the result is already sorted when it is returned from Loki.
   
   https://grafana.com/docs/loki/latest/api/#query-loki-over-a-range-of-time
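
   If the sort is there for the multi-stream case, a merge would express that more directly. A small sketch, assuming the response shape used in the diff (`result` is a list of streams, each with a `values` list of `[timestamp, line]` pairs) and that each stream is already in ascending order because the query uses `direction=forward`; `merge_streams` is a hypothetical helper name.

```python
import heapq
from typing import Any, Dict, List


def merge_streams(results: List[Dict[str, Any]]) -> List[str]:
    """Merge per-stream entries, each already sorted, into one ordered list."""
    streams = (result["values"] for result in results)
    # Timestamps arrive as strings; compare them numerically so the
    # ordering does not depend on the strings having equal length.
    merged = heapq.merge(*streams, key=lambda entry: int(entry[0]))
    return [line for _timestamp, line in merged]
```

   For a single stream this is a plain pass-through, so it costs nothing when only one stream comes back.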



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """

Review Comment:
   Does this need any escaping or sanitizing to avoid query injections?
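
   For illustration, a sketch of what escaping could look like. It assumes label values are written as double-quoted LogQL strings in which backslashes, double quotes and newlines must be escaped, and that label names follow the Prometheus-style pattern `[a-zA-Z_][a-zA-Z0-9_]*`; `escape_logql_value` and `build_selector` are hypothetical helpers, not part of this PR.

```python
import re
from typing import Dict

# Assumed label-name rule (Prometheus-style identifiers).
_LABEL_NAME = re.compile(r"[a-zA-Z_][a-zA-Z0-9_]*")


def escape_logql_value(value: str) -> str:
    """Escape characters that would terminate or alter a quoted value."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def build_selector(labels: Dict[str, str]) -> str:
    """Render a dict of labels as a stream selector, e.g. {a="x", b="y"}."""
    pairs = []
    for key, value in sorted(labels.items()):
        if not _LABEL_NAME.fullmatch(key):
            raise ValueError(f"invalid label name: {key!r}")
        pairs.append(f'{key}="{escape_logql_value(str(value))}"')
    return "{" + ", ".join(pairs) + "}"
```

   Without something like this, a dag_id or custom label containing a double quote would close the string early and change the meaning of the query.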



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """
+        query += f"""airflow_try_number="{try_number}"}} != "WARNING" """
+
+        return {
+            "direction": "forward",
+            "start": start_time,
+            "end": end_time,
+            "query": query,
+            "limit": 5000,
+        }
+
+    def _read(self, task_instance, try_number, metadata=None) -> Tuple[str, Dict[str, bool]]:
+        """
+        Return list of logs string for each task try number by streaming it from Loki
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        logs = ""
+        if self.is_loki_alive() == 200:
+            query_parameters = self.get_read_parameters(task_instance, try_number)
+            logs += f"*** Loki Query:{query_parameters['query']} \n"
+            logs += (
+                f"*** Epoch start_time: {query_parameters['start']} "
+                f"and end_time {query_parameters['end']}\n"
+            )
+            try:
+                response = self.session.get(
+                    f"{self.get_conn.host}/loki/api/v1/query_range",
+                    params=query_parameters,
+                ).json()
+                results = response["data"]["result"]
+                logs_list = [value for result in results for value in result["values"]]
+                # sorting with list of values with timestamp.
+                logs += "\n".join([f"{log_line[1]}" for log_line in sorted(logs_list, key=lambda x: x[0])])

Review Comment:
   Maybe append some kind of warning message to the log if the max number of values per query is reached. 
   
   https://grafana.com/docs/loki/latest/api/#query-loki-over-a-range-of-time
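
   Roughly what that could look like, assuming `limit` caps the total number of entries returned by the query; `join_with_limit_notice` is a hypothetical helper and the message wording is only a placeholder.

```python
from typing import List


def join_with_limit_notice(lines: List[str], limit: int) -> str:
    """Join log lines and flag output that may have been truncated."""
    text = "\n".join(lines)
    if len(lines) >= limit:
        # Hitting the cap exactly cannot be told apart from truncation,
        # so warn in both cases.
        text += (
            f"\n*** Loki returned {len(lines)} lines, which reaches the query "
            f"limit ({limit}); later output may be missing\n"
        )
    return text
```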



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            return (start_epoch_time - 60, end_epoch_time + 60 * 2)
+        if end_time:
+            end_epoch_time = self.convert_epoch_time(end_time.isoformat())
+            execution_time_epoch_time = self.convert_epoch_time(execution_time)
+            return (execution_time_epoch_time - 60, end_epoch_time + 60 * 2)
+        return (start_epoch_time - 60, start_epoch_time + 60 * 60 * 24)
+
+    def get_read_parameters(self, task_instance, try_number) -> Dict[str, Union[str, int]]:
+        """
+        Construct query(LogQL) and update parameters for loki read request.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        :param try_number: current try_number to read log from
+        :type: try_number: int
+        """
+        start_time, end_time = self.query_time_range(task_instance)
+        task_lables = self.get_label(task_instance)
+        del task_lables["airflow_try_number"]  # try_number will come from read method
+
+        # converting task_lables(dict) to LogQL format
+        # {"dag_id":"airflow"} --> {dag_id = "airflow"}
+        query = "{"
+        for key, value in task_lables.items():
+            query += f"""{key}="{value}", """
+        query += f"""airflow_try_number="{try_number}"}} != "WARNING" """

Review Comment:
   Why `!= "WARNING"`?
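
For context on the question above: in LogQL, an expression such as `!= "WARNING"` placed after the stream selector is a line filter that drops every line containing that text, so the query in the diff would appear to hide any log line mentioning WARNING. Below is a minimal sketch of building the selector with the filter made optional. `build_logql_query` and its `exclude_text` parameter are hypothetical names that are not part of the PR, and label values are assumed to contain no double quotes.

```python
from typing import Dict, Optional


def build_logql_query(labels: Dict[str, str], exclude_text: Optional[str] = None) -> str:
    """Build a LogQL query string from a label mapping (hypothetical helper)."""
    # Join the label matchers with commas; sorting keeps the output deterministic.
    matchers = ", ".join(f'{key}="{value}"' for key, value in sorted(labels.items()))
    query = "{" + matchers + "}"
    if exclude_text is not None:
        # Line filter: exclude log lines that contain the given text.
        query += f' != "{exclude_text}"'
    return query


if __name__ == "__main__":
    labels = {"airflow_dag_id": "example", "airflow_task_id": "t1", "airflow_try_number": "1"}
    print(build_logql_query(labels))
    print(build_logql_query(labels, exclude_text="WARNING"))
```

Leaving `exclude_text` unset returns every line of the stream, which is probably what a task log view wants by default.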



##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),
+        }
+
+    def get_label(self, task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Update task_labels with optional labels and return Loki labels.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        tags = {}
+        task_labels = self._task_label(task_instance)
+        if self.labels:
+            tags.update(self.labels)
+            tags.update(task_labels)
+            return tags
+        return task_labels
+
+    def client(self, task_instance: TaskInstance) -> logging_loki.handlers.LokiHandler:
+        """
+        Returns Loki logger handler using connection and label to write logs
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        return logging_loki.LokiHandler(
+            url=f"{self.get_conn.host}/loki/api/v1/push",
+            tags=self.get_label(task_instance),
+            version="1",
+            auth=(f"{self.get_conn.login}", f"{self.get_conn.password}"),
+        )
+
+    def set_context(self, task_instance: TaskInstance) -> None:
+        """
+        Provide task_instance context to airflow task handler.
+        If Loki is not available then will use Filehandler context
+        to avoid losing log data.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        if self.is_loki_alive() == 200:
+            self.handler: logging_loki.handlers.LokiHandler = self.client(task_instance)
+            if self.formatter:
+                self.handler.setFormatter(self.formatter)
+                self.handler.setLevel(self.level)
+        else:
+            super().set_context(task_instance)  # Filehandler Context
+
+    def convert_epoch_time(self, timestamp: str) -> int:
+        """Convert timestamp to epoch"""
+        return int(time.mktime(time.strptime(timestamp[:19], self.timestamp_pattern)))
+
+    def query_time_range(
+        self,
+        task_instance: TaskInstance,
+    ) -> Tuple[int, int]:
+        """
+        Return task start and end time in epoch format for query.
+        If Task's try_number > 1 then it will use task execution time as start time.
+        :param ti: task instance object
+        :type: task_instance: TaskInstance
+        """
+        start_time, end_time, execution_time = (
+            str(task_instance.start_date.isoformat()),
+            task_instance.end_date,
+            str(task_instance.execution_date.isoformat()),
+        )
+        next_try = task_instance.next_try_number
+        start_epoch_time = self.convert_epoch_time(start_time)
+
+        if end_time and next_try < 3:

Review Comment:
   What does `< 3` mean here?
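
One way to make the intent reviewable is to name the magic numbers. The sketch below restates the window selection from the diff as a pure function. It assumes, based only on the docstring ("If Task's try_number > 1 then it will use task execution time as start time"), that `next_try < 3` is meant to express "this is the first try". All names here (`query_window`, `FIRST_TRY`, the padding constants) are hypothetical. It also converts timezone-aware datetimes directly instead of parsing ISO strings with `time.mktime`, which interprets its input as local time.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Hypothetical names for the literal values used in the diff.
PADDING_BEFORE = timedelta(seconds=60)
PADDING_AFTER = timedelta(seconds=120)
RUNNING_WINDOW = timedelta(days=1)
FIRST_TRY = 1


def to_epoch(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole epoch seconds."""
    return int(moment.timestamp())


def query_window(
    start: datetime,
    end: Optional[datetime],
    execution: datetime,
    try_number: int,
) -> Tuple[int, int]:
    """Return the (start, end) epoch range to query for one task instance."""
    if end is None:
        # Task still running: look ahead a full day from the start time.
        return to_epoch(start - PADDING_BEFORE), to_epoch(start + RUNNING_WINDOW)
    # Retries (assumed: try_number > 1) widen the window back to the execution date.
    begin = start if try_number <= FIRST_TRY else execution
    return to_epoch(begin - PADDING_BEFORE), to_epoch(end + PADDING_AFTER)


if __name__ == "__main__":
    started = datetime(2022, 8, 27, 8, 0, tzinfo=timezone.utc)
    finished = started + timedelta(minutes=10)
    print(query_window(started, finished, started - timedelta(hours=1), try_number=1))
```

Whether the first-try reading is correct is exactly what the review question asks, so the threshold is kept in one named constant that is easy to change.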





[GitHub] [airflow] hterik commented on a diff in pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
hterik commented on code in PR #26004:
URL: https://github.com/apache/airflow/pull/26004#discussion_r960681889


##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),

Review Comment:
   Should run_id be included here, for DAGs where multiple runs execute in parallel?
   Note, though, that dynamic labels such as run_id are discouraged according to https://grafana.com/docs/loki/latest/best-practices/
   It's a little odd that try_number is included and run_id is not. If you want to flatten it into a single stream, I would argue try_number should not be included either.
   
   We use Loki for Airflow tasks today (but ship the logs via Promtail). What we have done is use dag_id and task_id as labels, then "[pack](https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/)" the run_id into the stream. Since the task start and end dates limit the stream length, it becomes a short search. Anecdotally, it's working fine for us, but our volumes are not very big either.
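
The layout described above can be sketched as follows: low-cardinality values (dag_id, task_id) stay as stream labels, while run_id travels inside each log line and is filtered at query time. The JSON shape with an `_entry` key and the `| unpack` parser follow the Promtail pack stage and LogQL documentation as I understand them. The helper names `pack_line` and `unpack_query` are hypothetical, and values are assumed to contain no double quotes.

```python
import json
from typing import Dict


def pack_line(line: str, embedded: Dict[str, str]) -> str:
    """Embed high-cardinality metadata in the log line as a packed JSON object."""
    # The original log line goes under the "_entry" key, next to the metadata.
    return json.dumps({**embedded, "_entry": line})


def unpack_query(labels: Dict[str, str], embedded: Dict[str, str]) -> str:
    """Build a LogQL query that selects by labels, then filters on unpacked fields."""
    selector = "{" + ", ".join(f'{k}="{v}"' for k, v in sorted(labels.items())) + "}"
    # Each embedded field becomes a label filter applied after `| unpack`.
    filters = "".join(f' | {k}="{v}"' for k, v in sorted(embedded.items()))
    return f"{selector} | unpack{filters}"


if __name__ == "__main__":
    print(pack_line("task started", {"run_id": "manual__2022-08-27"}))
    print(unpack_query({"dag_id": "example", "task_id": "t1"}, {"run_id": "manual__2022-08-27"}))
```

Combined with a query time range bounded by the task start and end dates, the filter only has to scan a short slice of the stream.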





[GitHub] [airflow] github-actions[bot] closed pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #26004: [Loki log handler] - Integration with Grafana Loki
URL: https://github.com/apache/airflow/pull/26004




[GitHub] [airflow] potiuk commented on pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #26004:
URL: https://github.com/apache/airflow/pull/26004#issuecomment-1331151583

   @whatnick - are you sure you want to continue with this and try to contribute it to the Apache project if there is another provider that someone maintains? There are a number of conditions that have to be fulfilled to add a provider to the Airflow repo (it has to be really maintainable long-term by the community), and if someone wants to maintain an integration outside (and, for example, puts a link on the ecosystem page https://airflow.apache.org/ecosystem/ ), that's quite a bit less for us to maintain, and it is easier to release and produce such third-party providers. Shall we close this PR?




[GitHub] [airflow] whatnick commented on a diff in pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
whatnick commented on code in PR #26004:
URL: https://github.com/apache/airflow/pull/26004#discussion_r962084067


##########
airflow/providers/grafana/log/loki_task_handler.py:
##########
@@ -0,0 +1,242 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Loki logging handler for tasks"""
+import time
+from typing import Dict, Optional, Tuple, Union
+
+import logging_loki
+import requests
+from cached_property import cached_property
+
+from airflow.hooks.base_hook import BaseHook
+from airflow.models import TaskInstance
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+DEFAULT_LOGGER_NAME = "airflow"
+
+
+class LokiTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    LokiTaskHandler that directly makes Loki logging API calls while reading and writing logs.
+    This is a Python standard ``logging`` handler using that can be used to route Python standard
+    logging messages directly to the Loki Logging API. It can also be used to save logs for
+    executing tasks. To do this, you should set as a handler with the name "tasks". In this case,
+    it will also be used to read the log for display in Web UI.
+    :param base_log_folder: Base log folder to place logs (incase Loki is down).
+    :type base_log_folder: str
+    :param filename_template: template filename string (incase Loki is down)
+    :type filename_template: str
+    :param loki_conn_id: Connection ID that will be used for authorization to the Loki Platform.
+    :type loki_conn_id: str
+    :param name: the name of the custom log in Loki Logging. Defaults to 'airflow'.
+    :type name: str
+    :param labels: (Optional) Mapping of labels for the entry.
+    :type labels: dict
+    """
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        base_log_folder: str,
+        filename_template: str,
+        loki_conn_id: str,
+        name: str = DEFAULT_LOGGER_NAME,
+        labels: Optional[Dict[str, str]] = None,
+    ):
+        super().__init__(base_log_folder, filename_template)
+        self.loki_conn_id = loki_conn_id
+        self.name: str = name
+        self.timestamp_pattern = "%Y-%m-%dT%H:%M:%S"
+        self.labels = labels
+        self._session: Optional[requests.Session] = None
+
+    @cached_property
+    def get_conn(self):
+        """Loki connection for client"""
+        return BaseHook.get_connection(self.loki_conn_id)
+
+    @property
+    def session(self) -> requests.Session:
+        """Create HTTP session"""
+        if self._session is None:
+            self._session = requests.Session()
+            self._session.auth = (self.get_conn.login, self.get_conn.password) or None
+        return self._session
+
+    def is_loki_alive(self):
+        """Checks whether Loki is ready for pushing/pulling logs"""
+        try:
+            status = self.session.get(
+                f"{self.get_conn.host}/ready",
+            )
+            return status.status_code
+        except ConnectionError as error_msg:
+            self.log.exception(error_msg)
+            return None
+
+    @staticmethod
+    def _task_label(task_instance: TaskInstance) -> Dict[str, str]:
+        """
+        Returns task instance labels for Loki which will use while reading
+        and writing logs from loki.
+        :param task_instance: task instance object
+        :type: task_instance: TaskInstance
+        """
+        # Not adding execution date since it violates Loki label standards
+        # https://grafana.com/blog/2020/08/27/the-concise-guide-to-labels-in-loki/
+
+        return {
+            "airflow_dag_id": task_instance.dag_id,
+            "airflow_task_id": task_instance.task_id,
+            "airflow_try_number": str(task_instance.try_number),

Review Comment:
   Thanks for the review. I am redoing this PR from an old PR that was abandoned. I am aware of the curse of dimensionality that adding dynamic labels creates: it is better to search by those than to index and sort by them. Could you summarize in tabular form which metadata goes in labels and which goes in the contents of the stream, to improve this PR?
   
   I would also love to see a diagram of your workaround via Promtail to Loki to share with the team. FYI, I started on this task as part of my work towards [DEA Airflow](https://github.com/GeoscienceAustralia/dea-airflow).





[GitHub] [airflow] snjypl commented on pull request #26004: [Loki log handler] - Integration with Grafana Loki

Posted by GitBox <gi...@apache.org>.
snjypl commented on PR #26004:
URL: https://github.com/apache/airflow/pull/26004#issuecomment-1331006227

   I would like to mention a Grafana Loki provider that I am working on.
   
   https://github.com/snjypl/airflow-provider-grafana-loki
   
   Everyone's feedback would be greatly appreciated.

