You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2019/10/05 11:02:53 UTC
[activemq-nms-amqp] branch master updated: AMQNET-619: Handle remote detach properly for sender link
This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-nms-amqp.git
The following commit(s) were added to refs/heads/master by this push:
new 590fc54 AMQNET-619: Handle remote detach properly for sender link
new fe647a9 Merge pull request #42 from Havret/handle_sender_link_remote_detach_properly
590fc54 is described below
commit 590fc549d06c347cbeb2fda685ff51113e8fe7a5
Author: Havret <h4...@gmail.com>
AuthorDate: Sat Oct 5 10:21:42 2019 +0200
AMQNET-619: Handle remote detach properly for sender link
---
src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs | 17 +++++++++-
.../Integration/FailoverIntegrationTest.cs | 39 ++++++++++++++++++++++
test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs | 24 ++++++++++---
3 files changed, 75 insertions(+), 5 deletions(-)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index d767577..d3982dd 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -54,7 +54,7 @@ namespace Apache.NMS.AMQP.Provider.Amqp
string linkName = info.Id + ":" + target.Address;
var taskCompletionSource = new TaskCompletionSource<bool>();
- senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, (link, attach) => { taskCompletionSource.SetResult(true); });
+ senderLink = new SenderLink(session.UnderlyingSession, linkName, frame, HandleOpened(taskCompletionSource));
senderLink.AddClosedCallback((sender, error) =>
{
@@ -73,6 +73,21 @@ namespace Apache.NMS.AMQP.Provider.Amqp
return taskCompletionSource.Task;
}
+
+ private OnAttached HandleOpened(TaskCompletionSource<bool> tsc) => (link, attach) =>
+ {
+ if (IsClosePending(attach))
+ return;
+
+ tsc.SetResult(true);
+ };
+
+ private static bool IsClosePending(Attach attach)
+ {
+ // When no link terminus was created, the peer will now detach/close us otherwise
+ // we need to validate the returned remote target prior to open completion.
+ return attach.Target == null;
+ }
private Source CreateSource() => new Source
{
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index 816dc84..cb3f4cc 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -1028,6 +1028,45 @@ namespace NMS.AMQP.Test.Integration
}
}
+ [Test, Timeout(20_000)]
+ public void TestCreateProducerFailsWhenLinkRefused()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ testPeer.ExpectSaslAnonymous();
+ testPeer.ExpectOpen();
+ testPeer.ExpectBegin();
+
+ NmsConnection connection = EstablishAnonymousConnection(testPeer);
+ connection.Start();
+
+ testPeer.ExpectBegin();
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+
+ string topicName = "myTopic";
+ ITopic topic = session.GetTopic(topicName);
+
+ // Expect a link to a topic node, which we will then refuse
+ testPeer.ExpectSenderAttach(targetMatcher: source =>
+ {
+ Assert.AreEqual(topicName, source.Address);
+ Assert.IsFalse(source.Dynamic);
+ Assert.AreEqual((uint) TerminusDurability.NONE, source.Durable);
+ }, sourceMatcher: Assert.NotNull, refuseLink: true);
+
+ //Expect the detach response to the test peer closing the producer link after refusal.
+ testPeer.ExpectDetach(expectClosed: true, sendResponse: false, replyClosed: false);
+
+ Assert.Catch<NMSException>(() => session.CreateProducer(topic));
+
+ // Shut it down
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+
private NmsConnection EstablishAnonymousConnection(params TestAmqpPeer[] peers)
{
return EstablishAnonymousConnection(null, null, peers);
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 5646215..f1981e5 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -421,7 +421,11 @@ namespace NMS.AMQP.Test.TestAmqp
ExpectSenderAttach(sourceMatcher: Assert.NotNull, targetMatcher: Assert.NotNull);
}
- public void ExpectSenderAttach(Action<Source> sourceMatcher, Action<Target> targetMatcher, uint creditAmount = 100, bool senderSettled = false)
+ public void ExpectSenderAttach(Action<Source> sourceMatcher,
+ Action<Target> targetMatcher,
+ bool refuseLink = false,
+ uint creditAmount = 100,
+ bool senderSettled = false)
{
var attachMatcher = new FrameMatcher<Attach>()
.WithAssertion(attach => Assert.IsNotNull(attach.LinkName))
@@ -440,15 +444,26 @@ namespace NMS.AMQP.Test.TestAmqp
RcvSettleMode = ReceiverSettleMode.First,
Handle = context.Command.Handle,
LinkName = context.Command.LinkName,
- Source = context.Command.Source,
- Target = context.Command.Target
+ Source = context.Command.Source
};
+ if (refuseLink)
+ attach.Target = null;
+ else
+ attach.Target = context.Command.Target;
+
lastInitiatedLinkHandle = context.Command.Handle;
context.SendCommand(attach);
- var flow = new Flow()
+ if (refuseLink)
+ {
+ var detach = new Detach { Closed = true, Handle = context.Command.Handle };
+ context.SendCommand(detach);
+ }
+ else
+ {
+ var flow = new Flow
{
NextIncomingId = 1,
IncomingWindow = 2048,
@@ -460,6 +475,7 @@ namespace NMS.AMQP.Test.TestAmqp
};
context.SendCommand(flow);
+ }
});
AddMatcher(attachMatcher);