You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2021/03/04 04:48:43 UTC
[activemq-nms-amqp] branch 2.0.x updated: AMQNET-637 Allow closing
on error handler, Await()
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch 2.0.x
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/2.0.x by this push:
new 007df90 AMQNET-637 Allow closing on error handler, Await()
new 86359c7 Merge pull request #65 from lukeabsent/AMQNET-637
007df90 is described below
commit 007df909803946c0b8850fc16e7889180fbe53d5
Author: lukeabsent <tr...@interia.pl>
AuthorDate: Tue Feb 23 09:59:03 2021 +0100
AMQNET-637 Allow closing on error handler, Await()
---
src/NMS.AMQP/NmsMessageConsumer.cs | 9 +++++++--
src/NMS.AMQP/NmsSession.cs | 5 +++++
src/NMS.AMQP/SessionDispatcher.cs | 24 ++++++++++++++++++++++++
3 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d64632d..63f6896 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -327,7 +327,7 @@ namespace Apache.NMS.AMQP
if (Session.IsStarted && started && Listener != null)
{
- using(await syncRoot.LockAsync())
+ using(await syncRoot.LockAsync().Await())
{
try
{
@@ -400,7 +400,12 @@ namespace Apache.NMS.AMQP
//
// We need to decide how to respond to these, but definitely we cannot
// let this error propagate as it could take down the SessionDispatcher
- Session.Connection.OnAsyncException(e);
+
+ // To let close the existing session/connection in error handler
+ using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
+ {
+ Session.Connection.OnAsyncException(e);
+ }
}
}
}
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 0eb697c..15ef81e 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -841,6 +841,11 @@ namespace Apache.NMS.AMQP
}
}
+ internal IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+ {
+ return dispatcher?.ExcludeCheckIsOnDeliveryExecutionFlow();
+ }
+
public async Task OnConnectionRecovery(IProvider provider)
{
await provider.CreateResource(SessionInfo).Await();
diff --git a/src/NMS.AMQP/SessionDispatcher.cs b/src/NMS.AMQP/SessionDispatcher.cs
index 70d26e4..53a5da9 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
@@ -62,5 +63,28 @@ namespace Apache.NMS.AMQP
cts.Cancel();
cts.Dispose();
}
+
+ public IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+ {
+ return new ExcludeCheckIsOnDeliveryExecutionFlowBlock(this);
+ }
+
+ private class ExcludeCheckIsOnDeliveryExecutionFlowBlock : IDisposable
+ {
+ private readonly bool previousValue = false;
+ private readonly SessionDispatcher sessionDispatcher;
+
+ public ExcludeCheckIsOnDeliveryExecutionFlowBlock(SessionDispatcher sessionDispatcher)
+ {
+ this.sessionDispatcher = sessionDispatcher;
+ this.previousValue = sessionDispatcher.isOnDispatcherFlow.Value;
+ sessionDispatcher.isOnDispatcherFlow.Value = false;
+ }
+
+ public void Dispose()
+ {
+ sessionDispatcher.isOnDispatcherFlow.Value = previousValue;
+ }
+ }
}
}
\ No newline at end of file