Posted to notifications@skywalking.apache.org by zh...@apache.org on 2021/08/08 07:48:44 UTC

[skywalking-python] branch master updated: Feature: collect and report logs (#147)

This is an automated email from the ASF dual-hosted git repository.

zhangke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 3323770  Feature: collect and report logs (#147)
3323770 is described below

commit 3323770abcc09ba5ff305da7299683f88d51fa9b
Author: Yihao Chen <Su...@outlook.com>
AuthorDate: Sun Aug 8 15:48:37 2021 +0800

    Feature: collect and report logs (#147)
    
    * Point to v8.6.0 protocols
    
    * Feature - collect and report logs through gRPC
    
    * Fix circular import agent-sw_logging
    
    * Fix - Remove wrongly named layout field
    
    * Fix - Lint
    
    * Provides more type hints
    
    * - Sync logReporter with latest changes.
    - Fix minor issue in configs
    
    * Doc - Add log-reporter-related EnvVars
    
    * Use latest skywalking-eyes to ignore .gitignore
    
    * Fix - Add .gitignore to .licenserc.yaml
    
    * - Move Formatter initialization to outer scope.
    
    Signed-off-by: YihaoChen <Su...@outlook.com>
    
    * New document - A complete guide to log reporter.
    
    Signed-off-by: YihaoChen <Su...@outlook.com>
    
    * Add a new section in README pointing to the log reporter guide.
    
    Signed-off-by: YihaoChen <Su...@outlook.com>
---
 .github/workflows/build.yaml           |   2 +-
 .licenserc.yaml                        |   1 +
 README.md                              |   5 ++
 docs/EnvVars.md                        |   7 +++
 docs/LogReporter.md                    |  81 ++++++++++++++++++++++++++
 protocol                               |   2 +-
 skywalking/agent/__init__.py           |  59 +++++++++++++++++--
 skywalking/agent/protocol/grpc.py      |   5 ++
 skywalking/agent/protocol/grpc_log.py  | 103 +++++++++++++++++++++++++++++++++
 skywalking/client/__init__.py          |   5 ++
 skywalking/client/grpc.py              |  26 +++++----
 skywalking/config.py                   |  13 +++++
 skywalking/{client => log}/__init__.py |  26 ++++-----
 skywalking/log/sw_logging.py           |  95 ++++++++++++++++++++++++++++++
 14 files changed, 396 insertions(+), 34 deletions(-)

diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index 1d53e65..9a18021 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -38,7 +38,7 @@ jobs:
         with:
           submodules: true
       - name: Check License
-        uses: apache/skywalking-eyes@9bd5feb86b5817aa6072b008f9866a2c3bbc8587
+        uses: apache/skywalking-eyes@63d89639812f1a94bd45d9329d0f936ec4769a37
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
       - name: Set up Python ${{ matrix.python-version }}
diff --git a/.licenserc.yaml b/.licenserc.yaml
index a1f5720..75f8a95 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -28,5 +28,6 @@ header:
     - 'LICENSE'
     - 'NOTICE'
     - '.github/PULL_REQUEST_TEMPLATE'
+    - '.gitignore'
 
   comment: on-failure
diff --git a/README.md b/README.md
index 9196ecc..1ea56c7 100755
--- a/README.md
+++ b/README.md
@@ -55,6 +55,11 @@ Alternatively, you can also pass the configurations via environment variables (s
 
 All supported environment variables can be found [here](docs/EnvVars.md)
 
+## Report logs with Python Agent
+The Python agent can report collected logs to the backend (SkyWalking OAP or the [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite)), enabling log and trace correlation.
+
+Please refer to the [Log Reporter Doc](docs/LogReporter.md) for a detailed guide.
+
 ## Supported Libraries
 
 There are some built-in plugins (such as `http.server`, `Flask`, `Django` etc.) that support automatic instrumentation of Python libraries, the complete lists can be found [here](docs/Plugins.md)
diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index bf0ffdb..35d11ad 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -32,3 +32,10 @@ Environment Variable | Description | Default
 | `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off  | `512` |
 | `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
 | `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
+| `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
+| `SW_AGENT_LOG_COLLECTOR_BACKEND_SERVICES` | The address of the backend (OAP or Satellite) that receives the logs. The log reporter uses a separate gRPC channel until the [Satellite](https://github.com/apache/skywalking-satellite) project is ready. | `127.0.0.1:11800` |
+| `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to the backend. Logs beyond this limit are dropped (the agent logs a warning). | `10000` |
+| `SW_AGENT_LOG_REPORTER_MESSAGE_SIZE` | The maximum message size (in bytes) allowed for log transmission. | `10485760` |
+| `SW_AGENT_LOG_REPORTER_LEVEL` | The minimum logger level to report. Logs with a level below this value are ignored. | `WARNING` |
+| `SW_AGENT_LOG_REPORTER_FORMATTED` | If `True`, the log reporter transmits the logs as formatted. Otherwise, it puts logRecord.msg into the message content and logRecord.args into tags (`argument.n`), along with an `exception` tag if an exception was raised. | `True` |
+| `SW_AGENT_LOG_REPORTER_LAYOUT` | The log reporter formats the logRecord message based on the layout given. | `%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s` |
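
Editor's note: the boolean variables in the table above are parsed by exact string comparison in skywalking/config.py (shown later in this commit). The following is a minimal sketch of that rule using only the standard library. `env_flag` is a hypothetical helper introduced here for illustration and is not part of the agent.

```python
import os


def env_flag(name, default):
    """Hypothetical helper mirroring the comparisons in skywalking/config.py.

    Only the exact strings 'True' and 'False' are recognized; any other
    value (including 'true' or '1') leaves the default in place.
    """
    value = os.getenv(name)
    if default:
        return value != 'False'
    return value == 'True'


os.environ['SW_AGENT_LOG_REPORTER_ACTIVE'] = 'true'      # lowercase: not recognized
os.environ['SW_AGENT_LOG_REPORTER_FORMATTED'] = 'False'  # exact match: recognized

active = env_flag('SW_AGENT_LOG_REPORTER_ACTIVE', default=False)
formatted = env_flag('SW_AGENT_LOG_REPORTER_FORMATTED', default=True)
print(active, formatted)
```

The string and integer options use the `os.getenv(...) or default` pattern, so an empty variable falls back to the default. One consequence is that the layout cannot be turned off through `SW_AGENT_LOG_REPORTER_LAYOUT`; it would have to be set to `None` in `config.init`.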
diff --git a/docs/LogReporter.md b/docs/LogReporter.md
new file mode 100644
index 0000000..43c9617
--- /dev/null
+++ b/docs/LogReporter.md
@@ -0,0 +1,81 @@
+# Python agent gRPC log reporter
+
+This functionality reports logs collected from the Python logging module (in theory, also from logging libraries that depend on the core logging module).
+
+To utilize this feature, you will need to add some new configurations to the agent initialization step.
+
+## Enabling the feature
+```Python 
+from skywalking import agent, config
+
+config.init(collector_address='127.0.0.1:11800', service_name='your awesome service',
+                log_grpc_reporter_active=True, log_grpc_collector_address='127.0.0.1:11800')
+agent.start()
+``` 
+
+`log_grpc_reporter_active=True` - Enables the log reporter.
+
+`log_grpc_collector_address` - For now, the log reporter uses a separate gRPC channel (it will be merged once the [SkyWalking Satellite Sidecar](https://github.com/apache/skywalking-satellite) project matures). 
+If you would like to use the Satellite sidecar, you will need to configure an address pointing to its gatherer. Otherwise, you can simply keep the address the same as the OAP address.
+
+`log_grpc_reporter_max_buffer_size` and `log_grpc_reporter_max_message_size` - Limit the reporting overhead by capping the queue backlog and the gRPC message size, respectively.
+
+Alternatively, you can pass configurations through environment variables. 
+Please refer to [EnvVars.md](EnvVars.md) for the list of environment variables associated with the log reporter.
+
+## Specify a logging level
+Only logs with a level equal to or higher than the specified level are collected and reported. 
+In other words, the agent ignores logs that fall below your level threshold.
+
+`log_grpc_reporter_level` - The string name of a logger level (default `WARNING`). 
+
+Note that it also works with custom logger levels (registered with `logging.addLevelName`); simply specify the level's string name in the config.
+
+## Formatting
+Note that regardless of the formatting, the Python agent always reports the following three tags:
+
+`level` - the logger level name
+
+`logger` - the logger name  
+
+`thread` - the thread name
+### Customize the reported log format
+You can choose to report collected logs in a custom layout.
+
+If `log_grpc_reporter_layout` is not set, the agent uses the default layout below; otherwise, it uses your custom layout.
+
+`'%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s'`
+
+If the layout is set to `None`, the reported log content will only contain the pre-formatted `LogRecord.message` (`msg % args`) without any additional styling or information.
+
+### Transmit un-formatted logs
+You can also choose to report the log messages without any formatting.
+In this mode, the agent puts the raw `logRecord.msg` into the message content and each item of `logRecord.args` into tags numbered from `argument.0`, along with an `exception` tag if an exception was raised.
+
+Note that when you set `log_grpc_reporter_formatted` to `False`, the agent ignores the custom layout introduced above.
+
+As an example, the following code:
+```Python
+logger.info("SW test log %s %s %s", 'arg0', 'arg1', 'arg2')
+```
+
+Will result in:
+```json
+{
+  "content": "SW test log %s %s %s",
+  "tags": [
+    {
+      "key": "argument.0",
+      "value": "arg0"
+    },
+    {
+      "key": "argument.1",
+      "value": "arg1"
+    },
+    {
+      "key": "argument.2",
+      "value": "arg2"
+    }
+  ]
+}
+```
\ No newline at end of file
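
Editor's note: the two reporting modes described in docs/LogReporter.md can be sketched with the standard library alone. This is an illustration under stated assumptions, not the agent's implementation: `to_payload` is a hypothetical helper, and plain dicts stand in for the protobuf messages. Unlike the abbreviated JSON in the guide, the sketch includes the three core tags that the guide says are always reported.

```python
import logging

DEFAULT_LAYOUT = '%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s'


def to_payload(record, formatted=True, layout=DEFAULT_LAYOUT):
    """Hypothetical helper: build a dict resembling the reported log."""
    # The three core tags are reported in both modes.
    tags = [
        {'key': 'level', 'value': record.levelname},
        {'key': 'logger', 'value': record.name},
        {'key': 'thread', 'value': record.threadName},
    ]
    if formatted:
        if layout:
            content = logging.Formatter(fmt=layout).format(record)
        else:
            content = record.getMessage()  # msg % args, no extra styling
        return {'content': content, 'tags': tags}

    # Unformatted mode: raw message as content, one tag per argument.
    for i, arg in enumerate(record.args or ()):
        tags.append({'key': 'argument.%d' % i, 'value': str(arg)})
    return {'content': str(record.msg), 'tags': tags}


record = logging.LogRecord(
    name='demo', level=logging.INFO, pathname='demo.py', lineno=1,
    msg='SW test log %s %s %s', args=('arg0', 'arg1', 'arg2'), exc_info=None,
)

print(to_payload(record, formatted=False)['content'])
print(to_payload(record, layout=None)['content'])
```

The first call keeps the `%s` placeholders in the content and moves the arguments into tags; the second interpolates the arguments into the message.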
diff --git a/protocol b/protocol
index 213d683..7da226c 160000
--- a/protocol
+++ b/protocol
@@ -1 +1 @@
-Subproject commit 213d6833da76d755abbd201be7aaed52328bf6f7
+Subproject commit 7da226cfced7fa4eb91c6528e8c30827288531a0
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index b473783..585304b 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -21,8 +21,12 @@ from queue import Queue, Full
 from threading import Thread, Event
 from typing import TYPE_CHECKING
 
-from skywalking import config, plugins, loggings
+from skywalking.protocol.logging.Logging_pb2 import LogData
+
+from skywalking import config, plugins
+from skywalking import loggings
 from skywalking.agent.protocol import Protocol
+from skywalking.agent.protocol.grpc_log import GrpcLogProtocol
 from skywalking.command import command_service
 from skywalking.config import profile_active, profile_task_query_interval
 from skywalking.loggings import logger
@@ -30,10 +34,10 @@ from skywalking.loggings import logger
 if TYPE_CHECKING:
     from skywalking.trace.context import Segment
 
-
 __started = False
-__protocol = Protocol()  # type: Protocol
-__heartbeat_thread = __report_thread = __query_profile_thread = __command_dispatch_thread = __queue = __finished = None
+__protocol = __log_protocol = Protocol()  # type: Protocol
+__heartbeat_thread = __report_thread = __log_report_thread = __query_profile_thread = __command_dispatch_thread \
+    = __queue = __log_queue = __finished = None
 
 
 def __heartbeat():
@@ -56,6 +60,26 @@ def __report():
         __finished.wait(0)
 
 
+def __log_heartbeat():
+    while not __finished.is_set():
+        try:
+            __log_protocol.heartbeat()
+        except Exception as exc:
+            logger.error(str(exc))
+
+        __finished.wait(30)
+
+
+def __log_report():
+    while not __finished.is_set():
+        try:
+            __log_protocol.report(__log_queue)
+        except Exception as exc:
+            logger.error(str(exc))
+
+        __finished.wait(0)
+
+
 def __query_profile_command():
     while not __finished.is_set():
         try:
@@ -72,7 +96,8 @@ def __command_dispatch():
 
 
 def __init_threading():
-    global __heartbeat_thread, __report_thread,  __query_profile_thread, __command_dispatch_thread, __queue, __finished
+    global __heartbeat_thread, __report_thread, __log_report_thread, __query_profile_thread, \
+        __command_dispatch_thread, __queue, __log_queue, __finished
 
     __queue = Queue(maxsize=config.max_buffer_size)
     __finished = Event()
@@ -85,12 +110,19 @@ def __init_threading():
     __report_thread.start()
     __command_dispatch_thread.start()
 
+    if config.log_grpc_reporter_active:
+        __log_queue = Queue(maxsize=config.log_grpc_reporter_max_buffer_size)
+        __log_heartbeat_thread = Thread(name='LogHeartbeatThread', target=__log_heartbeat, daemon=True)
+        __log_report_thread = Thread(name='LogReportThread', target=__log_report, daemon=True)
+        __log_heartbeat_thread.start()
+        __log_report_thread.start()
+
     if profile_active:
         __query_profile_thread.start()
 
 
 def __init():
-    global __protocol
+    global __protocol, __log_protocol
 
     if config.protocol == 'grpc':
         from skywalking.agent.protocol.grpc import GrpcProtocol
@@ -103,12 +135,20 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    if config.log_grpc_reporter_active:
+        from skywalking import log
+        __log_protocol = GrpcLogProtocol()
+        log.install()
+
     __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    if config.log_grpc_reporter_active:
+        __log_protocol.report(__log_queue, False)
+        __log_queue.join()
     __finished.set()
 
 
@@ -177,3 +217,10 @@ def archive(segment: 'Segment'):
         __queue.put(segment, block=False)
     except Full:
         logger.warning('the queue is full, the segment will be abandoned')
+
+
+def archive_log(log_data: 'LogData'):
+    try:
+        __log_queue.put(log_data, block=False)
+    except Full:
+        logger.warning('the queue is full, the log will be abandoned')
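
Editor's note: `archive_log` above relies on a bounded queue with a non-blocking put, so a slow or unreachable backend costs the application dropped logs rather than blocked threads. A minimal standalone sketch of that pattern follows; `offer` is a hypothetical name used only for this illustration.

```python
from queue import Queue, Full


def offer(queue, item):
    """Try to enqueue without blocking; return False if the item was dropped."""
    try:
        queue.put(item, block=False)
        return True
    except Full:
        return False


buffer = Queue(maxsize=2)  # stand-in for log_grpc_reporter_max_buffer_size
results = [offer(buffer, n) for n in range(3)]
print(results)  # [True, True, False]
```

The third item is rejected because the queue already holds two entries and nothing has consumed them.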
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 6ef9e78..8de1e69 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -67,6 +67,11 @@ class GrpcProtocol(Protocol):
                 self.service_management.send_instance_props()
                 self.properties_sent = True
 
+            logger.debug(
+                'segment reporter service heart beats, [%s], [%s]',
+                config.service_name,
+                config.service_instance,
+            )
             self.service_management.send_heart_beat()
 
         except grpc.RpcError:
diff --git a/skywalking/agent/protocol/grpc_log.py b/skywalking/agent/protocol/grpc_log.py
new file mode 100644
index 0000000..a0bf6bf
--- /dev/null
+++ b/skywalking/agent/protocol/grpc_log.py
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import traceback
+from queue import Queue, Empty
+from time import time
+
+import grpc
+
+from skywalking import config
+from skywalking.agent import Protocol
+from skywalking.agent.protocol.interceptors import header_adder_interceptor
+from skywalking.client.grpc import GrpcServiceManagementClient, GrpcLogDataReportService
+from skywalking.loggings import logger
+from skywalking.protocol.logging.Logging_pb2 import LogData
+
+
+class GrpcLogProtocol(Protocol):
+    def __init__(self):
+        self.properties_sent = False
+        self.state = None
+
+        if config.force_tls:
+            self.channel = grpc.secure_channel(config.log_grpc_collector_address, grpc.ssl_channel_credentials(),
+                                               options=(('grpc.max_send_message_length',
+                                                         config.log_grpc_reporter_max_message_size),))
+        else:
+            self.channel = grpc.insecure_channel(config.log_grpc_collector_address,
+                                                 options=(('grpc.max_send_message_length',
+                                                           config.log_grpc_reporter_max_message_size),))
+        if config.authentication:
+            self.channel = grpc.intercept_channel(
+                self.channel, header_adder_interceptor('authentication', config.authentication)
+            )
+
+        self.channel.subscribe(self._cb, try_to_connect=True)
+        self.service_management = GrpcServiceManagementClient(self.channel)
+        self.log_reporter = GrpcLogDataReportService(self.channel)
+
+    def _cb(self, state):
+        logger.debug('grpc log reporter channel connectivity changed, [%s -> %s]', self.state, state)
+        self.state = state
+
+    def heartbeat(self):
+        try:
+            if not self.properties_sent:
+                self.service_management.send_instance_props()
+                self.properties_sent = True
+
+            logger.debug(
+                'log reporter service heart beats, [%s], [%s]',
+                config.service_name,
+                config.service_instance,
+            )
+            self.service_management.send_heart_beat()
+
+        except grpc.RpcError:
+            self.on_error()
+
+    def on_error(self):
+        traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
+        self.channel.unsubscribe(self._cb)
+        self.channel.subscribe(self._cb, try_to_connect=True)
+
+    def report(self, queue: Queue, block: bool = True):
+        start = time()
+
+        def generator():
+            while True:
+                try:
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:  # this is to make sure we exit eventually instead of being fed continuously
+                        return
+                    log_data = queue.get(block=block, timeout=timeout)  # type: LogData
+                except Empty:
+                    return
+
+                queue.task_done()
+
+                logger.debug('Reporting Log')
+
+                yield log_data
+
+        try:
+            self.log_reporter.report(generator())
+
+        except grpc.RpcError:
+            self.on_error()
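
Editor's note: the generator inside `GrpcLogProtocol.report` drains the queue under a time budget, so a single streaming call ends even when logs keep arriving. The sketch below reproduces that loop with the standard library only. `drain` and the one-second `QUEUE_TIMEOUT` are assumptions made for this illustration; the agent reads the real value from `config.QUEUE_TIMEOUT`.

```python
from queue import Queue, Empty
from time import time

QUEUE_TIMEOUT = 1  # seconds; assumed stand-in for config.QUEUE_TIMEOUT


def drain(queue, block=True, budget=QUEUE_TIMEOUT):
    """Yield queued items until the queue is empty or the budget runs out."""
    start = time()
    while True:
        remaining = budget - int(time() - start)
        if remaining <= 0:  # stop eventually, even under continuous input
            return
        try:
            item = queue.get(block=block, timeout=remaining)
        except Empty:
            return
        # As in the commit, the task is marked done before the item is yielded.
        queue.task_done()
        yield item


q = Queue()
for n in range(3):
    q.put(n)

drained = list(drain(q, block=False))
print(drained)  # [0, 1, 2]
```

Because `task_done()` runs before the `yield`, a later `queue.join()` confirms only that items were dequeued, not that the backend received them.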
diff --git a/skywalking/client/__init__.py b/skywalking/client/__init__.py
index 3cd175f..007e0bc 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/client/__init__.py
@@ -29,6 +29,11 @@ class TraceSegmentReportService(object):
         raise NotImplementedError()
 
 
+class LogDataReportService(object):
+    def report(self, generator):
+        raise NotImplementedError()
+
+
 class ProfileTaskChannelService(object):
     def do_query(self):
         raise NotImplementedError()
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 6cef4a6..20b5f19 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -15,19 +15,18 @@
 # limitations under the License.
 #
 
-from skywalking.loggings import logger
-
 import grpc
-
-from skywalking import config
-from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.language_agent.Tracing_pb2_grpc import TraceSegmentReportServiceStub
-from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
-from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
+from skywalking.protocol.logging.Logging_pb2_grpc import LogReportServiceStub
 from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
 from skywalking.protocol.management.Management_pb2_grpc import ManagementServiceStub
+from skywalking.protocol.profile.Profile_pb2 import ProfileTaskCommandQuery
+from skywalking.protocol.profile.Profile_pb2_grpc import ProfileTaskStub
 
+from skywalking import config
+from skywalking.client import ServiceManagementClient, TraceSegmentReportService, ProfileTaskChannelService, \
+    LogDataReportService
 from skywalking.command import command_service
 from skywalking.profile import profile_task_execution_service
 
@@ -44,11 +43,6 @@ class GrpcServiceManagementClient(ServiceManagementClient):
         ))
 
     def send_heart_beat(self):
-        logger.debug(
-            'service heart beats, [%s], [%s]',
-            config.service_name,
-            config.service_instance,
-        )
         self.service_stub.keepAlive(InstancePingPkg(
             service=config.service_name,
             serviceInstance=config.service_instance,
@@ -77,3 +71,11 @@ class GrpcProfileTaskChannelService(ProfileTaskChannelService):
 
         commands = self.task_stub.getProfileTaskCommands(query)
         command_service.receive_command(commands)
+
+
+class GrpcLogDataReportService(LogDataReportService):
+    def __init__(self, channel: grpc.Channel):
+        self.report_stub = LogReportServiceStub(channel)
+
+    def report(self, generator):
+        self.report_stub.collect(generator)
diff --git a/skywalking/config.py b/skywalking/config.py
index 4e3b8ea..8aa26fb 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -67,6 +67,19 @@ profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
                          os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False  # type: bool
 profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')
 
+# NOTE - Log reporting requires a separate channel, will merge in the future.
+log_grpc_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
+                         os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False  # type: bool
+log_grpc_collector_address = os.getenv('SW_AGENT_LOG_COLLECTOR_BACKEND_SERVICES') or '127.0.0.1:11800'  # type: str
+log_grpc_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000')  # type: int
+log_grpc_reporter_max_message_size = int(os.getenv('SW_AGENT_LOG_REPORTER_MESSAGE_SIZE') or '10485760')  # type: int
+log_grpc_reporter_level = os.getenv('SW_AGENT_LOG_REPORTER_LEVEL') or 'WARNING'  # type: str
+log_grpc_reporter_formatted = False if os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') and \
+                         os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'False' else True  # type: bool
+log_grpc_reporter_layout = os.getenv('SW_AGENT_LOG_REPORTER_LAYOUT') or \
+                        '%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s'  # type: str
+
+
 options = {key for key in globals() if key not in options}  # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
 
 
diff --git a/skywalking/client/__init__.py b/skywalking/log/__init__.py
similarity index 65%
copy from skywalking/client/__init__.py
copy to skywalking/log/__init__.py
index 3cd175f..58f2bad 100644
--- a/skywalking/client/__init__.py
+++ b/skywalking/log/__init__.py
@@ -15,20 +15,18 @@
 # limitations under the License.
 #
 
+import logging
+import traceback
 
-class ServiceManagementClient(object):
-    def send_instance_props(self):
-        raise NotImplementedError()
+from skywalking.log import sw_logging
+from skywalking.loggings import logger
 
-    def send_heart_beat(self):
-        raise NotImplementedError()
 
-
-class TraceSegmentReportService(object):
-    def report(self, generator):
-        raise NotImplementedError()
-
-
-class ProfileTaskChannelService(object):
-    def do_query(self):
-        raise NotImplementedError()
+def install():
+    logger.debug('Installing plugin for logging module')
+    # noinspection PyBroadException
+    try:
+        sw_logging.install()
+    except Exception:
+        logger.warning('Failed to install sw_logging plugin')
+        traceback.print_exc() if logger.isEnabledFor(logging.DEBUG) else None
diff --git a/skywalking/log/sw_logging.py b/skywalking/log/sw_logging.py
new file mode 100644
index 0000000..db5039b
--- /dev/null
+++ b/skywalking/log/sw_logging.py
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+
+from skywalking import config, agent
+from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
+from skywalking.protocol.logging.Logging_pb2 import LogData, LogDataBody, TraceContext, LogTags, TextLog
+from skywalking.trace.context import get_context
+
+
+def install():
+    from logging import Logger, Formatter
+
+    layout = config.log_grpc_reporter_layout  # type: str
+    if layout:
+        formatter = Formatter(fmt=layout)
+
+    _handle = Logger.handle
+    log_reporter_level = logging.getLevelName(config.log_grpc_reporter_level)  # type: int
+
+    def _sw_handle(self, record):
+        if self.name == "skywalking":  # Ignore SkyWalking internal logger
+            return _handle(self, record)
+
+        if record.levelno < log_reporter_level:
+            return _handle(self, record)
+
+        def build_log_tags() -> LogTags:
+            core_tags = [
+                KeyStringValuePair(key='level', value=str(record.levelname)),
+                KeyStringValuePair(key='logger', value=str(record.name)),
+                KeyStringValuePair(key='thread', value=str(record.threadName))
+            ]
+            l_tags = LogTags()
+            l_tags.data.extend(core_tags)
+
+            if config.log_grpc_reporter_formatted:
+                return l_tags
+
+            for i, arg in enumerate(record.args):
+                l_tags.data.append(KeyStringValuePair(key='argument.' + str(i), value=str(arg)))
+
+            if record.exc_info:
+                l_tags.data.append(KeyStringValuePair(key='exception', value=str(record.exc_info)))
+
+            return l_tags
+
+        context = get_context()
+
+        log_data = LogData(
+            timestamp=round(record.created * 1000),
+            service=config.service_name,
+            serviceInstance=config.service_instance,
+            body=LogDataBody(
+                type='text',
+                text=TextLog(
+                    text=transform(record)
+                )
+            ),
+            traceContext=TraceContext(
+                traceId=str(context.segment.related_traces[0]),
+                traceSegmentId=str(context.segment.segment_id),
+                spanId=context.active_span().sid if context.active_span() else -1
+            ),
+            tags=build_log_tags(),
+        )
+
+        _handle(self=self, record=record)
+
+        agent.archive_log(log_data)
+
+    Logger.handle = _sw_handle
+
+    def transform(record) -> str:
+        if config.log_grpc_reporter_formatted:
+            if layout:
+                return formatter.format(record=record)
+            return record.getMessage()
+
+        return str(record.msg)
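
Editor's note: `sw_logging.install` works by replacing `logging.Logger.handle` with a wrapper that applies a level threshold and then delegates to the original method. The following is a minimal sketch of that technique, assuming a plain list in place of the agent's log queue. The returned `uninstall` function is an addition for this illustration; the commit does not provide one.

```python
import logging

collected = []  # stand-in for the agent's log queue


def install(threshold_name='WARNING'):
    """Wrap logging.Logger.handle; return a function that undoes the patch."""
    original_handle = logging.Logger.handle
    # For a registered level name, getLevelName returns its numeric value.
    threshold = logging.getLevelName(threshold_name)

    def patched_handle(self, record):
        if record.levelno >= threshold:
            collected.append((record.name, record.levelname, record.getMessage()))
        return original_handle(self, record)

    logging.Logger.handle = patched_handle

    def uninstall():
        logging.Logger.handle = original_handle

    return uninstall


uninstall = install('WARNING')

demo = logging.getLogger('demo')
demo.addHandler(logging.NullHandler())
demo.propagate = False
demo.setLevel(logging.DEBUG)

demo.info('ignored %s', 'message')  # below the threshold
demo.warning('kept %s', 'message')  # at the threshold

uninstall()
print(collected)  # [('demo', 'WARNING', 'kept message')]
```

For a name that has not been registered, `logging.getLevelName` returns a string such as `'Level FOO'` rather than a number, and the comparison against `record.levelno` would then raise `TypeError`. Custom levels therefore need to be registered with `logging.addLevelName` before the patch is installed.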