You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2019/07/31 17:55:11 UTC

[qpid-proton] branch master updated: PROTON-2082: [Python] Support tracing messages using OpenTracing and Jaeger

This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 7afa39b  PROTON-2082: [Python] Support tracing messages using OpenTracing and Jaeger
7afa39b is described below

commit 7afa39b83024563f8edabae41a84faaadf128663
Author: Andrew Stitcher <as...@apache.org>
AuthorDate: Tue Jul 30 15:39:30 2019 -0400

    PROTON-2082: [Python] Support tracing messages using OpenTracing and Jaeger
---
 python/CMakeLists.txt     |   2 +
 python/proton/_tracing.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++
 python/proton/tracing.py  |  29 ++++++++++
 python/setup.py.in        |   3 +
 4 files changed, 172 insertions(+)

diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index a7dfe4d..9040af4 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -74,12 +74,14 @@ set (pysrc
     proton/_exceptions.py
     proton/_io.py
     proton/_message.py
+    proton/_tracing.py
     proton/_transport.py
     proton/_url.py
     proton/_wrapper.py
 
     proton/handlers.py
     proton/reactor.py
+    proton/tracing.py
     proton/utils.py
 
     proton/_handlers.py
diff --git a/python/proton/_tracing.py b/python/proton/_tracing.py
new file mode 100644
index 0000000..a663bdf
--- /dev/null
+++ b/python/proton/_tracing.py
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import atexit
+import functools
+import os
+import sys
+import time
+import weakref
+
+try:
+    import opentracing
+    import jaeger_client
+    from opentracing.ext import tags
+    from opentracing.propagation import Format
+except ImportError:
+    raise ImportError('proton tracing requires opentracing and jaeger_client modules')
+
+import proton
+from proton import Sender as ProtonSender
+from proton.handlers import (
+    OutgoingMessageHandler as ProtonOutgoingMessageHandler,
+    IncomingMessageHandler as ProtonIncomingMessageHandler
+)
+
+# Process-wide tracer; stays None until init_tracer() has created it.
+_tracer = None
+# Message annotation key under which the injected trace context travels.
+_trace_key = proton.symbol('x-opt-qpid-tracestate')
+
+def get_tracer():
+    """Return the module's tracer, creating it on first use.
+
+    When no tracer has been initialised yet, one is created through
+    init_tracer() with a service name taken from the basename of
+    sys.argv[0], or 'interactive-session' when that entry is empty.
+    """
+    if _tracer is None:
+        program = sys.argv[0] or 'interactive-session'
+        return init_tracer(os.path.basename(program))
+    return _tracer
+
+def _fini_tracer():
+    """Block at interpreter exit until the global tracer has closed.
+
+    Registered with atexit by init_tracer().
+    """
+    # Initial pause before asking the tracer to close.
+    time.sleep(1)
+    closing = opentracing.global_tracer().close()
+    # Poll the object returned by close() until it reports completion.
+    while True:
+        if closing.done():
+            break
+        time.sleep(0.5)
+
+def init_tracer(service_name):
+    """Create the tracer once and return it.
+
+    The first call configures jaeger_client with the given service name,
+    stores the resulting OpenTracing global tracer and registers an atexit
+    hook that waits for the tracer to close.  Later calls return the stored
+    tracer and ignore their argument.
+
+    :param service_name: service name passed to the Jaeger configuration
+    :return: the tracer obtained from opentracing.global_tracer()
+    """
+    global _tracer
+    if _tracer is None:
+        jaeger_client.Config(
+            config={},
+            service_name=service_name,
+            validate=True
+        ).initialize_tracer()
+        _tracer = opentracing.global_tracer()
+        # A nasty hack to ensure enough time for the tracing data to be flushed
+        atexit.register(_fini_tracer)
+    return _tracer
+
+
+class IncomingMessageHandler(ProtonIncomingMessageHandler):
+    """Incoming message handler that runs the delegate's on_message in a span."""
+
+    def on_message(self, event):
+        """Dispatch on_message to the delegate inside an 'amqp-delivery-receive' span.
+
+        The span is a child of the trace context carried in the message
+        annotations under the tracing key.  A message with no usable trace
+        context gets a new root span instead.  Nothing is done when the
+        handler has no delegate.
+
+        :param event: the proton event carrying the message, receiver and
+            connection
+        """
+        if self.delegate is None:
+            return
+        tracer = get_tracer()
+        message = event.message
+        receiver = event.receiver
+        connection = event.connection
+        span_tags = {
+            tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
+            tags.MESSAGE_BUS_DESTINATION: receiver.source.address,
+            tags.PEER_ADDRESS: connection.connected_address,
+            tags.PEER_HOSTNAME: connection.hostname,
+            'inserted_by': 'proton-message-tracing'
+        }
+        span_ctx = None
+        annotations = message.annotations
+        # Annotations can be present without the tracing key (for example a
+        # message from an untraced sender), so a plain index would raise
+        # KeyError; look the key up defensively instead.
+        headers = annotations.get(_trace_key) if annotations is not None else None
+        if headers is not None:
+            try:
+                span_ctx = tracer.extract(Format.TEXT_MAP, headers)
+            except (opentracing.InvalidCarrierException,
+                    opentracing.SpanContextCorruptedException):
+                # The annotation is supplied by the remote peer; unusable trace
+                # data must not prevent the message from being delivered.
+                span_ctx = None
+        # Without a propagated context start a fresh trace rather than
+        # attaching to whatever span happens to be active locally.
+        with tracer.start_active_span('amqp-delivery-receive',
+                                      child_of=span_ctx,
+                                      ignore_active_span=span_ctx is None,
+                                      tags=span_tags):
+            proton._events._dispatch(self.delegate, 'on_message', event)
+
+class OutgoingMessageHandler(ProtonOutgoingMessageHandler):
+    def on_settled(self, event):
+        if self.delegate is not None:
+            delivery = event.delivery
+            state = delivery.remote_state
+            span = delivery.span
+            span.set_tag('delivery-terminal-state', state.name)
+            span.log_kv({'event': 'delivery settled', 'state': state.name})
+            span.finish()
+            proton._events._dispatch(self.delegate, 'on_settled', event)
+
+class Sender(ProtonSender):
+    """Sender that starts an 'amqp-delivery-send' span for every message sent."""
+
+    def send(self, msg, tag=None):
+        """Send msg with trace context injected into its annotations.
+
+        A span is started, its context is written into the message
+        annotations under the tracing key, and the span is attached to the
+        returned delivery as ``delivery.span``.  The span is left open here;
+        the tracing OutgoingMessageHandler finishes it on settlement.
+
+        :param msg: the message to send; its annotations are modified
+        :param tag: optional delivery tag, forwarded to the base send only
+            when given (NOTE(review): assumes the base Sender.send accepts a
+            ``tag`` keyword - confirm)
+        :return: the delivery returned by the base Sender.send
+        :raises: whatever the base send raises; the span is marked as an
+            error and finished before re-raising
+        """
+        tracer = get_tracer()
+        connection = self.connection
+        span_tags = {
+            tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER,
+            tags.MESSAGE_BUS_DESTINATION: self.target.address,
+            tags.PEER_ADDRESS: connection.connected_address,
+            tags.PEER_HOSTNAME: connection.hostname,
+            'inserted_by': 'proton-message-tracing'
+        }
+        span = tracer.start_span('amqp-delivery-send', tags=span_tags)
+        headers = {}
+        # The OpenTracing API defines inject() in terms of a SpanContext.
+        tracer.inject(span.context, Format.TEXT_MAP, headers)
+        if msg.annotations is None:
+            msg.annotations = {_trace_key: headers}
+        else:
+            msg.annotations[_trace_key] = headers
+        send_kwargs = {} if tag is None else {'tag': tag}
+        try:
+            delivery = ProtonSender.send(self, msg, **send_kwargs)
+        except Exception:
+            # No delivery exists to settle, so nothing else would ever
+            # finish this span.
+            span.set_tag(tags.ERROR, True)
+            span.finish()
+            raise
+        delivery.span = span
+        span.set_tag('delivery-tag', delivery.tag)
+        return delivery
+
+# Monkey patch proton for tracing: each replacement class is installed under
+# both its internal module name and its external (public) name.
+proton._handlers.IncomingMessageHandler = proton.handlers.IncomingMessageHandler = IncomingMessageHandler
+proton._handlers.OutgoingMessageHandler = proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler
+proton._endpoints.Sender = proton.Sender = Sender
diff --git a/python/proton/tracing.py b/python/proton/tracing.py
new file mode 100644
index 0000000..a9e6e15
--- /dev/null
+++ b/python/proton/tracing.py
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+from ._tracing import (
+    get_tracer, init_tracer
+)
+
+# Public API of the tracing module.  Importing this module also imports
+# proton._tracing, which replaces proton's Sender and incoming/outgoing
+# message handler classes with their tracing versions.
+__all__ = [
+    'get_tracer',
+    'init_tracer'
+]
diff --git a/python/setup.py.in b/python/setup.py.in
index 76434c3..ad7258c 100644
--- a/python/setup.py.in
+++ b/python/setup.py.in
@@ -336,6 +336,9 @@ setup(name='python-qpid-proton',
                    "Programming Language :: Python :: 3.5",
                    "Programming Language :: Python :: 3.6"],
       cmdclass=cmdclass,
+      extras_require={
+          'opentracing': ['opentracing', 'jaeger_client']
+      },
       # Note well: the following extension instance is modified during the
       # installation!  If you make changes below, you may need to update the
       # Configure class above


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org