Posted to issues@activemq.apache.org by "Justin Bertram (JIRA)" <ji...@apache.org> on 2017/12/13 19:21:00 UTC

[jira] [Commented] (ARTEMIS-1556) Cannot Recover AMQP Subscription Existing On Remote Cluster Member

    [ https://issues.apache.org/jira/browse/ARTEMIS-1556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16289749#comment-16289749 ] 

Justin Bertram commented on ARTEMIS-1556:
-----------------------------------------

I believe this is working as designed.  As you can see in the "clustered-durable-subscription" example, the durable subscription is created on each of the two cluster nodes.  Although the example uses the core client, the underlying implementation for this use case is essentially the same for both core and AMQP clients.
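
To make the behaviour concrete, here is a toy Java model of it. This is not Artemis code: the Broker class and its attachWithSource/recover methods are hypothetical stand-ins, and the model assumes (as the snippet quoted in the issue suggests) that a null-source attach only queries the queues of the broker the client is connected to.

```java
import java.util.HashSet;
import java.util.Set;

public class LocalSubscriptionLookup {

    // Hypothetical stand-in for one cluster member; not an Artemis class.
    static final class Broker {
        // Subscription queues known to this broker only.
        private final Set<String> localQueues = new HashSet<>();

        // Models an attach that carries a full Source: the subscription
        // queue is created on this broker.
        void attachWithSource(String subscriptionName) {
            localQueues.add(subscriptionName);
        }

        // Models an attach with a null Source: only this broker's own
        // queues are consulted (assumption, see the lead-in).
        boolean recover(String subscriptionName) {
            return localQueues.contains(subscriptionName);
        }
    }

    public static void main(String[] args) {
        Broker broker1 = new Broker();
        Broker broker2 = new Broker();

        // The subscription "test" is created on broker1 only.
        broker1.attachWithSource("test");
        System.out.println(broker1.recover("test")); // true
        System.out.println(broker2.recover("test")); // false

        // Creating the subscription on broker2 as well, as the clustered
        // example does on each node, makes recovery succeed there too.
        broker2.attachWithSource("test");
        System.out.println(broker2.recover("test")); // true
    }
}
```

In this model the subscription has to exist on whichever node the client attaches to, which is why the clustered example creates it on both nodes.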

> Cannot Recover AMQP Subscription Existing On Remote Cluster Member 
> -------------------------------------------------------------------
>
>                 Key: ARTEMIS-1556
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1556
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 2.4.0
>         Environment: Latest broker snapshot running on Fedora 26. AMQP.Net Lite client.
>            Reporter: Todd Baert
>
> Assuming broker1 and broker2 exist in a cluster, if an AMQP Source constituting a shared durable subscription (having capabilities "shared", "global" and "topic") is created on broker1, and a client connected to broker2 then attempts to recover it (by sending an attach frame with a null source and desired capabilities "shared", "global" and "topic"), an Amqp.AmqpException is thrown.
> In a cluster, shouldn't it be possible to recover subscriptions that exist on another broker? Shouldn't the search for this subscription occur on all cluster members?
> I don't believe this is happening. It looks like we only search locally (I'm not sure about this), but the exception is certainly thrown and the subscription is not recovered.
> See the code below from org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext (the comments are not my own; they are from the source):
> {code:java}
>       if (source == null) {
>          // Attempt to recover a previous subscription happens when a link reattach happens on a
>          // subscription queue
>          String clientId = getClientId();
>          String pubId = sender.getName();
>          global = hasRemoteDesiredCapability(sender, GLOBAL);
>          queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
>          QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
>          multicast = true;
>          routingTypeToUse = RoutingType.MULTICAST;
>          // Once confirmed that the address exists we need to return a Source that reflects
>          // the lifetime policy and capabilities of the new subscription.
>          if (result.isExists()) {
> {code}
> I have a C# sample that demonstrates the issue:
> {code:java}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace amqp_client_demo
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver1 = new ReceiverLink(session1, "test", new Source(){Durable = 2, Capabilities = new Symbol[]{"topic", "shared", "global"}, Address = ADDRESS}, null);
>           
>             SenderLink sender = new SenderLink(session1, "sender", ADDRESS);
>             sender.Send(new Message("test message"));
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             ReceiverLink receiver2 = new ReceiverLink(session2, "test", new Attach(){Source = null, DesiredCapabilities = new Symbol[]{"topic", "shared", "global"}}, null);
>             
>             Console.WriteLine(receiver2.Receive(TimeSpan.FromSeconds(1)).Body); //Unhandled Exception: Amqp.AmqpException: Unknown subscription link: test
>         }
>     }
> }
> {code}
> If url1 and url2 are modified to point to the same cluster member, this works fine. Otherwise the exception shown in the code comment above is thrown.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)