You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by yi...@apache.org on 2022/10/21 14:28:10 UTC

[skywalking-python] branch master updated: feat: add Kafka support to MeterReportService (#243)

This is an automated email from the ASF dual-hosted git repository.

yihaochen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new ca56ad2  feat: add Kafka support to MeterReportService (#243)
ca56ad2 is described below

commit ca56ad28400749b9db6eefc7fb9da7c2712d43bc
Author: jiang1997 <ji...@live.cn>
AuthorDate: Fri Oct 21 22:28:04 2022 +0800

    feat: add Kafka support to MeterReportService (#243)
    
    * feat: add Kafka support to MeterReportService
    
    * remove unnecessary comments
    
    * remove unnecessary conversions
---
 skywalking/agent/protocol/kafka.py | 31 ++++++++++++++++++++++++++++++-
 skywalking/client/kafka.py         | 24 +++++++++++++++++++-----
 skywalking/config.py               |  1 +
 tests/e2e/case/grpc/e2e.yaml       |  4 +---
 tests/e2e/case/kafka/e2e.yaml      |  8 +++-----
 5 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/skywalking/agent/protocol/kafka.py b/skywalking/agent/protocol/kafka.py
index dc2c2d2..0f3b1fc 100644
--- a/skywalking/agent/protocol/kafka.py
+++ b/skywalking/agent/protocol/kafka.py
@@ -22,10 +22,11 @@ from time import time
 from skywalking import config
 from skywalking.agent import Protocol
 from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService, \
-    KafkaLogDataReportService
+    KafkaLogDataReportService, KafkaMeterDataReportService
 from skywalking.loggings import logger, getLogger, logger_debug_enabled
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
 from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
+from skywalking.protocol.language_agent.Meter_pb2 import MeterData
 from skywalking.protocol.logging.Logging_pb2 import LogData
 from skywalking.trace.segment import Segment
 
@@ -39,6 +40,7 @@ class KafkaProtocol(Protocol):
         self.service_management = KafkaServiceManagementClient()
         self.traces_reporter = KafkaTraceSegmentReportService()
         self.log_reporter = KafkaLogDataReportService()
+        self.meter_reporter = KafkaMeterDataReportService()
 
     def heartbeat(self):
         self.service_management.send_heart_beat()
@@ -133,3 +135,30 @@ class KafkaProtocol(Protocol):
                 yield log_data
 
         self.log_reporter.report(generator=generator())
+
+    def report_meter(self, queue: Queue, block: bool = True):
+        start = None
+
+        def generator():
+            nonlocal start
+
+            while True:
+                try:
+                    timeout = config.QUEUE_TIMEOUT  # type: int
+                    if not start:  # make sure first time through queue is always checked
+                        start = time()
+                    else:
+                        timeout -= int(time() - start)
+                        if timeout <= 0:  # this is to make sure we exit eventually instead of being fed continuously
+                            return
+                    meter_data = queue.get(block=block, timeout=timeout)  # type: MeterData
+                except Empty:
+                    return
+                queue.task_done()
+
+                if logger_debug_enabled:
+                    logger.debug('Reporting Meter')
+
+                yield meter_data
+
+        self.meter_reporter.report(generator=generator())
diff --git a/skywalking/client/kafka.py b/skywalking/client/kafka.py
index 2aaa7db..51be7df 100644
--- a/skywalking/client/kafka.py
+++ b/skywalking/client/kafka.py
@@ -21,9 +21,10 @@ import os
 from kafka import KafkaProducer
 
 from skywalking import config
-from skywalking.client import ServiceManagementClient, TraceSegmentReportService, LogDataReportService
+from skywalking.client import MeterReportService, ServiceManagementClient, TraceSegmentReportService, LogDataReportService
 from skywalking.loggings import logger, logger_debug_enabled
 from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
+from skywalking.protocol.language_agent.Meter_pb2 import MeterDataCollection
 from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
 
 kafka_configs = {}
@@ -79,7 +80,7 @@ class KafkaServiceManagementClient(ServiceManagementClient):
         )
 
         key = bytes(self.topic_key_register + instance.serviceInstance, encoding='utf-8')
-        value = bytes(instance.SerializeToString())
+        value = instance.SerializeToString()
         self.producer.send(topic=self.topic, key=key, value=value)
 
     def send_heart_beat(self):
@@ -96,7 +97,7 @@ class KafkaServiceManagementClient(ServiceManagementClient):
         )
 
         key = bytes(instance_ping_pkg.serviceInstance, encoding='utf-8')
-        value = bytes(instance_ping_pkg.SerializeToString())
+        value = instance_ping_pkg.SerializeToString()
         future = self.producer.send(topic=self.topic, key=key, value=value)
         res = future.get(timeout=10)
         if logger_debug_enabled:
@@ -111,7 +112,7 @@ class KafkaTraceSegmentReportService(TraceSegmentReportService):
     def report(self, generator):
         for segment in generator:
             key = bytes(segment.traceSegmentId, encoding='utf-8')
-            value = bytes(segment.SerializeToString())
+            value = segment.SerializeToString()
             self.producer.send(topic=self.topic, key=key, value=value)
 
 
@@ -123,10 +124,23 @@ class KafkaLogDataReportService(LogDataReportService):
     def report(self, generator):
         for log_data in generator:
             key = bytes(log_data.traceContext.traceSegmentId, encoding='utf-8')
-            value = bytes(log_data.SerializeToString())
+            value = log_data.SerializeToString()
             self.producer.send(topic=self.topic, key=key, value=value)
 
 
+class KafkaMeterDataReportService(MeterReportService):
+    def __init__(self):
+        self.producer = KafkaProducer(**kafka_configs)
+        self.topic = config.kafka_topic_meter
+
+    def report(self, generator):
+        collection = MeterDataCollection()
+        collection.meterData.extend(list(generator))
+        key = bytes(config.service_instance, encoding='utf-8')
+        value = collection.SerializeToString()
+        self.producer.send(topic=self.topic, key=key, value=value)
+
+
 class KafkaConfigDuplicated(Exception):
     def __init__(self, key):
         self.key = key
diff --git a/skywalking/config.py b/skywalking/config.py
index fd89fa4..2b20581 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -38,6 +38,7 @@ kafka_bootstrap_servers: str = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS')
 kafka_topic_management: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or 'skywalking-managements'
 kafka_topic_segment: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or 'skywalking-segments'
 kafka_topic_log: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_LOG') or 'skywalking-logs'
+kafka_topic_meter: str = os.getenv('SW_KAFKA_REPORTER_TOPIC_METER') or 'skywalking-meters'
 force_tls: bool = os.getenv('SW_AGENT_FORCE_TLS', '').lower() == 'true'
 protocol: str = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower()
 authentication: str = os.getenv('SW_AGENT_AUTHENTICATION')
diff --git a/tests/e2e/case/grpc/e2e.yaml b/tests/e2e/case/grpc/e2e.yaml
index d184a17..cdbdafb 100644
--- a/tests/e2e/case/grpc/e2e.yaml
+++ b/tests/e2e/case/grpc/e2e.yaml
@@ -61,9 +61,7 @@ verify:
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
       expected: ../expected/service-instance.yml
 
-    # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
-    # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
-    # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
+    # service instance pvm metrics
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
       expected: ../expected/metrics-has-value.yml
 
diff --git a/tests/e2e/case/kafka/e2e.yaml b/tests/e2e/case/kafka/e2e.yaml
index 4933b5b..cdbdafb 100644
--- a/tests/e2e/case/kafka/e2e.yaml
+++ b/tests/e2e/case/kafka/e2e.yaml
@@ -61,11 +61,9 @@ verify:
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql instance list --service-name=e2e-service-provider
       expected: ../expected/service-instance.yml
 
-    # TODO: Metric Collection Implementation is not merged https://github.com/apache/skywalking/issues/7084
-    # service instance pvm metrics TODO: PVM Collection Implementation needed https://github.com/apache/skywalking/issues/5944
-    # swctl --display yaml --base-url=http://localhost:12800/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
-    #    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name instance_jvm_thread_live_count --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
-    #      expected: ../expected/metrics-has-value.yml
+    # service instance pvm metrics
+    - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql metrics linear --name meter_total_cpu_utilization --instance-name=provider1 --service-name=e2e-service-provider |yq e 'to_entries' -
+      expected: ../expected/metrics-has-value.yml
 
     # trace segment list
     - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace ls