You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2019/07/31 17:55:11 UTC
[qpid-proton] branch master updated: PROTON-2082: [Python] Support
tracing messages using OpenTracing and Jaeger
This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
new 7afa39b PROTON-2082: [Python] Support tracing messages using OpenTracing and Jaeger
7afa39b is described below
commit 7afa39b83024563f8edabae41a84faaadf128663
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jul 30 15:39:30 2019 -0400
PROTON-2082: [Python] Support tracing messages using OpenTracing and Jaeger
---
python/CMakeLists.txt | 2 +
python/proton/_tracing.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++
python/proton/tracing.py | 29 ++++++++++
python/setup.py.in | 3 +
4 files changed, 172 insertions(+)
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index a7dfe4d..9040af4 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -74,12 +74,14 @@ set (pysrc
proton/_exceptions.py
proton/_io.py
proton/_message.py
+ proton/_tracing.py
proton/_transport.py
proton/_url.py
proton/_wrapper.py
proton/handlers.py
proton/reactor.py
+ proton/tracing.py
proton/utils.py
proton/_handlers.py
diff --git a/python/proton/_tracing.py b/python/proton/_tracing.py
new file mode 100644
index 0000000..a663bdf
--- /dev/null
+++ b/python/proton/_tracing.py
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import atexit
+import functools
+import os
+import sys
+import time
+import weakref
+
+try:
+ import opentracing
+ import jaeger_client
+ from opentracing.ext import tags
+ from opentracing.propagation import Format
+except ImportError:
+ raise ImportError('proton tracing requires opentracing and jaeger_client modules')
+
+import proton
+from proton import Sender as ProtonSender
+from proton.handlers import (
+ OutgoingMessageHandler as ProtonOutgoingMessageHandler,
+ IncomingMessageHandler as ProtonIncomingMessageHandler
+)
+
+_tracer = None
+_trace_key = proton.symbol('x-opt-qpid-tracestate')
+
+def get_tracer():
+ global _tracer
+ if _tracer is not None:
+ return _tracer
+ exe = sys.argv[0] if sys.argv[0] else 'interactive-session'
+ return init_tracer(os.path.basename(exe))
+
+def _fini_tracer():
+ time.sleep(1)
+ c = opentracing.global_tracer().close()
+ while not c.done():
+ time.sleep(0.5)
+
+def init_tracer(service_name):
+ global _tracer
+ if _tracer is not None:
+ return _tracer
+
+ config = jaeger_client.Config(
+ config={},
+ service_name=service_name,
+ validate=True
+ )
+ config.initialize_tracer()
+ _tracer = opentracing.global_tracer()
+ # A nasty hack to ensure enough time for the tracing data to be flushed
+ atexit.register(_fini_tracer)
+ return _tracer
+
+
+class IncomingMessageHandler(ProtonIncomingMessageHandler):
+ def on_message(self, event):
+ if self.delegate is not None:
+ tracer = get_tracer()
+ message = event.message
+ receiver = event.receiver
+ connection = event.connection
+ span_tags = {
+ tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
+ tags.MESSAGE_BUS_DESTINATION: receiver.source.address,
+ tags.PEER_ADDRESS: connection.connected_address,
+ tags.PEER_HOSTNAME: connection.hostname,
+ 'inserted_by': 'proton-message-tracing'
+ }
+ if message.annotations is not None:
+ headers = message.annotations[_trace_key]
+ span_ctx = tracer.extract(Format.TEXT_MAP, headers)
+ with tracer.start_active_span('amqp-delivery-receive', child_of=span_ctx, tags=span_tags):
+ proton._events._dispatch(self.delegate, 'on_message', event)
+ else:
+ with tracer.start_active_span('amqp-delivery-receive', ignore_active_span=True, tags=span_tags):
+ proton._events._dispatch(self.delegate, 'on_message', event)
+
+class OutgoingMessageHandler(ProtonOutgoingMessageHandler):
+ def on_settled(self, event):
+ if self.delegate is not None:
+ delivery = event.delivery
+ state = delivery.remote_state
+ span = delivery.span
+ span.set_tag('delivery-terminal-state', state.name)
+ span.log_kv({'event': 'delivery settled', 'state': state.name})
+ span.finish()
+ proton._events._dispatch(self.delegate, 'on_settled', event)
+
+class Sender(ProtonSender):
+ def send(self, msg):
+ tracer = get_tracer()
+ connection = self.connection
+ span_tags = {
+ tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER,
+ tags.MESSAGE_BUS_DESTINATION: self.target.address,
+ tags.PEER_ADDRESS: connection.connected_address,
+ tags.PEER_HOSTNAME: connection.hostname,
+ 'inserted_by': 'proton-message-tracing'
+ }
+ span = tracer.start_span('amqp-delivery-send', tags=span_tags)
+ headers = {}
+ tracer.inject(span, Format.TEXT_MAP, headers)
+ if msg.annotations is None:
+ msg.annotations = { _trace_key: headers }
+ else:
+ msg.annotations[_trace_key] = headers
+ delivery = ProtonSender.send(self, msg)
+ delivery.span = span
+ span.set_tag('delivery-tag', delivery.tag)
+ return delivery
+
+# Monkey patch proton for tracing (need to patch both internal and external names)
+proton._handlers.IncomingMessageHandler = IncomingMessageHandler
+proton._handlers.OutgoingMessageHandler = OutgoingMessageHandler
+proton._endpoints.Sender = Sender
+proton.handlers.IncomingMessageHandler = IncomingMessageHandler
+proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler
+proton.Sender = Sender
diff --git a/python/proton/tracing.py b/python/proton/tracing.py
new file mode 100644
index 0000000..a9e6e15
--- /dev/null
+++ b/python/proton/tracing.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from ._tracing import (
+ get_tracer, init_tracer
+)
+
+__all__ = [
+ 'get_tracer',
+ 'init_tracer'
+]
diff --git a/python/setup.py.in b/python/setup.py.in
index 76434c3..ad7258c 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -336,6 +336,9 @@ setup(name='python-qpid-proton',
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6"],
cmdclass=cmdclass,
+ extras_require={
+ 'opentracing': ['opentracing', 'jaeger_client']
+ },
# Note well: the following extension instance is modified during the
# installation! If you make changes below, you may need to update the
# Configure class above
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org