Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/07/24 16:26:05 UTC

[skywalking-python] branch master updated: feature: add Kafka Plugin (#50)

This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 9165b1f  feature: add Kafka Plugin (#50)
9165b1f is described below

commit 9165b1fc33899c0264e4d28cbe9e0a802802f86d
Author: huawei <al...@gmail.com>
AuthorDate: Sat Jul 25 00:25:56 2020 +0800

    feature: add Kafka Plugin (#50)
---
 README.md                                          |   1 +
 setup.py                                           |   1 +
 skywalking/__init__.py                             |   2 +
 skywalking/plugins/sw_kafka/__init__.py            |  99 ++++++++++++++++++++
 skywalking/trace/tags/__init__.py                  |   3 +
 .../tags => tests/plugin/sw_kafka}/__init__.py     |  14 ---
 tests/plugin/sw_kafka/docker-compose.yml           | 103 +++++++++++++++++++++
 tests/plugin/sw_kafka/expected.data.yml            |  87 +++++++++++++++++
 .../plugin/sw_kafka/services}/__init__.py          |  14 ---
 .../plugin/sw_kafka/services/consumer.py           |  30 +++---
 .../plugin/sw_kafka/services/producer.py           |  35 ++++---
 .../plugin/sw_kafka/test_kafka.py                  |  39 +++++---
 12 files changed, 363 insertions(+), 65 deletions(-)

diff --git a/README.md b/README.md
index 4a54e9a..7509bb4 100755
--- a/README.md
+++ b/README.md
@@ -78,6 +78,7 @@ Library | Plugin Name
 | [PyMySQL](https://pymysql.readthedocs.io/en/latest/) | `sw_pymysql` |
 | [Django](https://www.djangoproject.com/) | `sw_django` |
 | [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
+| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
 | [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
 
 ## API
diff --git a/setup.py b/setup.py
index 92fdd1b..b69e07d 100644
--- a/setup.py
+++ b/setup.py
@@ -47,6 +47,7 @@ setup(
             "Werkzeug",
             "pymysql",
             "redis",
+            "kafka-python",
             "tornado",
         ],
     },
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 174d816..23fbb7e 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -30,6 +30,8 @@ class Component(Enum):
     Django = 7004
     Tornado = 7005
     Redis = 7
+    KafkaProducer = 40
+    KafkaConsumer = 41
 
 
 class Layer(Enum):
diff --git a/skywalking/plugins/sw_kafka/__init__.py b/skywalking/plugins/sw_kafka/__init__.py
new file mode 100644
index 0000000..a04feae
--- /dev/null
+++ b/skywalking/plugins/sw_kafka/__init__.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+
+from skywalking import Layer, Component
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+logger = logging.getLogger(__name__)
+
+
+def install():
+    # noinspection PyBroadException
+    try:
+        from kafka import KafkaProducer
+        from kafka import KafkaConsumer
+
+        _send = KafkaProducer.send
+        __poll_once = KafkaConsumer._poll_once
+        KafkaProducer.send = _sw_send_func(_send)
+        KafkaConsumer._poll_once = _sw__poll_once_func(__poll_once)
+
+    except Exception:
+        logger.warning('failed to install plugin %s', __name__)
+
+
+def _sw__poll_once_func(__poll_once):
+    def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
+        res = __poll_once(this, timeout_ms, max_records, update_offsets=update_offsets)
+        if res:
+            brokers = ";".join(this.config["bootstrap_servers"])
+            context = get_context()
+            topics = ";".join(this._subscription.subscription or
+                              [t.topic for t in this._subscription._user_assignment])
+            with context.new_entry_span(
+                    op="Kafka/" + topics + "/Consumer/" + (this.config["group_id"] or "")) as span:
+                for consumerRecords in res.values():
+                    for record in consumerRecords:
+                        carrier = Carrier()
+                        for item in carrier:
+                            for header in record.headers:
+                                if item.key == header[0]:
+                                    item.val = str(header[1])
+
+                        span.extract(carrier)
+                    span.tag(Tag(key=tags.MqBroker, val=brokers))
+                    span.tag(Tag(key=tags.MqTopic, val=topics))
+                    span.layer = Layer.MQ
+                    span.component = Component.KafkaConsumer
+
+        return res
+
+    return _sw__poll_once
+
+
+def _sw_send_func(_send):
+    def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
+        peer = ";".join(this.config["bootstrap_servers"])
+        context = get_context()
+        carrier = Carrier()
+        with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer, carrier=carrier) as span:
+            span.layer = Layer.MQ
+            span.component = Component.KafkaProducer
+
+            if headers is None:
+                headers = []
+                for item in carrier:
+                    headers.append((item.key, item.val.encode("utf-8")))
+            else:
+                for item in carrier:
+                    headers.append((item.key, item.val.encode("utf-8")))
+
+            try:
+                res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
+                            timestamp_ms=timestamp_ms)
+                span.tag(Tag(key=tags.MqBroker, val=peer))
+                span.tag(Tag(key=tags.MqTopic, val=topic))
+            except BaseException as e:
+                span.raised()
+                raise e
+            return res
+
+    return _sw_send
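
A hedged aside (illustrative, not part of the commit): once the agent is started, its plugin loader imports this module and install() replaces KafkaProducer.send and KafkaConsumer._poll_once, so ordinary kafka-python code is traced without further changes. In the sketch below the service name, broker address and topic are placeholders.

    # Illustrative sketch only; service name, broker address and topic are placeholders.
    from skywalking import agent, config

    config.service_name = 'kafka-demo'   # hypothetical service name
    config.logging_level = 'INFO'
    agent.start()                        # sw_kafka's install() runs as part of agent startup

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])  # assumed local broker
    # The wrapped send() opens an exit span named "Kafka/<topic>/Producer" and injects the
    # trace context into the message headers for the consumer side to extract.
    producer.send('demo-topic', b'hello')
    producer.flush()
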
diff --git a/skywalking/trace/tags/__init__.py b/skywalking/trace/tags/__init__.py
index 89a6819..57a389d 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/skywalking/trace/tags/__init__.py
@@ -28,3 +28,6 @@ DbInstance = 'db.instance'
 DbStatement = 'db.statement'
 DbSqlParameters = 'db.sql.parameters'
 HttpParams = 'http.params'
+MqBroker = 'mq.broker'
+MqTopic = 'mq.topic'
+MqQueue = 'mq.queue'
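
A hedged aside (not part of the commit): the new keys are plain string constants consumed through this module's Tag namedtuple (key, val, overridable; overridable defaults to False). MqQueue is not referenced by the Kafka plugin itself; the values below are placeholders.

    # Hypothetical usage of the new tag constants; broker/topic/queue values are placeholders.
    from skywalking.trace.tags import Tag, MqBroker, MqTopic, MqQueue

    broker_tag = Tag(key=MqBroker, val='localhost:9092')
    topic_tag = Tag(key=MqTopic, val='demo-topic')
    queue_tag = Tag(key=MqQueue, val='demo-queue')
    print(broker_tag.key, broker_tag.val, broker_tag.overridable)  # mq.broker localhost:9092 False
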
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/__init__.py
similarity index 69%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/__init__.py
index 89a6819..b1312a0 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/__init__.py
@@ -14,17 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
diff --git a/tests/plugin/sw_kafka/docker-compose.yml b/tests/plugin/sw_kafka/docker-compose.yml
new file mode 100644
index 0000000..9f81de5
--- /dev/null
+++ b/tests/plugin/sw_kafka/docker-compose.yml
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+  collector:
+    extends:
+      service: collector
+      file: ../docker/docker-compose.base.yml
+
+  zookeeper-server:
+    image: zookeeper:3.4
+    hostname: zookeeper-server
+    ports:
+      - 2181:2181
+    networks:
+      - beyond
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/2181"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+
+  kafka-server:
+    image: bitnami/kafka:2.1.1
+    hostname: kafka-server
+    ports:
+      - 9092:9092
+    environment:
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
+      - KAFKA_BROKER_ID=1
+      - ALLOW_PLAINTEXT_LISTENER=yes
+      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
+    networks:
+      - beyond
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9092"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      zookeeper-server:
+        condition: service_healthy
+
+  producer:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9090:9090
+    volumes:
+      - ./services/producer.py:/app/producer.py
+    command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/producer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      kafka-server:
+        condition: service_healthy
+      consumer:
+        condition: service_healthy
+
+  consumer:
+    extends:
+      service: agent
+      file: ../docker/docker-compose.base.yml
+    ports:
+      - 9091:9091
+    volumes:
+      - ./services/consumer.py:/app/consumer.py
+    command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/consumer.py']
+    healthcheck:
+      test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    depends_on:
+      collector:
+        condition: service_healthy
+      kafka-server:
+        condition: service_healthy
+
+networks:
+  beyond:
diff --git a/tests/plugin/sw_kafka/expected.data.yml b/tests/plugin/sw_kafka/expected.data.yml
new file mode 100644
index 0000000..7b2c771
--- /dev/null
+++ b/tests/plugin/sw_kafka/expected.data.yml
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+  - serviceName: producer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/skywalking/Producer
+            operationId: 0
+            parentSpanId: 0
+            spanId: 1
+            spanLayer: MQ
+            tags:
+              - key: mq.broker
+                value: 'kafka-server:9092'
+              - key: mq.topic
+                value: skywalking
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 40
+            spanType: Exit
+            peer: kafka-server:9092
+            skipAnalysis: false
+          - operationName: /users
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: Http
+            tags:
+              - key: http.method
+                value: GET
+              - key: url
+                value: http://0.0.0.0:9090/users
+              - key: status.code
+                value: '200'
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 7001
+            spanType: Entry
+            peer: not null
+            skipAnalysis: false
+  - serviceName: consumer
+    segmentSize: 1
+    segments:
+      - segmentId: not null
+        spans:
+          - operationName: Kafka/skywalking/Consumer/skywalking
+            operationId: 0
+            parentSpanId: -1
+            spanId: 0
+            spanLayer: MQ
+            tags:
+              - key: mq.broker
+                value: 'kafka-server:9092'
+              - key: mq.topic
+                value: skywalking
+            refs:
+              - parentEndpoint: Kafka/skywalking/Producer
+                networkAddress: 'kafka-server:9092'
+                refType: CrossProcess
+                parentSpanId: 1
+                parentTraceSegmentId: not null
+                parentServiceInstance: not null
+                parentService: producer
+                traceId: not null
+            startTime: gt 0
+            endTime: gt 0
+            componentId: 41
+            spanType: Entry
+            peer: ''
+            skipAnalysis: false
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/__init__.py
similarity index 69%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/__init__.py
index 89a6819..b1312a0 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/__init__.py
@@ -14,17 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/consumer.py
similarity index 55%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/consumer.py
index 89a6819..ab242f2 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/consumer.py
@@ -15,16 +15,24 @@
 # limitations under the License.
 #
 
-from collections import namedtuple
+from skywalking import config, agent
 
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
+if __name__ == '__main__':
+    config.service_name = 'consumer'
+    config.logging_level = 'INFO'
+    agent.start()
 
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+    topic = "skywalking"
+    server_list = ["kafka-server:9092"]
+    group_id = "skywalking"
+    client_id = "0"
+
+    from kafka import KafkaConsumer
+    from kafka import TopicPartition
+    consumer = KafkaConsumer(group_id=group_id,
+                             client_id=client_id,
+                             bootstrap_servers=server_list)
+    partition = TopicPartition(topic, int(client_id))
+    consumer.assign([partition])
+    for msg in consumer:
+        print(msg)
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/producer.py
similarity index 56%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/producer.py
index 89a6819..e4ee375 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/producer.py
@@ -15,16 +15,25 @@
 # limitations under the License.
 #
 
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+
+from skywalking import agent, config
+
+if __name__ == '__main__':
+    config.service_name = 'producer'
+    config.logging_level = 'INFO'
+    agent.start()
+
+    from flask import Flask, jsonify
+    from kafka import KafkaProducer
+
+    app = Flask(__name__)
+    producer = KafkaProducer(bootstrap_servers=['kafka-server:9092'], api_version=(1, 0, 1))
+
+    @app.route("/users", methods=["POST", "GET"])
+    def application():
+        producer.send('skywalking', b'some_message_bytes')
+
+        return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
+
+    PORT = 9090
+    app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/test_kafka.py
similarity index 55%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/test_kafka.py
index 89a6819..f24add7 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/test_kafka.py
@@ -15,16 +15,29 @@
 # limitations under the License.
 #
 
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+import os
+import time
+import unittest
+from os.path import abspath, dirname
+
+from testcontainers.compose import DockerCompose
+
+from tests.plugin import BasePluginTest
+
+
+class TestPlugin(BasePluginTest):
+    @classmethod
+    def setUpClass(cls):
+        cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
+        cls.compose.start()
+
+        cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
+
+    def test_request_plugin(self):
+        time.sleep(3)
+
+        self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml'))
+
+
+if __name__ == '__main__':
+    unittest.main()
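
A hedged note to close (not part of the commit): test_kafka.py brings up the docker-compose.yml above through testcontainers, so Docker must be available locally. A minimal sketch of running just this suite, assuming it is launched from the repository root with the test dependencies installed:

    # Assumed to run from the repository root with Docker and the test extras available.
    import unittest

    suite = unittest.defaultTestLoader.loadTestsFromName('tests.plugin.sw_kafka.test_kafka')
    unittest.TextTestRunner(verbosity=2).run(suite)
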