You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/12 06:17:13 UTC

[skywalking-python] branch master updated: Enable HTTP log reporting protocol (#149)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 8039d8b  Enable HTTP log reporting protocol (#149)
8039d8b is described below

commit 8039d8b4a80952d7d1d6633a774f61da41762c15
Author: Yihao Chen <Su...@outlook.com>
AuthorDate: Thu Aug 12 14:17:10 2021 +0800

    Enable HTTP log reporting protocol (#149)
---
 docs/LogReporter.md               |  4 +++-
 skywalking/agent/protocol/http.py | 29 +++++++++++++++++++++++++++--
 skywalking/client/http.py         | 22 ++++++++++++++++++++--
 3 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/docs/LogReporter.md b/docs/LogReporter.md
index 9fc0b97..e4774ed 100644
--- a/docs/LogReporter.md
+++ b/docs/LogReporter.md
@@ -1,4 +1,4 @@
-# Python agent gRPC log reporter
+# Python agent log reporter
 
 This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module).
 
@@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv
 agent.start()
 ``` 
 
+Note: if the `HTTP` protocol is chosen instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.
+
 `log_reporter_active=True` - Enables the log reporter.
 
 `log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index f5b1147..2ca8bac 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -15,13 +15,14 @@
 # limitations under the License.
 #
 
-from skywalking.loggings import logger
 from queue import Queue, Empty
 from time import time
 
 from skywalking import config
 from skywalking.agent import Protocol
-from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
+from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService, HttpLogDataReportService
+from skywalking.loggings import logger
+from skywalking.protocol.logging.Logging_pb2 import LogData
 from skywalking.trace.segment import Segment
 
 
@@ -30,6 +31,7 @@ class HttpProtocol(Protocol):
         self.properties_sent = False
         self.service_management = HttpServiceManagementClient()
         self.traces_reporter = HttpTraceSegmentReportService()
+        self.log_reporter = HttpLogDataReportService()
 
     def fork_after_in_child(self):
         self.service_management.fork_after_in_child()
@@ -64,3 +66,26 @@ class HttpProtocol(Protocol):
             self.traces_reporter.report(generator=generator())
         except Exception:
             pass
+
+    def report_log(self, queue: Queue, block: bool = True):
+        """Pull logs from `queue` for about `config.QUEUE_TIMEOUT` seconds and report them via `self.log_reporter`."""
+        start = time()
+
+        def generator():
+            while True:
+                try:
+                    # Remaining budget, measured from `start`; the generator ends once it is used up.
+                    timeout = config.QUEUE_TIMEOUT - int(time() - start)  # type: int
+                    if timeout <= 0:
+                        return
+                    log_data = queue.get(block=block, timeout=timeout)  # type: LogData
+                except Empty:
+                    return
+                queue.task_done()
+                logger.debug('Reporting Log')
+                yield log_data
+
+        try:
+            self.log_reporter.report(generator=generator())
+        except Exception as e:
+            logger.debug('failed to report log batch: %s', e)  # still best effort: the error never propagates
diff --git a/skywalking/client/http.py b/skywalking/client/http.py
index 7408fed..c6e26d6 100644
--- a/skywalking/client/http.py
+++ b/skywalking/client/http.py
@@ -14,13 +14,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-from skywalking.loggings import logger
+import json
 
 import requests
+from google.protobuf import json_format
 
 from skywalking import config
 from skywalking.client import ServiceManagementClient, TraceSegmentReportService
+from skywalking.loggings import logger
 
 
 class HttpServiceManagementClient(ServiceManagementClient):
@@ -109,3 +110,20 @@ class HttpTraceSegmentReportService(TraceSegmentReportService):
                 } for span in segment.spans]
             })
             logger.debug('report traces response: %s', res)
+
+
+class HttpLogDataReportService(TraceSegmentReportService):  # NOTE(review): trace base class used for logs - confirm
+    def __init__(self):
+        proto = 'https://' if config.force_tls else 'http://'
+        self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
+        self.session = requests.Session()
+
+    def fork_after_in_child(self):
+        self.session.close()  # close the current session, then replace it with a new one
+        self.session = requests.Session()
+
+    def report(self, generator):
+        """POST the messages from `generator` to `self.url_report` as one JSON list; skip empty batches."""
+        log_batch = [json_format.MessageToDict(log_data) for log_data in generator]  # no JSON-string round trip
+        if log_batch:  # prevent empty batches
+            logger.debug('report batch log response: %s', self.session.post(self.url_report, json=log_batch))