You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/02/03 16:29:16 UTC

[pulsar-dotpulsar] branch master updated: Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fabc9e3  Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)
fabc9e3 is described below

commit fabc9e34138b83280306b64cfc0003e7d5faf013
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Thu Feb 3 17:29:07 2022 +0100

    Adding some extension methods and moving certain exceptions to 'public' in order to create a lean and correct Processing sample (without warnings)
---
 samples/Processing/LoggerExtensions.cs             | 65 ++++++++++++++++++++++
 samples/Processing/Processing.csproj               |  2 +-
 samples/Processing/Worker.cs                       | 25 ++-------
 .../Exceptions/ChannelNotReadyException.cs         |  6 +-
 .../Exceptions/ConsumerNotFoundException.cs        |  4 +-
 .../Exceptions/ServiceNotReadyException.cs         |  4 +-
 .../Exceptions/TooManyRequestsException.cs         |  4 +-
 .../Extensions/ConsumerBuilderExtensions.cs        | 12 ++++
 .../Extensions/ProducerBuilderExtensions.cs        | 12 ++++
 .../Extensions/ReaderBuilderExtensions.cs          | 12 ++++
 src/DotPulsar/Internal/NotReadyChannel.cs          |  2 +-
 11 files changed, 113 insertions(+), 35 deletions(-)

diff --git a/samples/Processing/LoggerExtensions.cs b/samples/Processing/LoggerExtensions.cs
new file mode 100644
index 0000000..9c46547
--- /dev/null
+++ b/samples/Processing/LoggerExtensions.cs
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Processing;
+
+using DotPulsar;
+using DotPulsar.Abstractions;
+using DotPulsar.Exceptions;
+
+#pragma warning disable IDE0079 // Remove unnecessary suppression... Ehm... *sigh*
+#pragma warning disable IDE0060 // Remove unused parameter... Why Microsoft? Why do you force me to do this?
+
+public static partial class LoggerExtensions
+{
+    // ConsumerChangedState
+    public static void ConsumerChangedState(this ILogger logger, ConsumerStateChanged stateChanged)
+    {
+        var logLevel = stateChanged.ConsumerState switch
+        {
+            ConsumerState.Disconnected => LogLevel.Warning,
+            ConsumerState.Faulted => LogLevel.Error,
+            _ => LogLevel.Information
+        };
+
+        logger.ConsumerChangedState(logLevel, stateChanged.Consumer.Topic, stateChanged.ConsumerState.ToString());
+    }
+
+    [LoggerMessage(EventId = 0, Message = "The consumer for topic '{topic}' changed state to '{state}'")]
+    static partial void ConsumerChangedState(this ILogger logger, LogLevel logLevel, string topic, string state);
+
+    // OutputMessage
+    public static void OutputMessage(this ILogger logger, IMessage<string> message)
+    {
+        var publishedOn = message.PublishTimeAsDateTime;
+        var payload = message.Value();
+        logger.OutputMessage(publishedOn, payload);
+    }
+
+    [LoggerMessage(EventId = 1, Level = LogLevel.Information, Message = "Received: '{payload}' published on {publishedOn}")]
+    static partial void OutputMessage(this ILogger logger, DateTime publishedOn, string payload);
+
+    // PulsarClientException
+    public static void PulsarClientException(this ILogger logger, ExceptionContext exceptionContext)
+    {
+        if (exceptionContext.Exception is not ChannelNotReadyException)
+            logger.PulsarClientException(exceptionContext.Exception);
+    }
+
+    [LoggerMessage(EventId = 2, Level = LogLevel.Warning, Message = "The PulsarClient got an exception")]
+    static partial void PulsarClientException(this ILogger logger, Exception exception);
+}
+
+#pragma warning restore IDE0060 // Remove unused parameter
+#pragma warning restore IDE0079 // Remove unnecessary suppression
diff --git a/samples/Processing/Processing.csproj b/samples/Processing/Processing.csproj
index b5ec451..ad4d9a8 100644
--- a/samples/Processing/Processing.csproj
+++ b/samples/Processing/Processing.csproj
@@ -1,4 +1,4 @@
-<Project Sdk="Microsoft.NET.Sdk.Worker">
+<Project Sdk="Microsoft.NET.Sdk.Worker">
 
   <PropertyGroup>
     <TargetFramework>net6.0</TargetFramework>
diff --git a/samples/Processing/Worker.cs b/samples/Processing/Worker.cs
index d4d639d..e753b52 100644
--- a/samples/Processing/Worker.cs
+++ b/samples/Processing/Worker.cs
@@ -26,10 +26,12 @@ public class Worker : BackgroundService
 
     protected override async Task ExecuteAsync(CancellationToken cancellationToken)
     {
-        await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
+        await using var client = PulsarClient.Builder()
+            .ExceptionHandler(context => _logger.PulsarClientException(context))
+            .Build(); //Connecting to pulsar://localhost:6650
 
         await using var consumer = client.NewConsumer(Schema.String)
-            .StateChangedHandler(Monitor, cancellationToken)
+            .StateChangedHandler(consumerStateChanged => _logger.ConsumerChangedState(consumerStateChanged))
             .SubscriptionName("MySubscription")
             .Topic("persistent://public/default/mytopic")
             .Create();
@@ -39,24 +41,7 @@ public class Worker : BackgroundService
 
     private ValueTask ProcessMessage(IMessage<string> message, CancellationToken cancellationToken)
     {
-        _logger.LogInformation($"Received: {message.Value()}");
+        _logger.OutputMessage(message);
         return ValueTask.CompletedTask;
     }
-
-    private void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
-    {
-        var stateMessage = stateChanged.ConsumerState switch
-        {
-            ConsumerState.Active => "is active",
-            ConsumerState.Inactive => "is inactive",
-            ConsumerState.Disconnected => "is disconnected",
-            ConsumerState.Closed => "has closed",
-            ConsumerState.ReachedEndOfTopic => "has reached end of topic",
-            ConsumerState.Faulted => "has faulted",
-            _ => $"has an unknown state '{stateChanged.ConsumerState}'"
-        };
-
-        var topic = stateChanged.Consumer.Topic;
-        _logger.LogInformation($"The consumer for topic '{topic}' {stateMessage}");
-    }
 }
