You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2023/02/06 02:18:24 UTC

[skywalking-python] branch master updated: Add kafka prefix to kafka topic names (#277)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c97ba  Add kafka prefix to kafka topic names (#277)
e0c97ba is described below

commit e0c97ba2c34e2ad00c0b44a154e33b85bde16cfa
Author: SheltonZSL <11...@users.noreply.github.com>
AuthorDate: Sun Feb 5 21:18:19 2023 -0500

    Add kafka prefix to kafka topic names (#277)
---
 CHANGELOG.md                            |  1 +
 docs/en/setup/Configuration.md          |  1 +
 skywalking/config.py                    | 11 ++++++++++-
 tests/e2e/case/kafka/docker-compose.yml |  4 +++-
 4 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 095af31..72deb73 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@
   - Add Greenlet profiler (#246)
   - Add test and support for Python Slim base images (#249)
   - Add support for the tags of Virtual Cache for Redis (#263)
+  - Add a new configuration `kafka_namespace` to prefix the kafka topic names (#277)
 
 - Plugins:
   - Add aioredis, aiormq, amqp, asyncpg, aio-pika, kombu RMQ plugins (#230 Missing test coverage) 
diff --git a/docs/en/setup/Configuration.md b/docs/en/setup/Configuration.md
index 248d002..9a5a572 100644
--- a/docs/en/setup/Configuration.md
+++ b/docs/en/setup/Configuration.md
@@ -23,6 +23,7 @@ export SW_AGENT_YourConfiguration=YourValue
 | service_instance | SW_AGENT_SERVICE_INSTANCE | <class 'str'> | str(uuid.uuid1()).replace('-', '') | The name of this particular awesome Python service instance |
 | namespace | SW_AGENT_NAMESPACE | <class 'str'> |  | The agent namespace of the Python service (available as tag and the suffix of service name) |
 | kafka_bootstrap_servers | SW_AGENT_KAFKA_BOOTSTRAP_SERVERS | <class 'str'> | localhost:9092 | A list of host/port pairs to use for establishing the initial connection to your Kafka cluster. It is in the form of host1:port1,host2:port2,... (used for Kafka reporter protocol) |
+| kafka_namespace | SW_AGENT_KAFKA_NAMESPACE | <class 'str'> |  | The Kafka namespace, as specified by SW_NAMESPACE on the OAP side. When set, it is prepended to the following Kafka topic names, joined by a `-`. |
 | kafka_topic_management | SW_AGENT_KAFKA_TOPIC_MANAGEMENT | <class 'str'> | skywalking-managements | Specifying Kafka topic name for service instance reporting and registering, this should be in sync with OAP |
 | kafka_topic_segment | SW_AGENT_KAFKA_TOPIC_SEGMENT | <class 'str'> | skywalking-segments | Specifying Kafka topic name for Tracing data, this should be in sync with OAP |
 | kafka_topic_log | SW_AGENT_KAFKA_TOPIC_LOG | <class 'str'> | skywalking-logs | Specifying Kafka topic name for Log data, this should be in sync with OAP |
diff --git a/skywalking/config.py b/skywalking/config.py
index 4d14d5b..1bc3c0a 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -59,6 +59,8 @@ namespace: str = os.getenv('SW_AGENT_NAMESPACE', '')
 # A list of host/port pairs to use for establishing the initial connection to your Kafka cluster.
 # It is in the form of host1:port1,host2:port2,... (used for Kafka reporter protocol)
 kafka_bootstrap_servers: str = os.getenv('SW_AGENT_KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
+# The Kafka namespace, as specified by SW_NAMESPACE on the OAP side. When set, it is prepended to the following Kafka topic names, joined by a `-`.
+kafka_namespace: str = os.getenv('SW_AGENT_KAFKA_NAMESPACE', '')
 # Specifying Kafka topic name for service instance reporting and registering, this should be in sync with OAP
 kafka_topic_management: str = os.getenv('SW_AGENT_KAFKA_TOPIC_MANAGEMENT', 'skywalking-managements')
 # Specifying Kafka topic name for Tracing data, this should be in sync with OAP
@@ -222,13 +224,20 @@ def finalize_name() -> None:
     """
     This function concatenates the serviceName according to
     Java agent's implementation.
-    TODO: add kafka namespace prefix and cluster concept
+    TODO: add cluster concept
     Ref https://github.com/apache/skywalking-java/pull/123
     """
     global service_name
     if namespace:
         service_name = f'{service_name}|{namespace}'
 
+    global kafka_topic_management, kafka_topic_meter, kafka_topic_log, kafka_topic_segment
+    if kafka_namespace:
+        kafka_topic_management = f'{kafka_namespace}-{kafka_topic_management}'
+        kafka_topic_meter = f'{kafka_namespace}-{kafka_topic_meter}'
+        kafka_topic_log = f'{kafka_namespace}-{kafka_topic_log}'
+        kafka_topic_segment = f'{kafka_namespace}-{kafka_topic_segment}'
+
 
 def finalize_regex() -> None:
     """
diff --git a/tests/e2e/case/kafka/docker-compose.yml b/tests/e2e/case/kafka/docker-compose.yml
index ca16506..552e50e 100644
--- a/tests/e2e/case/kafka/docker-compose.yml
+++ b/tests/e2e/case/kafka/docker-compose.yml
@@ -81,6 +81,7 @@ services:
       SW_KAFKA_FETCHER_SERVERS: broker-a:9092,broker-b:9092
       SW_KAFKA_FETCHER_PARTITIONS: 2
       SW_KAFKA_FETCHER_PARTITIONS_FACTOR: 1
+      SW_NAMESPACE: 'e2e'
     depends_on:
       broker-a:
         condition: service_healthy
@@ -99,7 +100,7 @@ services:
       SW_AGENT_COLLECTOR_ADDRESS: oap:11800
       SW_AGENT_PROTOCOL: kafka
       SW_AGENT_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092
-
+      SW_AGENT_KAFKA_NAMESPACE: 'e2e'
     depends_on:
       oap:
         condition: service_healthy
@@ -112,6 +113,7 @@ services:
       SW_AGENT_COLLECTOR_ADDRESS: oap:11800
       SW_AGENT_PROTOCOL: kafka
       SW_AGENT_KAFKA_BOOTSTRAP_SERVERS: broker-a:9092,broker-b:9092
+      SW_AGENT_KAFKA_NAMESPACE: 'e2e'
     ports:
       - "9090"
     depends_on: