You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/02/12 22:50:48 UTC

[pulsar-dotpulsar] branch master updated: Implementing tracing using the ActivitySource and Activity. This is still work in progress.

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ab2a73  Implementing tracing using the ActivitySource and Activity. This is still work in progress.
5ab2a73 is described below

commit 5ab2a7326f45f9b535cf6c4a284e577d3032de7c
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Fri Feb 12 23:50:34 2021 +0100

    Implementing tracing using the ActivitySource and Activity. This is still work in progress.
---
 src/DotPulsar/Abstractions/IConsumer.cs            | 12 +++-
 src/DotPulsar/Abstractions/IProducer.cs            |  7 +-
 src/DotPulsar/Abstractions/IPulsarClient.cs        |  5 ++
 src/DotPulsar/Abstractions/IReader.cs              |  7 +-
 src/DotPulsar/DotPulsar.csproj                     |  9 +++
 src/DotPulsar/Extensions/ConsumerExtensions.cs     | 74 ++++++++++++++++++++++
 src/DotPulsar/Internal/Constants.cs                | 13 ++--
 src/DotPulsar/Internal/Consumer.cs                 |  6 ++
 .../DotPulsarActivitySource.cs}                    | 19 +++---
 src/DotPulsar/Internal/Producer.cs                 |  3 +
 src/DotPulsar/Internal/PulsarClientBuilder.cs      |  4 +-
 src/DotPulsar/Internal/Reader.cs                   |  3 +
 src/DotPulsar/PulsarClient.cs                      | 15 +++--
 13 files changed, 154 insertions(+), 23 deletions(-)

diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 7fb4e16..976f54b 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -35,7 +35,17 @@ namespace DotPulsar.Abstractions
         ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
 
         /// <summary>
-        /// The topic of the consumer.
+        /// The consumer's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The consumer's subscription name.
+        /// </summary>
+        public string SubscriptionName { get; }
+
+        /// <summary>
+        /// The consumer's topic.
         /// </summary>
         string Topic { get; }
 
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 27c59ee..baf3ca4 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,7 +22,12 @@ namespace DotPulsar.Abstractions
     public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
     {
         /// <summary>
-        /// The topic of the producer.
+        /// The producer's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The producer's topic.
         /// </summary>
         string Topic { get; }
     }
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index ddce2e8..7189257 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -35,5 +35,10 @@ namespace DotPulsar.Abstractions
         /// Create a reader.
         /// </summary>
         IReader CreateReader(ReaderOptions options);
+
+        /// <summary>
+        /// The client's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
     }
 }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 6b4207b..7962d54 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -22,7 +22,12 @@ namespace DotPulsar.Abstractions
     public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
     {
         /// <summary>
-        /// The topic of the reader.
+        /// The reader's service url.
+        /// </summary>
+        public Uri ServiceUrl { get; }
+
+        /// <summary>
+        /// The reader's topic.
         /// </summary>
         string Topic { get; }
     }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0e84b71..b22d835 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -30,6 +30,15 @@
   <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
     <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
     <PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+
