You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2021/03/04 04:48:43 UTC

[activemq-nms-amqp] branch 2.0.x updated: AMQNET-637 Allow closing on error handler, Await()

This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch 2.0.x
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git


The following commit(s) were added to refs/heads/2.0.x by this push:
     new 007df90  AMQNET-637 Allow closing on error handler, Await()
     new 86359c7  Merge pull request #65 from lukeabsent/AMQNET-637
007df90 is described below

commit 007df909803946c0b8850fc16e7889180fbe53d5
Author: lukeabsent <tr...@interia.pl>
AuthorDate: Tue Feb 23 09:59:03 2021 +0100

    AMQNET-637 Allow closing on error handler, Await()
---
 src/NMS.AMQP/NmsMessageConsumer.cs |  9 +++++++--
 src/NMS.AMQP/NmsSession.cs         |  5 +++++
 src/NMS.AMQP/SessionDispatcher.cs  | 24 ++++++++++++++++++++++++
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index d64632d..63f6896 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -327,7 +327,7 @@ namespace Apache.NMS.AMQP
             
             if (Session.IsStarted && started && Listener != null)
             {
-                using(await syncRoot.LockAsync())
+                using(await syncRoot.LockAsync().Await())
                 {
                     try
                     {
@@ -400,7 +400,12 @@ namespace Apache.NMS.AMQP
                         //
                         // We need to decide how to respond to these, but definitely we cannot
                         // let this error propagate as it could take down the SessionDispatcher
-                        Session.Connection.OnAsyncException(e);
+
+                        // To let close the existing session/connection in error handler
+                        using (Session.ExcludeCheckIsOnDeliveryExecutionFlow())
+                        {
+                            Session.Connection.OnAsyncException(e);
+                        }
                     }
                 }
             }
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index 0eb697c..15ef81e 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -841,6 +841,11 @@ namespace Apache.NMS.AMQP
             }
         }
 
+        internal IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+        {
+            return dispatcher?.ExcludeCheckIsOnDeliveryExecutionFlow();
+        }
+
         public async Task OnConnectionRecovery(IProvider provider)
         {
             await provider.CreateResource(SessionInfo).Await();
diff --git a/src/NMS.AMQP/SessionDispatcher.cs b/src/NMS.AMQP/SessionDispatcher.cs
index 70d26e4..53a5da9 100644
--- a/src/NMS.AMQP/SessionDispatcher.cs
+++ b/src/NMS.AMQP/SessionDispatcher.cs
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
@@ -62,5 +63,28 @@ namespace Apache.NMS.AMQP
             cts.Cancel();
             cts.Dispose();
         }
+
+        public IDisposable ExcludeCheckIsOnDeliveryExecutionFlow()
+        {
+            return new ExcludeCheckIsOnDeliveryExecutionFlowBlock(this);
+        }
+
+        private class ExcludeCheckIsOnDeliveryExecutionFlowBlock : IDisposable
+        {
+            private readonly bool previousValue = false;
+            private readonly SessionDispatcher sessionDispatcher;
+
+            public ExcludeCheckIsOnDeliveryExecutionFlowBlock(SessionDispatcher sessionDispatcher)
+            {
+                this.sessionDispatcher = sessionDispatcher;
+                this.previousValue = sessionDispatcher.isOnDispatcherFlow.Value;
+                sessionDispatcher.isOnDispatcherFlow.Value = false;
+            }
+
+            public void Dispose()
+            {
+                sessionDispatcher.isOnDispatcherFlow.Value = previousValue;
+            }
+        }
     }
 }
\ No newline at end of file