You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/12 16:06:21 UTC

[skywalking-python] branch master updated: Enable Kafka log reporting protocol (#154)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new d1f286f  Enable Kafka log reporting protocol (#154)
d1f286f is described below

commit d1f286f41bdc6daf986859e17cd2f92c449c5806
Author: Yihao Chen <Su...@outlook.com>
AuthorDate: Fri Aug 13 00:06:14 2021 +0800

    Enable Kafka log reporting protocol (#154)
---
 docs/EnvVars.md                    |  3 ++-
 docs/LogReporter.md                | 10 ++++++++--
 skywalking/agent/protocol/kafka.py | 32 ++++++++++++++++++++++++++++----
 skywalking/client/http.py          |  4 ++--
 skywalking/client/kafka.py         | 20 ++++++++++++++++----
 skywalking/config.py               |  5 +++--
 skywalking/log/sw_logging.py       |  2 +-
 skywalking/plugins/sw_kafka.py     |  8 +++++---
 8 files changed, 65 insertions(+), 19 deletions(-)

diff --git a/docs/EnvVars.md b/docs/EnvVars.md
index e128c9c..9416534 100644
--- a/docs/EnvVars.md
+++ b/docs/EnvVars.md
@@ -28,6 +28,7 @@ Environment Variable | Description | Default
 | `SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. It is in the form host1:port1,host2:port2,... | `localhost:9092` |
 | `SW_KAFKA_REPORTER_TOPIC_MANAGEMENT` | Specifying Kafka topic name for service instance reporting and registering. | `skywalking-managements` |
 | `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
+| `SW_KAFKA_REPORTER_TOPIC_LOG` | Specifying Kafka topic name for Log data. | `skywalking-logs` |
 | `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
 | `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off  | `512` |
 | `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
@@ -35,7 +36,7 @@ Environment Variable | Description | Default
 | `SW_AGENT_LOG_REPORTER_ACTIVE` | If `True`, Python agent will report collected logs to the OAP or Satellite. Otherwise, it disables the feature. | `False` |
 | `SW_AGENT_LOG_REPORTER_BUFFER_SIZE` | The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped. | `10000` |
 | `SW_AGENT_LOG_REPORTER_LEVEL` | This config specifies the logger levels of concern, any logs with a level below the config will be ignored. | `WARNING` |
-| `SW_AGENT_LOG_IGNORE_FILTER` | This config customizes whether to ignore the application-defined logger filters, if `True`, all logs are reported disregarding any filter rules. | `False` |
+| `SW_AGENT_LOG_REPORTER_IGNORE_FILTER` | This config customizes whether to ignore the application-defined logger filters, if `True`, all logs are reported disregarding any filter rules. | `False` |
 | `SW_AGENT_LOG_REPORTER_FORMATTED` | If `True`, the log reporter will transmit the logs as formatted. Otherwise, puts logRecord.msg and logRecord.args into message content and tags(`argument.n`), respectively. Along with an `exception` tag if an exception was raised. | `True` |
 | `SW_AGENT_LOG_REPORTER_LAYOUT` | The log reporter formats the logRecord message based on the layout given. | `%(asctime)s [%(threadName)s] %(levelname)s %(name)s - %(message)s` |
 | `SW_AGENT_CAUSE_EXCEPTION_DEPTH` | This config limits agent to report up to `limit` stacktrace, please refer to [Python traceback](https://docs.python.org/3/library/traceback.html#traceback.print_tb) for more explanations. | `5` | 
diff --git a/docs/LogReporter.md b/docs/LogReporter.md
index e4774ed..7af0d83 100644
--- a/docs/LogReporter.md
+++ b/docs/LogReporter.md
@@ -9,11 +9,17 @@ To utilize this feature, you will need to add some new configurations to the age
 from skywalking import agent, config
 
 config.init(collector_address='127.0.0.1:11800', service_name='your awesome service',
-                log_reporter_active=True)
+                log_reporter_active=True)  # defaults to grpc protocol
 agent.start()
 ``` 
 
-Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.
+The log reporter supports all three protocols (`grpc`, `http` and `kafka`) and shares the `protocol` config with the trace reporter.
+
+If the `http` protocol is chosen, logs are batch-reported to the collector REST endpoint `/v3/logs` (appended to `collector_address`).
+
+If the `kafka` protocol is chosen, make sure to configure the
+[kafka-fetcher](https://skywalking.apache.org/docs/main/v8.4.0/en/setup/backend/backend-fetcher/#kafka-fetcher)
+on the OAP side, and make sure the Python agent config `kafka_bootstrap_servers` points to your Kafka brokers.
 
 `log_reporter_active=True` - Enables the log reporter.
 
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index 15a5697..a0b1a4c 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -16,15 +16,18 @@
 #
 
 import logging
-from skywalking.loggings import logger, getLogger
 from queue import Queue, Empty
 from time import time
 
-from skywalking import config
-from skywalking.agent import Protocol
-from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
+from skywalking.protocol.logging.Logging_pb2 import LogData
+
+from skywalking import config
+from skywalking.agent import Protocol
+from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \
+    KafkaLogDataReportService
+from skywalking.loggings import logger, getLogger
 from skywalking.trace.segment import Segment
 
 # avoid too many kafka logs
@@ -36,6 +39,7 @@ class KafkaProtocol(Protocol):
     def __init__(self):
         self.service_management = KafkaServiceManagementClient()
         self.traces_reporter = KafkaTraceSegmentReportService()
+        self.log_reporter = KafkaLogDataReportService()
 
     def heartbeat(self):
         self.service_management.send_heart_beat()
@@ -97,3 +101,23 @@ class KafkaProtocol(Protocol):
                 yield s
 
         self.traces_reporter.report(generator())
+
+    def report_log(self, queue: Queue, block: bool = True):  # stream queued LogData to Kafka
+        start = time()  # one QUEUE_TIMEOUT budget for the whole call
+
+        def generator():
+            while True:
+                try:
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:
+                        return  # time budget used up
+                    log_data = queue.get(block=block, timeout=timeout)  # type: LogData
+                except Empty:
+                    return  # no more queued logs
+                queue.task_done()  # pair each successful get() with task_done()
+
+                logger.debug('Reporting Log')
+
+                yield log_data  # consumed lazily by self.log_reporter.report
+
+        self.log_reporter.report(generator=generator())
diff --git a/skywalking/client/http.py b/skywalking/client/http.py
index c6e26d6..153eba1 100644
--- a/skywalking/client/http.py
+++ b/skywalking/client/http.py
@@ -20,7 +20,7 @@ import requests
 from google.protobuf import json_format
 
 from skywalking import config
-from skywalking.client import ServiceManagementClient, TraceSegmentReportService
+from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
 from skywalking.loggings import logger
 
 
@@ -112,7 +112,7 @@ class HttpTraceSegmentReportService(TraceSegmentReportService):
             logger.debug('report traces response: %s', res)
 
 
-class HttpLogDataReportService(TraceSegmentReportService):
+class HttpLogDataReportService(LogDataReportService):
     def __init__(self):
         proto = 'https://' if config.force_tls else 'http://'
         self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
diff --git a/skywalking/client/kafka.py b/skywalking/client/kafka.py
index a390b77..bf8e96e 100644
--- a/skywalking/client/kafka.py
+++ b/skywalking/client/kafka.py
@@ -15,16 +15,16 @@
 # limitations under the License.
 #
 
-import os
 import ast
-from skywalking.loggings import logger
+import os
 
-from skywalking import config
-from skywalking.client import ServiceManagementClient, TraceSegmentReportService
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
 
 from kafka import KafkaProducer
+from skywalking import config
+from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
+from skywalking.loggings import logger
 
 kafka_configs = {}
 
@@ -106,6 +106,18 @@ class KafkaTraceSegmentReportService(TraceSegmentReportService):
             self.producer.send(topic=self.topic, key=key, value=value)
 
 
+class KafkaLogDataReportService(LogDataReportService):
+    def __init__(self):
+        self.producer = KafkaProducer(**kafka_configs)
+        self.topic = config.kafka_topic_log
+
+    def report(self, generator):
+        for log_data in generator:
+            key = bytes(log_data.traceContext.traceSegmentId, encoding="utf-8")
+            value = bytes(log_data.SerializeToString())
+            self.producer.send(topic=self.topic, key=key, value=value)
+
+
 class KafkaConfigDuplicated(Exception):
     def __init__(self, key):
         self.key = key
diff --git a/skywalking/config.py b/skywalking/config.py
index d8149e0..0123f3b 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -62,6 +62,7 @@ elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \
 kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
 kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
 kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
+kafka_topic_log = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or "skywalking-logs"  # type: str
 celery_parameters_length = int(os.getenv('SW_CELERY_PARAMETERS_LENGTH') or '512')
 profile_active = True if os.getenv('SW_AGENT_PROFILE_ACTIVE') and \
                          os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False  # type: bool
@@ -72,8 +73,8 @@ log_reporter_active = True if os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') and \
                               os.getenv('SW_AGENT_LOG_REPORTER_ACTIVE') == 'True' else False  # type: bool
 log_reporter_max_buffer_size = int(os.getenv('SW_AGENT_LOG_REPORTER_BUFFER_SIZE') or '10000')  # type: int
 log_reporter_level = os.getenv('SW_AGENT_LOG_REPORTER_LEVEL') or 'WARNING'  # type: str
-log_reporter_ignore_filter = True if os.getenv('SW_AGENT_LOG_IGNORE_FILTER') and \
-                         os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'True' else False  # type: bool
+log_reporter_ignore_filter = True if os.getenv('SW_AGENT_LOG_REPORTER_IGNORE_FILTER') and \
+                         os.getenv('SW_AGENT_LOG_REPORTER_IGNORE_FILTER') == 'True' else False  # type: bool
 log_reporter_formatted = False if os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') and \
                          os.getenv('SW_AGENT_LOG_REPORTER_FORMATTED') == 'False' else True  # type: bool
 log_reporter_layout = os.getenv('SW_AGENT_LOG_REPORTER_LAYOUT') or \
diff --git a/skywalking/log/sw_logging.py b/skywalking/log/sw_logging.py
index dd87a57..907e8df 100644
--- a/skywalking/log/sw_logging.py
+++ b/skywalking/log/sw_logging.py
@@ -96,5 +96,5 @@ def install():
         if config.log_reporter_formatted:
             if layout:
                 return formatter.format(record=record)
-            return record.getMessage() + '\n' + sw_traceback()
+            return record.getMessage() + ('\n' + sw_traceback() if record.exc_info else '')
         return str(record.msg)  # convert possible exception to str
diff --git a/skywalking/plugins/sw_kafka.py b/skywalking/plugins/sw_kafka.py
index 4185ea7..1d101bd 100644
--- a/skywalking/plugins/sw_kafka.py
+++ b/skywalking/plugins/sw_kafka.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 #
 
-from skywalking import config
 from skywalking import Layer, Component
+from skywalking import config
 from skywalking.trace.carrier import Carrier
 from skywalking.trace.context import get_context
 from skywalking.trace.tags import TagMqBroker, TagMqTopic
@@ -64,8 +64,10 @@ def _sw__poll_once_func(__poll_once):
 
 def _sw_send_func(_send):
     def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
-        # ignore trace skywalking self request
-        if config.protocol == 'kafka' and config.kafka_topic_segment == topic or config.kafka_topic_management == topic:
+        # ignore trace & log reporter - skywalking self request
+        if config.protocol == 'kafka' and config.kafka_topic_segment == topic \
+                or config.kafka_topic_log == topic \
+                or config.kafka_topic_management == topic:
             return _send(this, topic, value=value, key=key, headers=headers, partition=partition,
                          timestamp_ms=timestamp_ms)