diff --git a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
similarity index 80%
rename from src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
rename to src/DotPulsar/Exceptions/ChannelNotReadyException.cs
index 599c0b2..712734b 100644
--- a/src/DotPulsar/Internal/Exceptions/ChannelNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ChannelNotReadyException.cs
@@ -12,11 +12,9 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
 
 public sealed class ChannelNotReadyException : DotPulsarException
 {
-    public ChannelNotReadyException() : base("The service is not ready yet") { }
+    public ChannelNotReadyException() : base("The channnel is not ready yet") { }
 }
diff --git a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
similarity index 91%
rename from src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
rename to src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
index be58640..f66432b 100644
--- a/src/DotPulsar/Internal/Exceptions/ConsumerNotFoundException.cs
+++ b/src/DotPulsar/Exceptions/ConsumerNotFoundException.cs
@@ -12,9 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
 
 public sealed class ConsumerNotFoundException : DotPulsarException
 {
diff --git a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
rename to src/DotPulsar/Exceptions/ServiceNotReadyException.cs
index 35e9a51..053aed7 100644
--- a/src/DotPulsar/Internal/Exceptions/ServiceNotReadyException.cs
+++ b/src/DotPulsar/Exceptions/ServiceNotReadyException.cs
@@ -12,9 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
 
 public sealed class ServiceNotReadyException : DotPulsarException
 {
diff --git a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
similarity index 90%
rename from src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
rename to src/DotPulsar/Exceptions/TooManyRequestsException.cs
index 22ef3ae..0d67632 100644
--- a/src/DotPulsar/Internal/Exceptions/TooManyRequestsException.cs
+++ b/src/DotPulsar/Exceptions/TooManyRequestsException.cs
@@ -12,9 +12,7 @@
  * limitations under the License.
  */
 
-namespace DotPulsar.Internal.Exceptions;
-
-using DotPulsar.Exceptions;
+namespace DotPulsar.Exceptions;
 
 public sealed class TooManyRequestsException : DotPulsarException
 {
diff --git a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
index 7ab0354..5cecd16 100644
--- a/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ConsumerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ConsumerBuilderExtensions
     /// </summary>
     public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
         this IConsumerBuilder<TMessage> builder,
+        Action<ConsumerStateChanged> handler)
+    {
+        void forwarder(ConsumerStateChanged consumerStateChanged, CancellationToken _) => handler(consumerStateChanged);
+        builder.StateChangedHandler(new ActionStateChangedHandler<ConsumerStateChanged>(forwarder, default));
+        return builder;
+    }
+
+    /// <summary>
+    /// Register a state changed handler.
+    /// </summary>
+    public static IConsumerBuilder<TMessage> StateChangedHandler<TMessage>(
+        this IConsumerBuilder<TMessage> builder,
         Action<ConsumerStateChanged, CancellationToken> handler,
         CancellationToken cancellationToken = default)
     {
diff --git a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
index 907e770..11cc6c7 100644
--- a/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ProducerBuilderExtensions
     /// </summary>
     public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
         this IProducerBuilder<TMessage> builder,
+        Action<ProducerStateChanged> handler)
+    {
+        void forwarder(ProducerStateChanged producerStateChanged, CancellationToken _) => handler(producerStateChanged);
+        builder.StateChangedHandler(new ActionStateChangedHandler<ProducerStateChanged>(forwarder, default));
+        return builder;
+    }
+
+    /// <summary>
+    /// Register a state changed handler.
+    /// </summary>
+    public static IProducerBuilder<TMessage> StateChangedHandler<TMessage>(
+        this IProducerBuilder<TMessage> builder,
         Action<ProducerStateChanged, CancellationToken> handler,
         CancellationToken cancellationToken = default)
     {
diff --git a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
index 32f7505..555a5af 100644
--- a/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
+++ b/src/DotPulsar/Extensions/ReaderBuilderExtensions.cs
@@ -30,6 +30,18 @@ public static class ReaderBuilderExtensions
     /// </summary>
     public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
         this IReaderBuilder<TMessage> builder,
+        Action<ReaderStateChanged> handler)
+    {
+        void forwarder(ReaderStateChanged readerStateChanged, CancellationToken _) => handler(readerStateChanged);
+        builder.StateChangedHandler(new ActionStateChangedHandler<ReaderStateChanged>(forwarder, default));
+        return builder;
+    }
+
+    /// <summary>
+    /// Register a state changed handler.
+    /// </summary>
+    public static IReaderBuilder<TMessage> StateChangedHandler<TMessage>(
+        this IReaderBuilder<TMessage> builder,
         Action<ReaderStateChanged, CancellationToken> handler,
         CancellationToken cancellationToken = default)
     {
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index da1a7af..a3c990e 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -16,7 +16,7 @@ namespace DotPulsar.Internal;
 
 using Abstractions;
 using DotPulsar.Abstractions;
-using Exceptions;
+using DotPulsar.Exceptions;
 using PulsarApi;
 using System;
 using System.Buffers;