You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/12/05 16:02:33 UTC

[pulsar-dotpulsar] branch master updated: Added DelayedStateMonitor for IState

This is an automated email from the ASF dual-hosted git repository.

blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa14c6f  Added DelayedStateMonitor for IState
fa14c6f is described below

commit fa14c6faddbd2f7daf6c6350ba059435838a7325
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Mon Dec 5 17:02:19 2022 +0100

    Added DelayedStateMonitor for IState
---
 CHANGELOG.md                                |  1 +
 src/DotPulsar/Extensions/StateExtensions.cs | 79 +++++++++++++++++++++++++++++
 src/DotPulsar/Internal/ChannelManager.cs    | 10 ++--
 src/DotPulsar/Internal/Crc32C.cs            |  6 +--
 src/DotPulsar/Internal/IdLookup.cs          |  2 +-
 src/DotPulsar/Internal/MonitorState.cs      | 11 ++--
 6 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0bab92..e0e906a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 
 - .NET 7 added as a target framework
 - Delay option for OnStateChangeFrom/To for IState and StateChangedFrom/To for IConsumer, IProducer, and IReader
+- DelayedStateMonitor extension methods for IState with Func and Action callbacks
 
 ### Removed
 
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs b/src/DotPulsar/Extensions/StateExtensions.cs
index bb5f280..89dc2ef 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateExtensions.cs
@@ -103,4 +103,83 @@ public static class StateExtensions
             }
         }
     }
+
+    /// <summary>
+    /// Will invoke the onStateLeft callback when the state if left (with delay) and onStateReached when it's reached again.
+    /// </summary>
+    /// <returns>
+    /// ValueTask that will run as long as a final state is not entered.
+    /// </returns>
+    /// <remarks>
+    /// If the state change to a final state, then the returned task will complete.
+    /// </remarks>
+    public static async ValueTask DelayedStateMonitor<TEntity, TState>(
+        this TEntity stateImplementer,
+        TState state,
+        TimeSpan delay,
+        Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
+        Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
+        CancellationToken cancellationToken) where TEntity : IState<TState> where TState : notnull
+    {
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            var currentState = await stateImplementer.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
+            if (stateImplementer.IsFinalState(currentState))
+                return;
+
+            try
+            {
+                await onStateLeft(stateImplementer, currentState, cancellationToken).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Ignore
+            }
+
+            currentState = await stateImplementer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+            if (stateImplementer.IsFinalState(currentState))
+                return;
+
+            try
+            {
+                await onStateReached(stateImplementer, currentState, cancellationToken).ConfigureAwait(false);
+            }
+            catch
+            {
+                // Ignore
+            }
+        }
+    }
+
+    /// <summary>
+    /// Will invoke the onStateLeft callback when the state if left (with delay) and onStateReached when it's reached again.
+    /// </summary>
+    /// <returns>
+    /// ValueTask that will run as long as a final state is not entered.
+    /// </returns>
+    /// <remarks>
+    /// If the state change to a final state, then the returned task will complete.
+    /// </remarks>
+    public static async ValueTask DelayedStateMonitor<TEntity, TState>(
+        this TEntity stateImplementer,
+        TState state,
+        TimeSpan delay,
+        Action<TEntity, TState> onStateLeft,
+        Action<TEntity, TState> onStateReached,
+        CancellationToken cancellationToken) where TEntity : IState<TState> where TState : notnull
+    {
+        ValueTask onStateLeftFunction(TEntity entity, TState state, CancellationToken cancellationToken)
+        {
+            onStateLeft(entity, state);
+            return new ValueTask();
+        }
+
+        ValueTask onStateReachedFunction(TEntity entity, TState state, CancellationToken cancellationToken)
+        {
+            onStateReached(entity, state);
+            return new ValueTask();
+        }
+
+        await stateImplementer.DelayedStateMonitor(state, delay, onStateLeftFunction, onStateReachedFunction, cancellationToken).ConfigureAwait(false);
+    }
 }
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index afc3343..fac3820 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -201,24 +201,20 @@ public sealed class ChannelManager : IDisposable
 
     private void Incoming(CommandCloseConsumer command)
     {
-        var channel = _consumerChannels[command.ConsumerId];
-
+        var channel = _consumerChannels.Remove(command.ConsumerId);
         if (channel is null)
             return;
 
-        _ = _consumerChannels.Remove(command.ConsumerId);
         _requestResponseHandler.Incoming(command);
         channel.ClosedByServer();
     }
 
     private void Incoming(CommandCloseProducer command)
     {
-        var channel = _producerChannels[command.ProducerId];
-
+        var channel = _producerChannels.Remove(command.ProducerId);
         if (channel is null)
             return;
 
-        _ = _producerChannels.Remove(command.ProducerId);
         _requestResponseHandler.Incoming(command);
         channel.ClosedByServer();
     }
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
index fa82bca..4660cbc 100644
--- a/src/DotPulsar/Internal/Crc32C.cs
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -18,7 +18,7 @@ using System.Buffers;
 
 public static class Crc32C
 {
-    private const uint _generator = 0x82F63B78u;
+    private const uint Generator = 0x82F63B78u;
 
     private static readonly uint[] _lookup;
 
@@ -33,7 +33,7 @@ public static class Crc32C
             for (var j = 0; j < 16; j++)
             {
                 for (var k = 0; k < 8; k++)
-                    entry = (entry & 1) == 1 ? _generator ^ (entry >> 1) : entry >> 1;
+                    entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : entry >> 1;
 
                 _lookup[j * 256 + i] = entry;
             }
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
index d326f7b..206252b 100644
--- a/src/DotPulsar/Internal/IdLookup.cs
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
index f1d9021..a50c933 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -26,10 +26,11 @@ public static class StateMonitor
 
         var state = ProducerState.Disconnected;
 
-        while (!producer.IsFinalState(state))
+        while (!producer.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await producer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ProducerState;
+
             try
             {
                 await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
@@ -47,10 +48,11 @@ public static class StateMonitor
 
         var state = ConsumerState.Disconnected;
 
-        while (!consumer.IsFinalState(state))
+        while (!consumer.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await consumer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ConsumerState;
+
             try
             {
                 await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
@@ -68,10 +70,11 @@ public static class StateMonitor
 
         var state = ReaderState.Disconnected;
 
-        while (!reader.IsFinalState(state))
+        while (!reader.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
         {
             var stateChanged = await reader.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
             state = stateChanged.ReaderState;
+
             try
             {
                 await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);