+  <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+  </ItemGroup>
+    
+  <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
+    <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
   </ItemGroup>
 
 </Project>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index b61971c..63b8d49 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,6 +15,10 @@
 namespace DotPulsar.Extensions
 {
     using DotPulsar.Abstractions;
+    using DotPulsar.Internal;
+    using System;
+    using System.Collections.Generic;
+    using System.Diagnostics;
     using System.Threading;
     using System.Threading.Tasks;
 
@@ -36,6 +40,76 @@ namespace DotPulsar.Extensions
             => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
 
         /// <summary>
+        /// Process and auto-acknowledge a message.
+        /// </summary>
+        public static async ValueTask Process(this IConsumer consumer, Func<Message, CancellationToken, ValueTask> processor, CancellationToken cancellationToken = default)  // TODO Allow user to set number of workers
+        {
+            const string operation = "process";
+            var operationName = $"{consumer.Topic} {operation}";
+
+            var tags = new List<KeyValuePair<string, object?>>
+            {
+                new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+                new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+                new KeyValuePair<string, object?>("messaging.operation", operation),
+                new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+                new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+                new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName) // TODO Ask Pulsar community to define Pulsar specific tags
+            };
+
+            while (!cancellationToken.IsCancellationRequested)
+            {
+                var message = await consumer.Receive(cancellationToken);
+
+                var activity = StartActivity(message, operationName, tags);
+
+                if (activity is not null && activity.IsAllDataRequested)
+                {
+                    activity.SetTag("messaging.message_id", message.MessageId.ToString());
+                    activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
+                }
+
+                try
+                {
+                    await processor(message, cancellationToken);
+                }
+                catch
+                {
+                    // Ignore
+                }
+
+                activity?.Dispose();
+
+                await consumer.Acknowledge(message.MessageId, cancellationToken);
+            }
+        }
+
+        private static Activity? StartActivity(Message message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
+        {
+            if (!DotPulsarActivitySource.ActivitySource.HasListeners())
+                return null;
+
+            var properties = message.Properties;
+
+            if (properties.TryGetValue("traceparent", out var traceparent))  // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
+            {
+                var tracestate = properties.ContainsKey("tracestate") ? properties["tracestrate"] : null;
+                if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+                    return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+            }
+
+            var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+
+            if (activity is not null && activity.IsAllDataRequested)
+            {
+                foreach (var tag in tags)
+                    activity.SetTag(tag.Key, tag.Value);
+            }
+
+            return activity;
+        }
+
+        /// <summary>
         /// Wait for the state to change to a specific state.
         /// </summary>
         /// <returns>
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index abc1f75..9ee752b 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -15,18 +15,22 @@
 namespace DotPulsar.Internal
 {
     using System;
-    using System.Reflection;
 
     public static class Constants
     {
         static Constants()
         {
-            var assemblyName = Assembly.GetCallingAssembly().GetName();
+            var assembly = typeof(Constants).Assembly;
+            var assemblyName = assembly.GetName();
+            if (assemblyName.Name is null)
+                throw new Exception($"Assembly name of {assembly.FullName} is null");
+
             var assemblyVersion = assemblyName.Version;
             if (assemblyVersion is null)
-                throw new Exception("Assembly version of CallingAssembly is null");
+                throw new Exception($"Assembly version of {assembly.FullName} is null");
 
-            ClientVersion = $"{assemblyName.Name} {assemblyVersion.ToString(3)}";
+            ClientName = assemblyName.Name;
+            ClientVersion = assemblyVersion.ToString(3);
             ProtocolVersion = 14;
             PulsarScheme = "pulsar";
             PulsarSslScheme = "pulsar+ssl";
@@ -37,6 +41,7 @@ namespace DotPulsar.Internal
             MetadataOffset = 10;
         }
 
+        public static string ClientName { get; }
         public static string ClientVersion { get; }
         public static int ProtocolVersion { get; }
         public static string PulsarScheme { get; }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index d263df2..c29db02 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -37,10 +37,14 @@ namespace DotPulsar.Internal
         private readonly IStateChanged<ConsumerState> _state;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
+        public string SubscriptionName { get; }
         public string Topic { get; }
 
         public Consumer(
             Guid correlationId,
+            Uri serviceUrl,
+            string subscriptionName,
             string topic,
             IRegisterEvent eventRegister,
             IConsumerChannel initialChannel,
@@ -48,6 +52,8 @@ namespace DotPulsar.Internal
             IStateChanged<ConsumerState> state)
         {
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
+            SubscriptionName = subscriptionName;
             Topic = topic;
             _eventRegister = eventRegister;
             _channel = initialChannel;
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
similarity index 64%
copy from src/DotPulsar/Abstractions/IProducer.cs
copy to src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 27c59ee..09c8464 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -12,18 +12,17 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal
 {
-    using System;
+    using System.Diagnostics;
 
-    /// <summary>
-    /// A producer abstraction.
-    /// </summary>
-    public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
+    public static class DotPulsarActivitySource
     {
-        /// <summary>
-        /// The topic of the producer.
-        /// </summary>
-        string Topic { get; }
+        static DotPulsarActivitySource()
+        {
+            ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+        }
+
+        public static ActivitySource ActivitySource { get; }
     }
 }
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 12e4625..02f9bd3 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -36,10 +36,12 @@ namespace DotPulsar.Internal
         private readonly SequenceId _sequenceId;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Producer(
             Guid correlationId,
+            Uri serviceUrl,
             string topic,
             ulong initialSequenceId,
             IRegisterEvent registerEvent,
@@ -50,6 +52,7 @@ namespace DotPulsar.Internal
             var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
             _messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
             Topic = topic;
             _sequenceId = new SequenceId(initialSequenceId);
             _eventRegister = registerEvent;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 88c1245..120d5e5 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -41,7 +41,7 @@ namespace DotPulsar.Internal
             _commandConnect = new CommandConnect
             {
                 ProtocolVersion = Constants.ProtocolVersion,
-                ClientVersion = Constants.ClientVersion
+                ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
             };
 
             _exceptionHandlers = new List<IHandleException>();
@@ -157,7 +157,7 @@ namespace DotPulsar.Internal
             var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
             var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
 
-            return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
+            return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
         }
     }
 }
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index ccbea59..731e4cd 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -32,10 +32,12 @@ namespace DotPulsar.Internal
         private readonly IStateChanged<ReaderState> _state;
         private int _isDisposed;
 
+        public Uri ServiceUrl { get; }
         public string Topic { get; }
 
         public Reader(
             Guid correlationId,
+            Uri serviceUrl,
             string topic,
             IRegisterEvent eventRegister,
             IConsumerChannel initialChannel,
@@ -43,6 +45,7 @@ namespace DotPulsar.Internal
             IStateChanged<ReaderState> state)
         {
             _correlationId = correlationId;
+            ServiceUrl = serviceUrl;
             Topic = topic;
             _eventRegister = eventRegister;
             _channel = initialChannel;
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index e2c0577..c18c7d8 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -32,11 +32,18 @@ namespace DotPulsar
         private readonly IHandleException _exceptionHandler;
         private int _isDisposed;
 
-        internal PulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler)
+        public Uri ServiceUrl { get; }
+
+        internal PulsarClient(
+            IConnectionPool connectionPool,
+            ProcessManager processManager,
+            IHandleException exceptionHandler,
+            Uri serviceUrl)
         {
             _connectionPool = connectionPool;
             _processManager = processManager;
             _exceptionHandler = exceptionHandler;
+            ServiceUrl = serviceUrl;
             _isDisposed = 0;
             DotPulsarEventSource.Log.ClientCreated();
         }
@@ -57,7 +64,7 @@ namespace DotPulsar
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
-            var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+            var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
             var process = new ProducerProcess(correlationId, stateManager, factory, producer);
@@ -76,7 +83,7 @@ namespace DotPulsar
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
-            var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
             var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
@@ -95,7 +102,7 @@ namespace DotPulsar
             var executor = new Executor(correlationId, _processManager, _exceptionHandler);
             var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
             var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
-            var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+            var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
             if (options.StateChangedHandler is not null)
                 _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
             var process = new ReaderProcess(correlationId, stateManager, factory, reader);