You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/02/03 16:29:16 UTC
[pulsar-dotpulsar] branch master updated: Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fabc9e3 Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)
fabc9e3 is described below
commit fabc9e34138b83280306b64cfc0003e7d5faf013
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Thu Feb 3 17:29:07 2022 +0100
Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)
---
samples/Processing/LoggerExtensions.cs | 65 ++++++++++++++++++++++
samples/Processing/Processing.csproj | 2 +-
samples/Processing/Worker.cs | 25 ++-------
.../Exceptions/ChannelNotReadyException.cs | 6 +-
.../Exceptions/ConsumerNotFoundException.cs | 4 +-
.../Exceptions/ServiceNotReadyException.cs | 4 +-
.../Exceptions/TooManyRequestsException.cs | 4 +-
.../Extensions/ConsumerBuilderExtensions.cs | 12 ++++
.../Extensions/ProducerBuilderExtensions.cs | 12 ++++
.../Extensions/ReaderBuilderExtensions.cs | 12 ++++
src/DotPulsar/Internal/NotReadyChannel.cs | 2 +-
11 files changed, 113 insertions(+), 35 deletions(-)
diff --git a/samples/Processing/LoggerExtensions.cs b/samples/Processing/LoggerExtensions.cs
new file mode 100644
index 0000000..9c46547
--- /dev/null
+++ b/samples/Processing/LoggerExtensions.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Processing;
+
+using DotPulsar;
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+
+#pragma warning disable IDE0079 // Remove unnecessary suppression... Ehm... *sigh*
+#pragma warning disable IDE0060 // Remove unused parameter... Why Microsoft? Why do you force me to do this?
+
+public static partial class LoggerExtensions
+{
+ // ConsumerChangedState
+ public static void ConsumerChangedState(this ILogger logger, ConsumerStateChanged stateChanged)
+ {
+ var logLevel = stateChanged.ConsumerState switch
+ {
+ ConsumerState.Disconnected => LogLevel.Warning,
+ ConsumerState.Faulted => LogLevel.Error,
+ _ => LogLevel.Information
+ };
+
+ logger.ConsumerChangedState(logLevel, stateChanged.Consumer.Topic, stateChanged.ConsumerState.ToString());
+ }
+
+ [LoggerMessage(EventId = 0, Message = "The consumer for topic '{topic}' changed state to '{state}'")]
+ static partial void ConsumerChangedState(this ILogger logger, LogLevel logLevel, string topic, string state);
+
+ // OutputMessage
+ public static void OutputMessage(this ILogger logger, IMessage<string> message)
+ {
+ var publishedOn = message.PublishTimeAsDateTime;
+ var payload = message.Value();
+ logger.OutputMessage(publishedOn, payload);
+ }
+
+ [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Received: '{payload}' published on {publishedOn}")]
+ static partial void OutputMessage(this ILogger logger, DateTime publishedOn, string payload);
+
+ // PulsarClientException
+ public static void PulsarClientException(this ILogger logger, ExceptionContext exceptionContext)
+ {
+ if (exceptionContext.Exception is not ChannelNotReadyException)
+ logger.PulsarClientException(exceptionContext.Exception);
+ }
+
+ [LoggerMessage(EventId = 2, Level = LogLevel.Warning, Message = "The PulsarClient got an exception")]
+ static partial void PulsarClientException(this ILogger logger, Exception exception);
+}
+
+#pragma warning restore IDE0060 // Remove unused parameter
+#pragma warning restore IDE0079 // Remove unnecessary suppression
diff --git a/samples/Processing/Processing.csproj b/samples/Processing/Processing.csproj
index b5ec451..ad4d9a8 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index d4d639d..e753b52 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -26,10 +26,12 @@ public class Worker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
- await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+ await using var client = PulsarClient.Builder()
+ .ExceptionHandler(context => _logger.PulsarClientException(context))
+ .Build(); //Connecting to pulsar://localhost:6650
await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(Monitor, cancellationToken)
+ .StateChangedHandler(consumerStateChanged => _logger.ConsumerChangedState(consumerStateChanged))
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
@@ -39,24 +41,7 @@ public class Worker : BackgroundService
private ValueTask ProcessMessage(IMessage<string> message, CancellationToken cancellationToken)
{
- _logger.LogInformation($"Received: {message.Value()}");
+ _logger.OutputMessage(message);
return ValueTask.CompletedTask;
}
-
- private void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
- {
- var stateMessage = stateChanged.ConsumerState switch
- {
- ConsumerState.Active => "is active",
- ConsumerState.Inactive => "is inactive",
- ConsumerState.Disconnected => "is disconnected",
- ConsumerState.Closed => "has closed",
- ConsumerState.ReachedEndOfTopic => "has reached end of topic",
- ConsumerState.Faulted => "has faulted",
- _ => $"has an unknown state '{stateChanged.ConsumerState}'"
- };
-
- var topic = stateChanged.Consumer.Topic;
- _logger.LogInformation($"The consumer for topic '{topic}' {stateMessage}");
- }
}
diff --git a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
similarity index 80%
rename from src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
rename to src/DotPulsar/Exceptions/ChannelNotReadyException.cs
index 599c0b2..712734b 100644
--- a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
@@ -12,11 +12,9 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ChannelNotReadyException : DotPulsarException
{
- public ChannelNotReadyException() : base("The service is not ready yet") { }
+ public ChannelNotReadyException() : base("The channnel is not ready yet") { }
}
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
similarity index 91%
rename from src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
rename to src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
index be58640..f66432b 100644
--- a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ConsumerNotFoundException : DotPulsarException
{
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
rename to src/DotPulsar/Exceptions/ServiceNotReadyException.cs
index 35e9a51..053aed7 100644
--- a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class ServiceNotReadyException : DotPulsarException
{
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
rename to src/DotPulsar/Exceptions/TooManyRequestsException.cs
index 22ef3ae..0d67632 100644
--- a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
@@ -12,9 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
public sealed class TooManyRequestsException : DotPulsarException
{
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 7ab0354..5cecd16 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ConsumerBuilderExtensions
/// </summary>
public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
this IConsumerBuilder<TMessage> builder,
+ Action<ConsumerStateChanged> handler)
+ {
+ void forwarder(ConsumerStateChanged consumerStateChanged, CancellationToken _) => handler(consumerStateChanged);
+ builder.StateChangedHandler(new ActionStateChangedHandler<ConsumerStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IConsumerBuilder<TMessage> builder,
Action<ConsumerStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
index 907e770..11cc6c7 100644
--- a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ProducerBuilderExtensions
/// </summary>
public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
this IProducerBuilder<TMessage> builder,
+ Action<ProducerStateChanged> handler)
+ {
+ void forwarder(ProducerStateChanged producerStateChanged, CancellationToken _) => handler(producerStateChanged);
+ builder.StateChangedHandler(new ActionStateChangedHandler<ProducerStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IProducerBuilder<TMessage> builder,
Action<ProducerStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
index 32f7505..555a5af 100644
--- a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ReaderBuilderExtensions
/// </summary>
public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
this IReaderBuilder<TMessage> builder,
+ Action<ReaderStateChanged> handler)
+ {
+ void forwarder(ReaderStateChanged readerStateChanged, CancellationToken _) => handler(readerStateChanged);
+ builder.StateChangedHandler(new ActionStateChangedHandler<ReaderStateChanged>(forwarder, default));
+ return builder;
+ }
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+ this IReaderBuilder<TMessage> builder,
Action<ReaderStateChanged, CancellationToken> handler,
CancellationToken cancellationToken = default)
{
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index da1a7af..a3c990e 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -16,7 +16,7 @@ namespace DotPulsar.Internal;
using Abstractions;
using DotPulsar.Abstractions;
-using Exceptions;
+using DotPulsar.Exceptions;
using PulsarApi;
using System;
using System.Buffers;