You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Todd Baert (JIRA)" <ji...@apache.org> on 2017/12/09 14:22:00 UTC

[jira] [Updated] (ARTEMIS-1549) AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()

     [ https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Todd Baert updated ARTEMIS-1549:
--------------------------------
    Description: 
I have set up a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:


{code:java}
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
{code}

To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).

Attempting to send/receive the AMQP messages on the same cluster member works as expected.

Here is some client code that demonstrates the issue:

{code:C#}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;

namespace Test
{
    /// <summary>
    /// Reproduction client: subscribes on one AMQP endpoint and publishes a single
    /// durable message to the same address through a second endpoint.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Attaches a receiver to localhost:5672 and a sender to localhost:5673,
        /// sends one durable message, then blocks so the receive callback can keep
        /// printing whatever arrives.
        /// </summary>
        static void Main(string[] args)
        {
            const string ordersAddress = "orders";

            // Consumer side: connection, session and subscriber link on the first endpoint.
            var consumerConnection = new Connection(new Address("amqp://localhost:5672"));
            var consumerSession = new Session(consumerConnection);
            Source subscriberSource = CreateSharedDurableSubscriberSource(ordersAddress);
            var receiver = new ReceiverLink(consumerSession, "sub1", subscriberSource, null);

            // Producer side: connection, session and sender link on the second endpoint.
            var producerConnection = new Connection(new Address("amqp://localhost:5673"));
            var producerSession = new Session(producerConnection);
            var sender = new SenderLink(producerSession, "sender", ordersAddress);

            // Start receiving with 300 link credits; accept each message before printing it.
            receiver.Start(300, (link, message) =>
            {
                link.Accept(message);
                Console.WriteLine("Got message: " + message.Body);
            });

            string body = "order placed at " + DateTime.Now.ToString();
            var outgoing = new Message(body)
            {
                Header = new Header { Durable = true }
            };
            sender.Send(outgoing);

            // Joining the current thread never returns; this keeps the process alive.
            Thread.CurrentThread.Join();
        }

        /// <summary>
        /// Builds the link source used by the receiver: the given address with a
        /// "never" expiry policy, durability 2, the "topic"/"shared"/"global"
        /// capabilities and the "copy" distribution mode.
        /// </summary>
        /// <param name="address">Address the subscriber source points at.</param>
        /// <returns>A newly populated <c>Source</c>.</returns>
        private static Source CreateSharedDurableSubscriberSource(String address)
        {
            return new Source
            {
                Address = address,
                ExpiryPolicy = new Symbol("never"),
                // NOTE(review): presumably the AMQP 1.0 terminus-durability value
                // "unsettled-state" - confirm against the spec.
                Durable = 2,
                Capabilities = new Symbol[] { "topic", "shared", "global" },
                DistributionMode = new Symbol("copy")
            };
        }
    }
}
{code}

Here is the cluster config from broker1, broker2 is configured accordingly


{code:java}
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
      </acceptors>

      <connectors>
        <connector name="broker1-connector">tcp://localhost:61616</connector>
        <connector name="broker2-connector">tcp://localhost:61617</connector>
      </connectors>

      <!-- TODD: credentials for the cluster -->
      <cluster-user>todd</cluster-user>
      <cluster-password>password</cluster-password>      

      <!-- ADDED -->
      <cluster-connections>
        <cluster-connection name="preprod">
          <!-- TODO - is this needed? -->
          <connector-ref>broker1-connector</connector-ref>
          <retry-interval>500</retry-interval>
          <use-duplicate-detection>true</use-duplicate-detection>
          <message-load-balancing>ON_DEMAND</message-load-balancing>          
          <max-hops>1</max-hops>
          <static-connectors allow-direct-connections-only="true">
              <connector-ref>broker2-connector</connector-ref>
          </static-connectors>
        </cluster-connection>
      </cluster-connections>
{code}

Message load balancing has no effect on this issue. The redistribution-delay is set to 0 for the address in question.



  was:
I have set up a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:


{code:java}
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
{code}

To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).

Attempting to send/receive the AMQP messages on the same cluster member works as expected.

Here is some client code that demonstrates the issue:

