You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/08/12 06:17:13 UTC
[skywalking-python] branch master updated: Enable HTTP log
reporting protocol (#149)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 8039d8b Enable HTTP log reporting protocol (#149)
8039d8b is described below
commit 8039d8b4a80952d7d1d6633a774f61da41762c15
Author: Yihao Chen <Su...@outlook.com>
AuthorDate: Thu Aug 12 14:17:10 2021 +0800
Enable HTTP log reporting protocol (#149)
---
docs/LogReporter.md | 4 +++-
skywalking/agent/protocol/http.py | 29 +++++++++++++++++++++++++++--
skywalking/client/http.py | 22 ++++++++++++++++++++--
3 files changed, 50 insertions(+), 5 deletions(-)
diff --git a/docs/LogReporter.md b/docs/LogReporter.md
index 9fc0b97..e4774ed 100644
--- a/docs/LogReporter.md
+++ b/docs/LogReporter.md
@@ -1,4 +1,4 @@
-# Python agent gRPC log reporter
+# Python agent log reporter
This functionality reports logs collected from the Python logging module(in theory, also logging libraries depending on the core logging module).
@@ -13,6 +13,8 @@ config.init(collector_address='127.0.0.1:11800', service_name='your awesome serv
agent.start()
```
+Note, if chosen `HTTP` protocol instead of `gRPC`/`Kafka`, the logs will be batch-reported to the collector REST endpoint.
+
`log_reporter_active=True` - Enables the log reporter.
`log_reporter_max_buffer_size` - The maximum queue backlog size for sending log data to backend, logs beyond this are silently dropped.
diff --git a/skywalking/agent/protocol/http.py b/skywalking/agent/protocol/http.py
index f5b1147..2ca8bac 100644
--- a/skywalking/agent/protocol/http.py
+++ b/skywalking/agent/protocol/http.py
@@ -15,13 +15,14 @@
# limitations under the License.
#
-from skywalking.loggings import logger
from queue import Queue, Empty
from time import time
from skywalking import config
from skywalking.agent import Protocol
-from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService
+from skywalking.client.http import HttpServiceManagementClient, HttpTraceSegmentReportService, HttpLogDataReportService
+from skywalking.loggings import logger
+from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.trace.segment import Segment
@@ -30,6 +31,7 @@ class HttpProtocol(Protocol):
self.properties_sent = False
self.service_management = HttpServiceManagementClient()
self.traces_reporter = HttpTraceSegmentReportService()
+ self.log_reporter = HttpLogDataReportService()
def fork_after_in_child(self):
self.service_management.fork_after_in_child()
@@ -64,3 +66,26 @@ class HttpProtocol(Protocol):
self.traces_reporter.report(generator=generator())
except Exception:
pass
+
+ def report_log(self, queue: Queue, block: bool = True):
+ start = time()
+
+ def generator():
+ while True:
+ try:
+ timeout = config.QUEUE_TIMEOUT - int(time() - start) # type: int
+ if timeout <= 0:
+ return
+ log_data = queue.get(block=block, timeout=timeout) # type: LogData
+ except Empty:
+ return
+ queue.task_done()
+
+ logger.debug('Reporting Log')
+
+ yield log_data
+
+ try:
+ self.log_reporter.report(generator=generator())
+ except Exception:
+ pass
diff --git a/skywalking/client/http.py b/skywalking/client/http.py
index 7408fed..c6e26d6 100644
--- a/skywalking/client/http.py
+++ b/skywalking/client/http.py
@@ -14,13 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-from skywalking.loggings import logger
+import json
import requests
+from google.protobuf import json_format
from skywalking import config
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
+from skywalking.loggings import logger
class HttpServiceManagementClient(ServiceManagementClient):
@@ -109,3 +110,20 @@ class HttpTraceSegmentReportService(TraceSegmentReportService):
} for span in segment.spans]
})
logger.debug('report traces response: %s', res)
+
+
+class HttpLogDataReportService(TraceSegmentReportService):
+ def __init__(self):
+ proto = 'https://' if config.force_tls else 'http://'
+ self.url_report = proto + config.collector_address.rstrip('/') + '/v3/logs'
+ self.session = requests.Session()
+
+ def fork_after_in_child(self):
+ self.session.close()
+ self.session = requests.Session()
+
+ def report(self, generator):
+ log_batch = [json.loads(json_format.MessageToJson(log_data)) for log_data in generator]
+ if log_batch: # prevent empty batches
+ res = self.session.post(self.url_report, json=log_batch)
+ logger.debug('report batch log response: %s', res)