You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by yi...@apache.org on 2022/10/21 14:28:10 UTC
[skywalking-python] branch master updated: feat: add Kafka support to MeterReportService (#243)
This is an automated email from the ASF dual-hosted git repository.
yihaochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new ca56ad2 feat: add Kafka support to MeterReportService (#243)
ca56ad2 is described below
commit ca56ad28400749b9db6eefc7fb9da7c2712d43bc
Author: jiang1997 <ji...@live.cn>
AuthorDate: Fri Oct 21 22:28:04 2022 +0800
feat: add Kafka support to MeterReportService (#243)
* feat: add Kafka support to MeterReportService
* remove unnecessary comments
* remove unnecessary conversions
---
skywalking/agent/protocol/kafka.py | 31 ++++++++++++++++++++++++++++++-
skywalking/client/kafka.py | 24 +++++++++++++++++++-----
skywalking/config.py | 1 +
tests/e2e/case/grpc/e2e.yaml | 4 +---
tests/e2e/case/kafka/e2e.yaml | 8 +++-----
5 files changed, 54 insertions(+), 14 deletions(-)
diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index dc2c2d2..0f3b1fc 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -22,10 +22,11 @@ from time import time
from skywalking import config
from skywalking.agent import Protocol
from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \
- KafkaLogDataReportService
+ KafkaLogDataReportService, KafkaMeterDataReportService
from skywalking.loggings import logger, getLogger, logger_debug_enabled
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
+from skywalking.protocol.language_agent.Meter_pb2 import MeterData
from skywalking.protocol.logging.Logging_pb2 import LogData
from skywalking.trace.segment import Segment
@@ -39,6 +40,7 @@ class KafkaProtocol(Protocol):
self.service_management = KafkaServiceManagementClient()
self.traces_reporter = KafkaTraceSegmentReportService()
self.log_reporter = KafkaLogDataReportService()
+ self.meter_reporter = KafkaMeterDataReportService()
def heartbeat(self):
self.service_management.send_heart_beat()
@@ -133,3 +135,30 @@ class KafkaProtocol(Protocol):
yield log_data
self.log_reporter.report(generator=generator())
+
+ def report_meter(self, queue: Queue, block: bool = True):
+ start = None
+
+ def generator():
+ nonlocal start
+
+ while True:
+ try:
+ timeout = config.QUEUE_TIMEOUT # type: int
+ if not start: # make sure first time through queue is always checked
+ start = time()
+ else:
+ timeout -= int(time() - start)
+ if timeout <= 0: # this is to make sure we exit eventually instead of being fed continuously
+ return
+ meter_data = queue.get(block=block, timeout=timeout) # type: MeterData
+ except Empty:
+ return
+ queue.task_done()
+
+ if logger_debug_enabled:
+ logger.debug('Reporting Meter')
+
+ yield meter_data
+
+ self.meter_reporter.report(generator=generator())
diff --git a/skywalking/client/kafka.py b/skywalking/client/kafka.py
index 2aaa7db..51be7df 100644
--- a/skywalking/client/kafka.py
+++ b/skywalking/client/kafka.py
@@ -21,9 +21,10 @@ import os
from kafka import KafkaProducer
from skywalking import config
-from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
+from skywalking.client import MeterReportService, ServiceManagementClient, TraceSegmentReportService, LogDataReportService
from skywalking.loggings import logger, logger_debug_enabled
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
+from skywalking.protocol.language_agent.Meter_pb2 import MeterDataCollection
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
kafka_configs = {}
@@ -79,7 +80,7 @@ class KafkaServiceManagementClient(ServiceManagementClient):
)
key = bytes(self.topic_key_register + instance.serviceInstance, encoding='utf-8')
- value = bytes(instance.SerializeToString())
+ value = instance.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)
def send_heart_beat(self):
@@ -96,7 +97,7 @@ class KafkaServiceManagementClient(ServiceManagementClient):
)
key = bytes(instance_ping_pkg.serviceInstance, encoding='utf-8')
- value = bytes(instance_ping_pkg.SerializeToString())
+ value = instance_ping_pkg.SerializeToString()
future = self.producer.send(topic=self.topic, key=key, value=value)
res = future.get(timeout=10)
if logger_debug_enabled:
@@ -111,7 +112,7 @@ class KafkaTraceSegmentReportService(TraceSegmentReportService):
def report(self, generator):
for segment in generator:
key = bytes(segment.traceSegmentId, encoding='utf-8')
- value = bytes(segment.SerializeToString())
+ value = segment.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)
@@ -123,10 +124,23 @@ class KafkaLogDataReportService(LogDataReportService):
def report(self, generator):
for log_data in generator:
key = bytes(log_data.traceContext.traceSegmentId, encoding='utf-8')
- value = bytes(log_data.SerializeToString())
+ value = log_data.SerializeToString()
self.producer.send(topic=self.topic, key=key, value=value)
+class KafkaMeterDataReportService(MeterReportService):
+ def __init__(self):
+ self.producer = KafkaProducer(**kafka_configs)
+ self.topic = config.kafka_topic_meter
+
+ def report(self, generator):
+ collection = MeterDataCollection()
+ collection.meterData.extend(list(generator))
+ key = bytes(config.service_instance, encoding='utf-8')
+ value = collection.SerializeToString()
+ self.producer.send(topic=self.topic, key=key, value=value)
+
+
class KafkaConfigDuplicated(Exception):
def __init__(self, key):
self.key = key
diff --git a/skywalking/config.py b/skywalking/config.py
index fd89fa4..2b20581 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -38,6 +38,7 @@ kafka_bootstrap_servers: str = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS')
kafka_topic_management: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or 'skywalking-managements'
kafka_topic_segment: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or 'skywalking-segments'
kafka_topic_log: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or 'skywalking-logs'
+kafka_topic_meter: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_METER') or 'skywalking-meters'
force_tls: bool = os.getenv('SW_AGENT_FORCE_TLS', '').lower() == 'true'
protocol: str = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower()
authentication: str = os.getenv('SW_AGENT_AUTHENTICATION')
diff --git a/tests/e2e/case/grpc/e2e.yaml b/tests/e2e/case/grpc/e2e.yaml
index d184a17..cdbdafb 100644
--- a/tests/e2e/case/grpc/e2e.yaml
+++ b/tests/e2e/case/grpc/e2e.yaml
@@ -61,9 +61,7 @@ verify:
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
expected: ../expected/service-instance.yml
- # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
- # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
- # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
+ # service instance pvm metrics
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
expected: ../expected/metrics-has-value.yml
diff --git a/tests/e2e/case/kafka/e2e.yaml b/tests/e2e/case/kafka/e2e.yaml
index 4933b5b..cdbdafb 100644
--- a/tests/e2e/case/kafka/e2e.yaml
+++ b/tests/e2e/case/kafka/e2e.yaml
@@ -61,11 +61,9 @@ verify:
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
expected: ../expected/service-instance.yml
- # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
- # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
- # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
- # - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
- # expected: ../expected/metrics-has-value.yml
+ # service instance pvm metrics
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
+ expected: ../expected/metrics-has-value.yml
# trace segment list
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls