You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2021/02/12 22:50:48 UTC
[pulsar-dotpulsar] branch master updated: Implementing tracing
using the ActivitySource and Activity. This is still work in progress.
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5ab2a73 Implementing tracing using the ActivitySource and Activity. This is still work in progress.
5ab2a73 is described below
commit 5ab2a7326f45f9b535cf6c4a284e577d3032de7c
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Fri Feb 12 23:50:34 2021 +0100
Implementing tracing using the ActivitySource and Activity. This is still work in progress.
---
src/DotPulsar/Abstractions/IConsumer.cs | 12 +++-
src/DotPulsar/Abstractions/IProducer.cs | 7 +-
src/DotPulsar/Abstractions/IPulsarClient.cs | 5 ++
src/DotPulsar/Abstractions/IReader.cs | 7 +-
src/DotPulsar/DotPulsar.csproj | 9 +++
src/DotPulsar/Extensions/ConsumerExtensions.cs | 74 ++++++++++++++++++++++
src/DotPulsar/Internal/Constants.cs | 13 ++--
src/DotPulsar/Internal/Consumer.cs | 6 ++
.../DotPulsarActivitySource.cs} | 19 +++---
src/DotPulsar/Internal/Producer.cs | 3 +
src/DotPulsar/Internal/PulsarClientBuilder.cs | 4 +-
src/DotPulsar/Internal/Reader.cs | 3 +
src/DotPulsar/PulsarClient.cs | 15 +++--
13 files changed, 154 insertions(+), 23 deletions(-)
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 7fb4e16..976f54b 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -35,7 +35,17 @@ namespace DotPulsar.Abstractions
ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
- /// The topic of the consumer.
+ /// The consumer's service url.
+ /// </summary>
+ public Uri ServiceUrl { get; }
+
+ /// <summary>
+ /// The consumer's subscription name.
+ /// </summary>
+ public string SubscriptionName { get; }
+
+ /// <summary>
+ /// The consumer's topic.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 27c59ee..baf3ca4 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,7 +22,12 @@ namespace DotPulsar.Abstractions
public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
{
/// <summary>
- /// The topic of the producer.
+ /// The producer's service url.
+ /// </summary>
+ public Uri ServiceUrl { get; }
+
+ /// <summary>
+ /// The producer's topic.
/// </summary>
string Topic { get; }
}
diff --git a/src/DotPulsar/Abstractions/IPulsarClient.cs b/src/DotPulsar/Abstractions/IPulsarClient.cs
index ddce2e8..7189257 100644
--- a/src/DotPulsar/Abstractions/IPulsarClient.cs
+++ b/src/DotPulsar/Abstractions/IPulsarClient.cs
@@ -35,5 +35,10 @@ namespace DotPulsar.Abstractions
/// Create a reader.
/// </summary>
IReader CreateReader(ReaderOptions options);
+
+ /// <summary>
+ /// The client's service url.
+ /// </summary>
+ public Uri ServiceUrl { get; }
}
}
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 6b4207b..7962d54 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -22,7 +22,12 @@ namespace DotPulsar.Abstractions
public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
{
/// <summary>
- /// The topic of the reader.
+ /// The reader's service url.
+ /// </summary>
+ public Uri ServiceUrl { get; }
+
+ /// <summary>
+ /// The reader's topic.
/// </summary>
string Topic { get; }
}
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0e84b71..b22d835 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -30,6 +30,15 @@
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0.0" />
<PackageReference Include="Microsoft.Bcl.HashCode" Version="1.1.1" />
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+ </ItemGroup>
+
+ <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
+ </ItemGroup>
+
+ <ItemGroup Condition="'$(TargetFramework)' == 'netcoreapp3.1'">
+ <PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.1" />
</ItemGroup>
</Project>
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
index b61971c..63b8d49 100644
--- a/src/DotPulsar/Extensions/ConsumerExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -15,6 +15,10 @@
namespace DotPulsar.Extensions
{
using DotPulsar.Abstractions;
+ using DotPulsar.Internal;
+ using System;
+ using System.Collections.Generic;
+ using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
@@ -36,6 +40,76 @@ namespace DotPulsar.Extensions
=> await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
/// <summary>
+ /// Continuously receive messages, pass each one to the processor and acknowledge it afterwards (even if the processor throws).
+ /// </summary>
+ public static async ValueTask Process(this IConsumer consumer, Func<Message, CancellationToken, ValueTask> processor, CancellationToken cancellationToken = default) // TODO Allow user to set number of workers
+ {
+ const string operation = "process";
+ var operationName = $"{consumer.Topic} {operation}";
+ // Tags that are identical for every message of this consumer; built once, outside the receive loop.
+ var tags = new List<KeyValuePair<string, object?>>
+ {
+ new KeyValuePair<string, object?>("messaging.destination", consumer.Topic),
+ new KeyValuePair<string, object?>("messaging.destination_kind", "topic"),
+ new KeyValuePair<string, object?>("messaging.operation", operation),
+ new KeyValuePair<string, object?>("messaging.system", "pulsar"),
+ new KeyValuePair<string, object?>("messaging.url", consumer.ServiceUrl),
+ new KeyValuePair<string, object?>("messaging.pulsar.subscription", consumer.SubscriptionName) // TODO Ask Pulsar community to define Pulsar specific tags
+ };
+ // Receive, process and acknowledge one message at a time until cancellation is requested.
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var message = await consumer.Receive(cancellationToken).ConfigureAwait(false);
+ // StartActivity returns null when there are no listeners, so every use of 'activity' below is null-guarded.
+ var activity = StartActivity(message, operationName, tags);
+
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ activity.SetTag("messaging.message_id", message.MessageId.ToString());
+ activity.SetTag("messaging.message_payload_size_bytes", message.Data.Length);
+ }
+
+ try
+ {
+ await processor(message, cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore: a failing processor must not stop the loop; the message is still acknowledged below.
+ }
+ // Dispose the activity before acknowledging so that it covers only the processing of the message.
+ activity?.Dispose();
+
+ await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
+ }
+ }
+
+ private static Activity? StartActivity(Message message, string operationName, IEnumerable<KeyValuePair<string, object?>> tags)
+ {
+ if (!DotPulsarActivitySource.ActivitySource.HasListeners())
+ return null;
+ // Starts a consumer activity for the message; parented to the message's trace context when one can be parsed.
+ var properties = message.Properties;
+
+ if (properties.TryGetValue("traceparent", out var traceparent)) // TODO Allow the user to overwrite the keys 'traceparent' and 'tracestate'
+ {
+ var tracestate = properties.TryGetValue("tracestate", out var state) ? state : null; // 'tracestate' is optional; single lookup
+ if (ActivityContext.TryParse(traceparent, tracestate, out var activityContext))
+ return DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer, activityContext, tags);
+ }
+ // No 'traceparent' property, or it could not be parsed: start an activity without an explicit parent context.
+ var activity = DotPulsarActivitySource.ActivitySource.StartActivity(operationName, ActivityKind.Consumer);
+ // This overload takes no tags, so they are copied afterwards, and only when all data is requested.
+ if (activity is not null && activity.IsAllDataRequested)
+ {
+ foreach (var tag in tags)
+ activity.SetTag(tag.Key, tag.Value);
+ }
+
+ return activity;
+ }
+
+ /// <summary>
/// Wait for the state to change to a specific state.
/// </summary>
/// <returns>
diff --git a/src/DotPulsar/Internal/Constants.cs b/src/DotPulsar/Internal/Constants.cs
index abc1f75..9ee752b 100644
--- a/src/DotPulsar/Internal/Constants.cs
+++ b/src/DotPulsar/Internal/Constants.cs
@@ -15,18 +15,22 @@
namespace DotPulsar.Internal
{
using System;
- using System.Reflection;
public static class Constants
{
static Constants()
{
- var assemblyName = Assembly.GetCallingAssembly().GetName();
+ var assembly = typeof(Constants).Assembly;
+ var assemblyName = assembly.GetName();
+ if (assemblyName.Name is null)
+ throw new Exception($"Assembly name of {assembly.FullName} is null");
+
var assemblyVersion = assemblyName.Version;
if (assemblyVersion is null)
- throw new Exception("Assembly version of CallingAssembly is null");
+ throw new Exception($"Assembly version of {assembly.FullName} is null");
- ClientVersion = $"{assemblyName.Name} {assemblyVersion.ToString(3)}";
+ ClientName = assemblyName.Name;
+ ClientVersion = assemblyVersion.ToString(3);
ProtocolVersion = 14;
PulsarScheme = "pulsar";
PulsarSslScheme = "pulsar+ssl";
@@ -37,6 +41,7 @@ namespace DotPulsar.Internal
MetadataOffset = 10;
}
+ public static string ClientName { get; }
public static string ClientVersion { get; }
public static int ProtocolVersion { get; }
public static string PulsarScheme { get; }
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index d263df2..c29db02 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -37,10 +37,14 @@ namespace DotPulsar.Internal
private readonly IStateChanged<ConsumerState> _state;
private int _isDisposed;
+ public Uri ServiceUrl { get; }
+ public string SubscriptionName { get; }
public string Topic { get; }
public Consumer(
Guid correlationId,
+ Uri serviceUrl,
+ string subscriptionName,
string topic,
IRegisterEvent eventRegister,
IConsumerChannel initialChannel,
@@ -48,6 +52,8 @@ namespace DotPulsar.Internal
IStateChanged<ConsumerState> state)
{
_correlationId = correlationId;
+ ServiceUrl = serviceUrl;
+ SubscriptionName = subscriptionName;
Topic = topic;
_eventRegister = eventRegister;
_channel = initialChannel;
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
similarity index 64%
copy from src/DotPulsar/Abstractions/IProducer.cs
copy to src/DotPulsar/Internal/DotPulsarActivitySource.cs
index 27c59ee..09c8464 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Internal/DotPulsarActivitySource.cs
@@ -12,18 +12,17 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal
{
- using System;
+ using System.Diagnostics;
- /// <summary>
- /// A producer abstraction.
- /// </summary>
- public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
+ public static class DotPulsarActivitySource
{
- /// <summary>
- /// The topic of the producer.
- /// </summary>
- string Topic { get; }
+ static DotPulsarActivitySource()
+ {
+ ActivitySource = new ActivitySource(Constants.ClientName, Constants.ClientVersion);
+ }
+
+ public static ActivitySource ActivitySource { get; }
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 12e4625..02f9bd3 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -36,10 +36,12 @@ namespace DotPulsar.Internal
private readonly SequenceId _sequenceId;
private int _isDisposed;
+ public Uri ServiceUrl { get; }
public string Topic { get; }
public Producer(
Guid correlationId,
+ Uri serviceUrl,
string topic,
ulong initialSequenceId,
IRegisterEvent registerEvent,
@@ -50,6 +52,7 @@ namespace DotPulsar.Internal
var messageMetadataPolicy = new DefaultPooledObjectPolicy<PulsarApi.MessageMetadata>();
_messageMetadataPool = new DefaultObjectPool<PulsarApi.MessageMetadata>(messageMetadataPolicy);
_correlationId = correlationId;
+ ServiceUrl = serviceUrl;
Topic = topic;
_sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
diff --git a/src/DotPulsar/Internal/PulsarClientBuilder.cs b/src/DotPulsar/Internal/PulsarClientBuilder.cs
index 88c1245..120d5e5 100644
--- a/src/DotPulsar/Internal/PulsarClientBuilder.cs
+++ b/src/DotPulsar/Internal/PulsarClientBuilder.cs
@@ -41,7 +41,7 @@ namespace DotPulsar.Internal
_commandConnect = new CommandConnect
{
ProtocolVersion = Constants.ProtocolVersion,
- ClientVersion = Constants.ClientVersion
+ ClientVersion = $"{Constants.ClientName} {Constants.ClientVersion}"
};
_exceptionHandlers = new List<IHandleException>();
@@ -157,7 +157,7 @@ namespace DotPulsar.Internal
var exceptionHandlers = new List<IHandleException>(_exceptionHandlers) { new DefaultExceptionHandler(_retryInterval) };
var exceptionHandlerPipeline = new ExceptionHandlerPipeline(exceptionHandlers);
- return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline);
+ return new PulsarClient(connectionPool, processManager, exceptionHandlerPipeline, _serviceUrl);
}
}
}
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index ccbea59..731e4cd 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -32,10 +32,12 @@ namespace DotPulsar.Internal
private readonly IStateChanged<ReaderState> _state;
private int _isDisposed;
+ public Uri ServiceUrl { get; }
public string Topic { get; }
public Reader(
Guid correlationId,
+ Uri serviceUrl,
string topic,
IRegisterEvent eventRegister,
IConsumerChannel initialChannel,
@@ -43,6 +45,7 @@ namespace DotPulsar.Internal
IStateChanged<ReaderState> state)
{
_correlationId = correlationId;
+ ServiceUrl = serviceUrl;
Topic = topic;
_eventRegister = eventRegister;
_channel = initialChannel;
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index e2c0577..c18c7d8 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -32,11 +32,18 @@ namespace DotPulsar
private readonly IHandleException _exceptionHandler;
private int _isDisposed;
- internal PulsarClient(IConnectionPool connectionPool, ProcessManager processManager, IHandleException exceptionHandler)
+ public Uri ServiceUrl { get; }
+
+ internal PulsarClient(
+ IConnectionPool connectionPool,
+ ProcessManager processManager,
+ IHandleException exceptionHandler,
+ Uri serviceUrl)
{
_connectionPool = connectionPool;
_processManager = processManager;
_exceptionHandler = exceptionHandler;
+ ServiceUrl = serviceUrl;
_isDisposed = 0;
DotPulsarEventSource.Log.ClientCreated();
}
@@ -57,7 +64,7 @@ namespace DotPulsar
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+ var producer = new Producer(correlationId, ServiceUrl, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
var process = new ProducerProcess(correlationId, stateManager, factory, producer);
@@ -76,7 +83,7 @@ namespace DotPulsar
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
- var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ var consumer = new Consumer(correlationId, ServiceUrl, options.SubscriptionName, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
@@ -95,7 +102,7 @@ namespace DotPulsar
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
- var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ var reader = new Reader(correlationId, ServiceUrl, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
if (options.StateChangedHandler is not null)
_ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
var process = new ReaderProcess(correlationId, stateManager, factory, reader);