You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bl...@apache.org on 2022/12/05 16:02:33 UTC
[pulsar-dotpulsar] branch master updated: Added DelayedStateMonitor for IState
This is an automated email from the ASF dual-hosted git repository.
blankensteiner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-dotpulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fa14c6f Added DelayedStateMonitor for IState
fa14c6f is described below
commit fa14c6faddbd2f7daf6c6350ba059435838a7325
Author: Daniel Blankensteiner <db...@vmail.dk>
AuthorDate: Mon Dec 5 17:02:19 2022 +0100
Added DelayedStateMonitor for IState
---
CHANGELOG.md | 1 +
src/DotPulsar/Extensions/StateExtensions.cs | 79 +++++++++++++++++++++++++++++
src/DotPulsar/Internal/ChannelManager.cs | 10 ++--
src/DotPulsar/Internal/Crc32C.cs | 6 +--
src/DotPulsar/Internal/IdLookup.cs | 2 +-
src/DotPulsar/Internal/MonitorState.cs | 11 ++--
6 files changed, 94 insertions(+), 15 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0bab92..e0e906a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- .NET 7 added as a target framework
- Delay option for OnStateChangeFrom/To for IState and StateChangedFrom/To for IConsumer, IProducer, and IReader
+- DelayedStateMonitor extension methods for IState with Func and Action callbacks
### Removed
diff --git a/src/DotPulsar/Extensions/StateExtensions.cs b/src/DotPulsar/Extensions/StateExtensions.cs
index bb5f280..89dc2ef 100644
--- a/src/DotPulsar/Extensions/StateExtensions.cs
+++ b/src/DotPulsar/Extensions/StateExtensions.cs
@@ -103,4 +103,83 @@ public static class StateExtensions
}
}
}
+
+ /// <summary>
+ /// Will invoke the onStateLeft callback when the state is left (with delay) and onStateReached when it's reached again.
+ /// </summary>
+ /// <returns>
+ /// ValueTask that will run as long as a final state is not entered.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then the returned task will complete.
+ /// </remarks>
+ public static async ValueTask DelayedStateMonitor<TEntity, TState>(
+ this TEntity stateImplementer,
+ TState state,
+ TimeSpan delay,
+ Func<TEntity, TState, CancellationToken, ValueTask> onStateLeft,
+ Func<TEntity, TState, CancellationToken, ValueTask> onStateReached,
+ CancellationToken cancellationToken) where TEntity : IState<TState> where TState : notnull
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ {
+ var currentState = await stateImplementer.OnStateChangeFrom(state, delay, cancellationToken).ConfigureAwait(false);
+ if (stateImplementer.IsFinalState(currentState))
+ return;
+
+ try
+ {
+ await onStateLeft(stateImplementer, currentState, cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+
+ currentState = await stateImplementer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+ if (stateImplementer.IsFinalState(currentState))
+ return;
+
+ try
+ {
+ await onStateReached(stateImplementer, currentState, cancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+
+ /// <summary>
+ /// Will invoke the onStateLeft callback when the state is left (with delay) and onStateReached when it's reached again.
+ /// </summary>
+ /// <returns>
+ /// ValueTask that will run as long as a final state is not entered.
+ /// </returns>
+ /// <remarks>
+ /// If the state changes to a final state, then the returned task will complete.
+ /// </remarks>
+ public static async ValueTask DelayedStateMonitor<TEntity, TState>(
+ this TEntity stateImplementer,
+ TState state,
+ TimeSpan delay,
+ Action<TEntity, TState> onStateLeft,
+ Action<TEntity, TState> onStateReached,
+ CancellationToken cancellationToken) where TEntity : IState<TState> where TState : notnull
+ {
+ ValueTask onStateLeftFunction(TEntity entity, TState state, CancellationToken cancellationToken)
+ {
+ onStateLeft(entity, state);
+ return new ValueTask();
+ }
+
+ ValueTask onStateReachedFunction(TEntity entity, TState state, CancellationToken cancellationToken)
+ {
+ onStateReached(entity, state);
+ return new ValueTask();
+ }
+
+ await stateImplementer.DelayedStateMonitor(state, delay, onStateLeftFunction, onStateReachedFunction, cancellationToken).ConfigureAwait(false);
+ }
}
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index afc3343..fac3820 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -201,24 +201,20 @@ public sealed class ChannelManager : IDisposable
private void Incoming(CommandCloseConsumer command)
{
- var channel = _consumerChannels[command.ConsumerId];
-
+ var channel = _consumerChannels.Remove(command.ConsumerId);
if (channel is null)
return;
- _ = _consumerChannels.Remove(command.ConsumerId);
_requestResponseHandler.Incoming(command);
channel.ClosedByServer();
}
private void Incoming(CommandCloseProducer command)
{
- var channel = _producerChannels[command.ProducerId];
-
+ var channel = _producerChannels.Remove(command.ProducerId);
if (channel is null)
return;
- _ = _producerChannels.Remove(command.ProducerId);
_requestResponseHandler.Incoming(command);
channel.ClosedByServer();
}
diff --git a/src/DotPulsar/Internal/Crc32C.cs b/src/DotPulsar/Internal/Crc32C.cs
index fa82bca..4660cbc 100644
--- a/src/DotPulsar/Internal/Crc32C.cs
+++ b/src/DotPulsar/Internal/Crc32C.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -18,7 +18,7 @@ using System.Buffers;
public static class Crc32C
{
- private const uint _generator = 0x82F63B78u;
+ private const uint Generator = 0x82F63B78u;
private static readonly uint[] _lookup;
@@ -33,7 +33,7 @@ public static class Crc32C
for (var j = 0; j < 16; j++)
{
for (var k = 0; k < 8; k++)
- entry = (entry & 1) == 1 ? _generator ^ (entry >> 1) : entry >> 1;
+ entry = (entry & 1) == 1 ? Generator ^ (entry >> 1) : entry >> 1;
_lookup[j * 256 + i] = entry;
}
diff --git a/src/DotPulsar/Internal/IdLookup.cs b/src/DotPulsar/Internal/IdLookup.cs
index d326f7b..206252b 100644
--- a/src/DotPulsar/Internal/IdLookup.cs
+++ b/src/DotPulsar/Internal/IdLookup.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
index f1d9021..a50c933 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -26,10 +26,11 @@ public static class StateMonitor
var state = ProducerState.Disconnected;
- while (!producer.IsFinalState(state))
+ while (!producer.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await producer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ProducerState;
+
try
{
await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
@@ -47,10 +48,11 @@ public static class StateMonitor
var state = ConsumerState.Disconnected;
- while (!consumer.IsFinalState(state))
+ while (!consumer.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await consumer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ConsumerState;
+
try
{
await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
@@ -68,10 +70,11 @@ public static class StateMonitor
var state = ReaderState.Disconnected;
- while (!reader.IsFinalState(state))
+ while (!reader.IsFinalState(state) && !handler.CancellationToken.IsCancellationRequested)
{
var stateChanged = await reader.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
state = stateChanged.ReaderState;
+
try
{
await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);