Posted to issues@activemq.apache.org by "Justin Bertram (JIRA)" <ji...@apache.org> on 2017/12/09 14:39:00 UTC

[jira] [Commented] (ARTEMIS-1549) AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()

    [ https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284758#comment-16284758 ] 

Justin Bertram commented on ARTEMIS-1549:
-----------------------------------------

This looks like a duplicate of ARTEMIS-1542 to me.  Can you take a look at that JIRA?

> AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-1549
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>         Environment: AMQP .NET lite client, .NET Core runtime 2.0, connecting with brokers on local machine (fedora 26)
>            Reporter: Todd Baert
>
> I have set up a cluster of 2 brokers using a simple static cluster configuration (see below). Sending a CORE message to broker1 and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP.Net Lite client) results in an NPE in broker2:
> {code:none}
> 08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
> {code}
> To be clear, the NPE occurs on the broker to which the receiver is attached, every time a message is sent by the producer (which is attached to the other broker).
> Sending and receiving the AMQP messages on the same cluster member works as expected.
> Here is some client code that demonstrates the issue:
> {code:java}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace Test
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             // Receiver: shared durable subscription on the broker at url1.
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
>             // Sender: attached to the other cluster member at url2.
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
>             // Grant 300 credits and accept each message as it arrives.
>             receiver.Start(300, (r, m) => {
>                 r.Accept(m);
>                 Console.WriteLine("Got message: " + m.Body);
>             });
>
>             // Send one durable message through the second connection.
>             Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
>             outMessage.Header = new Header();
>             outMessage.Header.Durable = true;
>             sender.Send(outMessage);
>             // Block the main thread indefinitely so the receiver callback keeps running.
>             Thread.CurrentThread.Join();
>         }
>         private static Source CreateSharedDurableSubscriberSource(String address)
>         {
>             Source source = new Source();
>             source.Address = address;
>             source.ExpiryPolicy = new Symbol("never");
>             
>             source.Durable = 2;
>             source.Capabilities = new Symbol[]{"topic", "shared", "global"};
>             source.DistributionMode = new Symbol("copy");
>             return source;
>         }
>     }
> }
> {code}
> Here is the cluster config from broker1; broker2 is configured accordingly:
> {code:xml}
>       <acceptors>
>          <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
>          <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
>       </acceptors>
>       <connectors>
>         <connector name="broker1-connector">tcp://localhost:61616</connector>
>         <connector name="broker2-connector">tcp://localhost:61617</connector>
>       </connectors>
>       <cluster-user>todd</cluster-user>
>       <cluster-password>password</cluster-password>      
>       <cluster-connections>
>         <cluster-connection name="preprod">
>           <connector-ref>broker1-connector</connector-ref>
>           <retry-interval>500</retry-interval>
>           <use-duplicate-detection>true</use-duplicate-detection>
>           <message-load-balancing>ON_DEMAND</message-load-balancing>          
>           <max-hops>1</max-hops>
>           <static-connectors allow-direct-connections-only="true">
>               <connector-ref>broker2-connector</connector-ref>
>           </static-connectors>
>         </cluster-connection>
>       </cluster-connections>
> {code}
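> For orientation, the mirrored configuration on broker2 might look roughly like the sketch below. This is an illustration, not the reporter's actual file: the AMQP port 5673 is taken from url2 in the client code, the core port 61617 from broker2-connector, and everything else is assumed to match broker1 with the connector references swapped. Tuning parameters on the acceptor URLs are omitted for brevity.
> {code:xml}
>       <!-- Hypothetical broker2 counterpart; ports inferred from the client code and connectors above -->
>       <acceptors>
>          <acceptor name="artemis">tcp://0.0.0.0:61617?protocols=CORE,AMQP</acceptor>
>          <acceptor name="amqp">tcp://0.0.0.0:5673?protocols=AMQP</acceptor>
>       </acceptors>
>       <connectors>
>         <connector name="broker1-connector">tcp://localhost:61616</connector>
>         <connector name="broker2-connector">tcp://localhost:61617</connector>
>       </connectors>
>       <cluster-connections>
>         <cluster-connection name="preprod">
>           <!-- broker2 advertises its own connector... -->
>           <connector-ref>broker2-connector</connector-ref>
>           <retry-interval>500</retry-interval>
>           <use-duplicate-detection>true</use-duplicate-detection>
>           <message-load-balancing>ON_DEMAND</message-load-balancing>
>           <max-hops>1</max-hops>
>           <!-- ...and points statically at broker1 -->
>           <static-connectors allow-direct-connections-only="true">
>               <connector-ref>broker1-connector</connector-ref>
>           </static-connectors>
>         </cluster-connection>
>       </cluster-connections>
> {code}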
> The message-load-balancing setting has no effect on this issue. redistribution-delay is set to 0 for the address in question.
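> The address setting itself is not included in this report. A minimal sketch of how a redistribution delay of 0 is typically declared in broker.xml follows; the match value "orders" is an assumption based on the address used in the client code.
> {code:xml}
>       <address-settings>
>          <!-- Assumed match; the reporter's actual address-setting is not shown -->
>          <address-setting match="orders">
>             <!-- 0 = redistribute immediately once the last local consumer closes -->
>             <redistribution-delay>0</redistribution-delay>
>          </address-setting>
>       </address-settings>
> {code}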


