You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2020/07/24 16:26:05 UTC
[skywalking-python] branch master updated: feature: add Kafka Plugin (#50)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 9165b1f feature: add Kafka Plugin (#50)
9165b1f is described below
commit 9165b1fc33899c0264e4d28cbe9e0a802802f86d
Author: huawei <al...@gmail.com>
AuthorDate: Sat Jul 25 00:25:56 2020 +0800
feature: add Kafka Plugin (#50)
---
README.md | 1 +
setup.py | 1 +
skywalking/__init__.py | 2 +
skywalking/plugins/sw_kafka/__init__.py | 99 ++++++++++++++++++++
skywalking/trace/tags/__init__.py | 3 +
.../tags => tests/plugin/sw_kafka}/__init__.py | 14 ---
tests/plugin/sw_kafka/docker-compose.yml | 103 +++++++++++++++++++++
tests/plugin/sw_kafka/expected.data.yml | 87 +++++++++++++++++
.../plugin/sw_kafka/services}/__init__.py | 14 ---
.../plugin/sw_kafka/services/consumer.py | 30 +++---
.../plugin/sw_kafka/services/producer.py | 35 ++++---
.../plugin/sw_kafka/test_kafka.py | 39 +++++---
12 files changed, 363 insertions(+), 65 deletions(-)
diff --git a/README.md b/README.md
index 4a54e9a..7509bb4 100755
--- a/README.md
+++ b/README.md
@@ -78,6 +78,7 @@ Library | Plugin Name
| [PyMySQL](https://pymysql.readthedocs.io/en/latest/) | `sw_pymysql` |
| [Django](https://www.djangoproject.com/) | `sw_django` |
| [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
+| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
## API
diff --git a/setup.py b/setup.py
index 92fdd1b..b69e07d 100644
--- a/setup.py
+++ b/setup.py
@@ -47,6 +47,7 @@ setup(
"Werkzeug",
"pymysql",
"redis",
+ "kafka-python",
"tornado",
],
},
diff --git a/skywalking/__init__.py b/skywalking/__init__.py
index 174d816..23fbb7e 100644
--- a/skywalking/__init__.py
+++ b/skywalking/__init__.py
@@ -30,6 +30,8 @@ class Component(Enum):
Django = 7004
Tornado = 7005
Redis = 7
+ KafkaProducer = 40
+ KafkaConsumer = 41
class Layer(Enum):
diff --git a/skywalking/plugins/sw_kafka/__init__.py b/skywalking/plugins/sw_kafka/__init__.py
new file mode 100644
index 0000000..a04feae
--- /dev/null
+++ b/skywalking/plugins/sw_kafka/__init__.py
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+
+from skywalking import Layer, Component
+from skywalking.trace import tags
+from skywalking.trace.carrier import Carrier
+from skywalking.trace.context import get_context
+from skywalking.trace.tags import Tag
+
+logger = logging.getLogger(__name__)
+
+
+def install():
+ # noinspection PyBroadException
+ try:
+ from kafka import KafkaProducer
+ from kafka import KafkaConsumer
+
+ _send = KafkaProducer.send
+ __poll_once = KafkaConsumer._poll_once
+ KafkaProducer.send = _sw_send_func(_send)
+ KafkaConsumer._poll_once = _sw__poll_once_func(__poll_once)
+
+ except Exception:
+ logger.warning('failed to install plugin %s', __name__)
+
+
+def _sw__poll_once_func(__poll_once):
+ def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
+ res = __poll_once(this, timeout_ms, max_records, update_offsets=update_offsets)
+ if res:
+ brokers = ";".join(this.config["bootstrap_servers"])
+ context = get_context()
+ topics = ";".join(this._subscription.subscription or
+ [t.topic for t in this._subscription._user_assignment])
+ with context.new_entry_span(
+ op="Kafka/" + topics + "/Consumer/" + (this.config["group_id"] or "")) as span:
+ for consumerRecords in res.values():
+ for record in consumerRecords:
+ carrier = Carrier()
+ for item in carrier:
+ for header in record.headers:
+ if item.key == header[0]:
+ item.val = str(header[1])
+
+ span.extract(carrier)
+ span.tag(Tag(key=tags.MqBroker, val=brokers))
+ span.tag(Tag(key=tags.MqTopic, val=topics))
+ span.layer = Layer.MQ
+ span.component = Component.KafkaConsumer
+
+ return res
+
+ return _sw__poll_once
+
+
+def _sw_send_func(_send):
+ def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
+ peer = ";".join(this.config["bootstrap_servers"])
+ context = get_context()
+ carrier = Carrier()
+ with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer, carrier=carrier) as span:
+ span.layer = Layer.MQ
+ span.component = Component.KafkaProducer
+
+ if headers is None:
+ headers = []
+ for item in carrier:
+ headers.append((item.key, item.val.encode("utf-8")))
+ else:
+ for item in carrier:
+ headers.append((item.key, item.val.encode("utf-8")))
+
+ try:
+ res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
+ timestamp_ms=timestamp_ms)
+ span.tag(Tag(key=tags.MqBroker, val=peer))
+ span.tag(Tag(key=tags.MqTopic, val=topic))
+ except BaseException as e:
+ span.raised()
+ raise e
+ return res
+
+ return _sw_send
diff --git a/skywalking/trace/tags/__init__.py b/skywalking/trace/tags/__init__.py
index 89a6819..57a389d 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/skywalking/trace/tags/__init__.py
@@ -28,3 +28,6 @@ DbInstance = 'db.instance'
DbStatement = 'db.statement'
DbSqlParameters = 'db.sql.parameters'
HttpParams = 'http.params'
+MqBroker = 'mq.broker'
+MqTopic = 'mq.topic'
+MqQueue = 'mq.queue'
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/__init__.py
similarity index 69%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/__init__.py
index 89a6819..b1312a0 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/__init__.py
@@ -14,17 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
diff --git a/tests/plugin/sw_kafka/docker-compose.yml b/tests/plugin/sw_kafka/docker-compose.yml
new file mode 100644
index 0000000..9f81de5
--- /dev/null
+++ b/tests/plugin/sw_kafka/docker-compose.yml
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '2.1'
+
+services:
+ collector:
+ extends:
+ service: collector
+ file: ../docker/docker-compose.base.yml
+
+ zookeeper-server:
+ image: zookeeper:3.4
+ hostname: zookeeper-server
+ ports:
+ - 2181:2181
+ networks:
+ - beyond
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/2181"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+
+ kafka-server:
+ image: bitnami/kafka:2.1.1
+ hostname: kafka-server
+ ports:
+ - 9092:9092
+ environment:
+ - KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
+ - KAFKA_BROKER_ID=1
+ - ALLOW_PLAINTEXT_LISTENER=yes
+ - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
+ networks:
+ - beyond
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9092"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ zookeeper-server:
+ condition: service_healthy
+
+ producer:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9090:9090
+ volumes:
+ - ./services/producer.py:/app/producer.py
+ command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/producer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ kafka-server:
+ condition: service_healthy
+ consumer:
+ condition: service_healthy
+
+ consumer:
+ extends:
+ service: agent
+ file: ../docker/docker-compose.base.yml
+ ports:
+ - 9091:9091
+ volumes:
+ - ./services/consumer.py:/app/consumer.py
+ command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/consumer.py']
+ healthcheck:
+ test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
+ interval: 5s
+ timeout: 60s
+ retries: 120
+ depends_on:
+ collector:
+ condition: service_healthy
+ kafka-server:
+ condition: service_healthy
+
+networks:
+ beyond:
diff --git a/tests/plugin/sw_kafka/expected.data.yml b/tests/plugin/sw_kafka/expected.data.yml
new file mode 100644
index 0000000..7b2c771
--- /dev/null
+++ b/tests/plugin/sw_kafka/expected.data.yml
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+segmentItems:
+ - serviceName: producer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: Kafka/skywalking/Producer
+ operationId: 0
+ parentSpanId: 0
+ spanId: 1
+ spanLayer: MQ
+ tags:
+ - key: mq.broker
+ value: 'kafka-server:9092'
+ - key: mq.topic
+ value: skywalking
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 40
+ spanType: Exit
+ peer: kafka-server:9092
+ skipAnalysis: false
+ - operationName: /users
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: Http
+ tags:
+ - key: http.method
+ value: GET
+ - key: url
+ value: http://0.0.0.0:9090/users
+ - key: status.code
+ value: '200'
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 7001
+ spanType: Entry
+ peer: not null
+ skipAnalysis: false
+ - serviceName: consumer
+ segmentSize: 1
+ segments:
+ - segmentId: not null
+ spans:
+ - operationName: Kafka/skywalking/Consumer/skywalking
+ operationId: 0
+ parentSpanId: -1
+ spanId: 0
+ spanLayer: MQ
+ tags:
+ - key: mq.broker
+ value: 'kafka-server:9092'
+ - key: mq.topic
+ value: skywalking
+ refs:
+ - parentEndpoint: Kafka/skywalking/Producer
+ networkAddress: 'kafka-server:9092'
+ refType: CrossProcess
+ parentSpanId: 1
+ parentTraceSegmentId: not null
+ parentServiceInstance: not null
+ parentService: producer
+ traceId: not null
+ startTime: gt 0
+ endTime: gt 0
+ componentId: 41
+ spanType: Entry
+ peer: ''
+ skipAnalysis: false
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/__init__.py
similarity index 69%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/__init__.py
index 89a6819..b1312a0 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/__init__.py
@@ -14,17 +14,3 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/consumer.py
similarity index 55%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/consumer.py
index 89a6819..ab242f2 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/consumer.py
@@ -15,16 +15,24 @@
# limitations under the License.
#
-from collections import namedtuple
+from skywalking import config, agent
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
+if __name__ == '__main__':
+ config.service_name = 'consumer'
+ config.logging_level = 'INFO'
+ agent.start()
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+ topic = "skywalking"
+ server_list = ["kafka-server:9092"]
+ group_id = "skywalking"
+ client_id = "0"
+
+ from kafka import KafkaConsumer
+ from kafka import TopicPartition
+ consumer = KafkaConsumer(group_id=group_id,
+ client_id=client_id,
+ bootstrap_servers=server_list)
+ partition = TopicPartition(topic, int(client_id))
+ consumer.assign([partition])
+ for msg in consumer:
+ print(msg)
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/services/producer.py
similarity index 56%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/services/producer.py
index 89a6819..e4ee375 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/services/producer.py
@@ -15,16 +15,25 @@
# limitations under the License.
#
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+
+from skywalking import agent, config
+
+if __name__ == '__main__':
+ config.service_name = 'producer'
+ config.logging_level = 'INFO'
+ agent.start()
+
+ from flask import Flask, jsonify
+ from kafka import KafkaProducer
+
+ app = Flask(__name__)
+ producer = KafkaProducer(bootstrap_servers=['kafka-server:9092'], api_version=(1, 0, 1))
+
+ @app.route("/users", methods=["POST", "GET"])
+ def application():
+ producer.send('skywalking', b'some_message_bytes')
+
+ return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
+
+ PORT = 9090
+ app.run(host='0.0.0.0', port=PORT, debug=True)
diff --git a/skywalking/trace/tags/__init__.py b/tests/plugin/sw_kafka/test_kafka.py
similarity index 55%
copy from skywalking/trace/tags/__init__.py
copy to tests/plugin/sw_kafka/test_kafka.py
index 89a6819..f24add7 100644
--- a/skywalking/trace/tags/__init__.py
+++ b/tests/plugin/sw_kafka/test_kafka.py
@@ -15,16 +15,29 @@
# limitations under the License.
#
-from collections import namedtuple
-
-Tag = namedtuple('Tag', 'key val overridable')
-Tag.__new__.__defaults__ = (None, None, False)
-
-HttpUrl = 'url'
-HttpMethod = 'http.method'
-HttpStatus = 'status.code'
-DbType = 'db.type'
-DbInstance = 'db.instance'
-DbStatement = 'db.statement'
-DbSqlParameters = 'db.sql.parameters'
-HttpParams = 'http.params'
+import os
+import time
+import unittest
+from os.path import abspath, dirname
+
+from testcontainers.compose import DockerCompose
+
+from tests.plugin import BasePluginTest
+
+
+class TestPlugin(BasePluginTest):
+ @classmethod
+ def setUpClass(cls):
+ cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
+ cls.compose.start()
+
+ cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
+
+ def test_request_plugin(self):
+ time.sleep(3)
+
+ self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml'))
+
+
+if __name__ == '__main__':
+ unittest.main()