{code:C#}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;

namespace Test
{
    /// <summary>
    /// Reproduction client: subscribes on one AMQP endpoint and publishes a single
    /// durable message to the same address through a second endpoint.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Attaches a receiver to localhost:5672 and a sender to localhost:5673,
        /// sends one durable message, then blocks so the receive callback can keep
        /// printing whatever arrives.
        /// </summary>
        static void Main(string[] args)
        {
            const string ordersAddress = "orders";

            // Consumer side: connection, session and subscriber link on the first endpoint.
            var consumerConnection = new Connection(new Address("amqp://localhost:5672"));
            var consumerSession = new Session(consumerConnection);
            Source subscriberSource = CreateSharedDurableSubscriberSource(ordersAddress);
            var receiver = new ReceiverLink(consumerSession, "sub1", subscriberSource, null);

            // Producer side: connection, session and sender link on the second endpoint.
            var producerConnection = new Connection(new Address("amqp://localhost:5673"));
            var producerSession = new Session(producerConnection);
            var sender = new SenderLink(producerSession, "sender", ordersAddress);

            // Start receiving with 300 link credits; accept each message before printing it.
            receiver.Start(300, (link, message) =>
            {
                link.Accept(message);
                Console.WriteLine("Got message: " + message.Body);
            });

            string body = "order placed at " + DateTime.Now.ToString();
            var outgoing = new Message(body)
            {
                Header = new Header { Durable = true }
            };
            sender.Send(outgoing);

            // Joining the current thread never returns; this keeps the process alive.
            Thread.CurrentThread.Join();
        }

        /// <summary>
        /// Builds the link source used by the receiver: the given address with a
        /// "never" expiry policy, durability 2, the "topic"/"shared"/"global"
        /// capabilities and the "copy" distribution mode.
        /// </summary>
        /// <param name="address">Address the subscriber source points at.</param>
        /// <returns>A newly populated <c>Source</c>.</returns>
        private static Source CreateSharedDurableSubscriberSource(String address)
        {
            return new Source
            {
                Address = address,
                ExpiryPolicy = new Symbol("never"),
                // NOTE(review): presumably the AMQP 1.0 terminus-durability value
                // "unsettled-state" - confirm against the spec.
                Durable = 2,
                Capabilities = new Symbol[] { "topic", "shared", "global" },
                DistributionMode = new Symbol("copy")
            };
        }
    }
}
{code}

Here is the cluster config from broker1, broker2 is configured accordingly


{code:java}
      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

         <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
      </acceptors>

      <connectors>
        <connector name="broker1-connector">tcp://localhost:61616</connector>
        <connector name="broker2-connector">tcp://localhost:61617</connector>
      </connectors>

      <!-- TODD: credentials for the cluster -->
      <cluster-user>todd</cluster-user>
      <cluster-password>password</cluster-password>      

      <!-- ADDED -->
      <cluster-connections>
        <cluster-connection name="preprod">
          <!-- TODO - is this needed? -->
          <connector-ref>broker1-connector</connector-ref>
          <retry-interval>500</retry-interval>
          <use-duplicate-detection>true</use-duplicate-detection>
          <message-load-balancing>ON_DEMAND</message-load-balancing>          
          <max-hops>1</max-hops>
          <static-connectors allow-direct-connections-only="true">
              <connector-ref>broker2-connector</connector-ref>
          </static-connectors>
        </cluster-connection>
      </cluster-connections>
{code}

Message load balancing has no effect on this issue.




> AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
>                 Key: ARTEMIS-1549
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>         Environment: AMQP .NET lite client, .NET Core runtime 2.0, connecting with brokers on local machine (fedora 26)
>            Reporter: Todd Baert
>
> I have set up a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:
> {code:java}
> 08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
> 	at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
> {code}
> To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).
> Attempting to send/receive the AMQP messages on the same cluster member works as expected.
> Here is some client code that demonstrates the issue:
> {code:C#}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace Test
> {
>     class Program
>     {
>         static void Main(string[] args)
>         {
>             string url1 = "amqp://localhost:5672";
>             string url2 = "amqp://localhost:5673";
>             String ADDRESS = "orders";
>             Connection connection1 = new Connection(new Address(url1));
>             Session session1 = new Session(connection1);
>             ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
>             Connection connection2 = new Connection(new Address(url2));
>             Session session2 = new Session(connection2);
>             SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
>             receiver.Start(300, (r,  m) => {
>                 r.Accept(m);
>                 Console.WriteLine("Got message: " + m.Body);
>             });
>         
>             Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
>             outMessage.Header = new Header();
>             outMessage.Header.Durable = true;                    
>             sender.Send(outMessage);
>             Thread.CurrentThread.Join();
>         }
>         private static Source CreateSharedDurableSubscriberSource(String address)
>         {
>             Source source = new Source();
>             source.Address = address;
>             source.ExpiryPolicy = new Symbol("never");
>             
>             source.Durable = 2;
>             source.Capabilities = new Symbol[]{"topic", "shared", "global"};
>             source.DistributionMode = new Symbol("copy");
>             return source;
>         }
>     }
> }
> {code}
> Here is the cluster config from broker1, broker2 is configured accordingly
> {code:java}
>       <acceptors>
>          <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
>          <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
>       </acceptors>
>       <connectors>
>         <connector name="broker1-connector">tcp://localhost:61616</connector>
>         <connector name="broker2-connector">tcp://localhost:61617</connector>
>       </connectors>
>       <!-- TODD: credentials for the cluster -->
>       <cluster-user>todd</cluster-user>
>       <cluster-password>password</cluster-password>      
>       <!-- ADDED -->
>       <cluster-connections>
>         <cluster-connection name="preprod">
>           <!-- TODO - is this needed? -->
>           <connector-ref>broker1-connector</connector-ref>
>           <retry-interval>500</retry-interval>
>           <use-duplicate-detection>true</use-duplicate-detection>
>           <message-load-balancing>ON_DEMAND</message-load-balancing>          
>           <max-hops>1</max-hops>
>           <static-connectors allow-direct-connections-only="true">
>               <connector-ref>broker2-connector</connector-ref>
>           </static-connectors>
>         </cluster-connection>
>       </cluster-connections>
> {code}
> Message load balancing has no effect on this issue. The redistribution-delay is set to 0 for the address in question.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)