You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Todd Baert (JIRA)" <ji...@apache.org> on 2017/12/09 14:22:00 UTC
[jira] [Updated] (ARTEMIS-1549) AMQP messages aren't redistributed
across cluster bridge, NPE in ServerSessionImpl.send()
[ https://issues.apache.org/jira/browse/ARTEMIS-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Todd Baert updated ARTEMIS-1549:
--------------------------------
Description:
I have setup a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:
{code:java}
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
{code}
To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).
Attempting to send/receive the AMQP messages on the same cluster member works as expected.
Here is some client code that demonstrates the issue:
{code:C#}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;
namespace Test
{
class Program
{
static void Main(string[] args)
{
string url1 = "amqp://localhost:5672";
string url2 = "amqp://localhost:5673";
String ADDRESS = "orders";
Connection connection1 = new Connection(new Address(url1));
Session session1 = new Session(connection1);
ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
Connection connection2 = new Connection(new Address(url2));
Session session2 = new Session(connection2);
SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
receiver.Start(300, (r, m) => {
r.Accept(m);
Console.WriteLine("Got message: " + m.Body);
});
Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
outMessage.Header = new Header();
outMessage.Header.Durable = true;
sender.Send(outMessage);
Thread.CurrentThread.Join();
}
private static Source CreateSharedDurableSubscriberSource(String address)
{
Source source = new Source();
source.Address = address;
source.ExpiryPolicy = new Symbol("never");
source.Durable = 2;
source.Capabilities = new Symbol[]{"topic", "shared", "global"};
source.DistributionMode = new Symbol("copy");
return source;
}
}
}
{code}
Here is the cluster config from broker1, broker2 is configured accordingly
{code:java}
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
</acceptors>
<connectors>
<connector name="broker1-connector">tcp://localhost:61616</connector>
<connector name="broker2-connector">tcp://localhost:61617</connector>
</connectors>
<!-- TODD: credentials for the cluster -->
<cluster-user>todd</cluster-user>
<cluster-password>password</cluster-password>
<!-- ADDED -->
<cluster-connections>
<cluster-connection name="preprod">
<!-- TODO - is this needed? -->
<connector-ref>broker1-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors allow-direct-connections-only="true">
<connector-ref>broker2-connector</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
{code}
Message load balancing has no effect on this issue, redistributiuon-delay is set to 0 for the address in question.
was:
I have setup a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:
{code:java}
08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
{code}
To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).
Attempting to send/receive the AMQP messages on the same cluster member works as expected.
Here is some client code that demonstrates the issue:
{code:C#}
using System;
using System.Collections.Generic;
using System.Threading;
using System.Transactions;
using Amqp;
using Amqp.Framing;
using Amqp.Sasl;
using Amqp.Types;
namespace Test
{
class Program
{
static void Main(string[] args)
{
string url1 = "amqp://localhost:5672";
string url2 = "amqp://localhost:5673";
String ADDRESS = "orders";
Connection connection1 = new Connection(new Address(url1));
Session session1 = new Session(connection1);
ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
Connection connection2 = new Connection(new Address(url2));
Session session2 = new Session(connection2);
SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
receiver.Start(300, (r, m) => {
r.Accept(m);
Console.WriteLine("Got message: " + m.Body);
});
Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
outMessage.Header = new Header();
outMessage.Header.Durable = true;
sender.Send(outMessage);
Thread.CurrentThread.Join();
}
private static Source CreateSharedDurableSubscriberSource(String address)
{
Source source = new Source();
source.Address = address;
source.ExpiryPolicy = new Symbol("never");
source.Durable = 2;
source.Capabilities = new Symbol[]{"topic", "shared", "global"};
source.DistributionMode = new Symbol("copy");
return source;
}
}
}
{code}
Here is the cluster config from broker1, broker2 is configured accordingly
{code:java}
<acceptors>
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
<acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
</acceptors>
<connectors>
<connector name="broker1-connector">tcp://localhost:61616</connector>
<connector name="broker2-connector">tcp://localhost:61617</connector>
</connectors>
<!-- TODD: credentials for the cluster -->
<cluster-user>todd</cluster-user>
<cluster-password>password</cluster-password>
<!-- ADDED -->
<cluster-connections>
<cluster-connection name="preprod">
<!-- TODO - is this needed? -->
<connector-ref>broker1-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors allow-direct-connections-only="true">
<connector-ref>broker2-connector</connector-ref>
</static-connectors>
</cluster-connection>
</cluster-connections>
{code}
Message load balancing has no effect on this issue.
> AMQP messages aren't redistributed across cluster bridge, NPE in ServerSessionImpl.send()
> -----------------------------------------------------------------------------------------
>
> Key: ARTEMIS-1549
> URL: https://issues.apache.org/jira/browse/ARTEMIS-1549
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: AMQP, Broker
> Affects Versions: 2.2.0, 2.3.0, 2.4.0
> Environment: AMQP .NET lite client, .NET Core runtime 2.0, connecting with brokers on local machine (fedora 26)
> Reporter: Todd Baert
>
> I have setup a cluster of 2 brokers, using a simple static cluster configuration (see below). Sending a CORE message to broker1, and consuming that message from broker2 works as expected. Attempting the same over AMQP (using the AMQP .Net Lite client) results in an NPE in broker2:
> {code:java}
> 08:44:28,061 ERROR [org.apache.activemq.artemis.core.server] AMQ224016: Caught exception: java.lang.NullPointerException
> at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:613) [artemis-amqp-protocol-2.3.0.jar:]
> at org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage.setAddress(AMQPMessage.java:64) [artemis-amqp-protocol-2.3.0.jar:]
> at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1348) [artemis-server-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1309) [artemis-server-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:1302) [artemis-server-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onSessionSend(ServerSessionPacketHandler.java:690) [artemis-server-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler.onMessagePacket(ServerSessionPacketHandler.java:290) [artemis-server-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.utils.actors.Actor.doTask(Actor.java:33) [artemis-commons-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [artemis-commons-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [artemis-commons-2.3.0.jar:2.3.0]
> at org.apache.activemq.artemis.utils.actors.ProcessorBase$ExecutorTask.run(ProcessorBase.java:53) [artemis-commons-2.3.0.jar:2.3.0]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [rt.jar:1.8.0_151]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [rt.jar:1.8.0_151]
> at java.lang.Thread.run(Thread.java:748) [rt.jar:1.8.0_151]
> {code}
> To be clear, the NPE exists on the broker to which the receiver is attached, every time a message is SENT by the producer (which is attached to the other broker).
> Attempting to send/receive the AMQP messages on the same cluster member works as expected.
> Here is some client code that demonstrates the issue:
> {code:C#}
> using System;
> using System.Collections.Generic;
> using System.Threading;
> using System.Transactions;
> using Amqp;
> using Amqp.Framing;
> using Amqp.Sasl;
> using Amqp.Types;
> namespace Test
> {
> class Program
> {
> static void Main(string[] args)
> {
> string url1 = "amqp://localhost:5672";
> string url2 = "amqp://localhost:5673";
> String ADDRESS = "orders";
> Connection connection1 = new Connection(new Address(url1));
> Session session1 = new Session(connection1);
> ReceiverLink receiver = new ReceiverLink(session1, "sub1", CreateSharedDurableSubscriberSource(ADDRESS), null);
> Connection connection2 = new Connection(new Address(url2));
> Session session2 = new Session(connection2);
> SenderLink sender = new SenderLink(session2, "sender", ADDRESS);
> receiver.Start(300, (r, m) => {
> r.Accept(m);
> Console.WriteLine("Got message: " + m.Body);
> });
>
> Message outMessage = new Message("order placed at " + DateTime.Now.ToString());
> outMessage.Header = new Header();
> outMessage.Header.Durable = true;
> sender.Send(outMessage);
> Thread.CurrentThread.Join();
> }
> private static Source CreateSharedDurableSubscriberSource(String address)
> {
> Source source = new Source();
> source.Address = address;
> source.ExpiryPolicy = new Symbol("never");
>
> source.Durable = 2;
> source.Capabilities = new Symbol[]{"topic", "shared", "global"};
> source.DistributionMode = new Symbol("copy");
> return source;
> }
> }
> }
> {code}
> Here is the cluster config from broker1, broker2 is configured accordingly
> {code:java}
> <acceptors>
> <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
> <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300</acceptor>
> </acceptors>
> <connectors>
> <connector name="broker1-connector">tcp://localhost:61616</connector>
> <connector name="broker2-connector">tcp://localhost:61617</connector>
> </connectors>
> <!-- TODD: credentials for the cluster -->
> <cluster-user>todd</cluster-user>
> <cluster-password>password</cluster-password>
> <!-- ADDED -->
> <cluster-connections>
> <cluster-connection name="preprod">
> <!-- TODO - is this needed? -->
> <connector-ref>broker1-connector</connector-ref>
> <retry-interval>500</retry-interval>
> <use-duplicate-detection>true</use-duplicate-detection>
> <message-load-balancing>ON_DEMAND</message-load-balancing>
> <max-hops>1</max-hops>
> <static-connectors allow-direct-connections-only="true">
> <connector-ref>broker2-connector</connector-ref>
> </static-connectors>
> </cluster-connection>
> </cluster-connections>
> {code}
> Message load balancing has no effect on this issue, redistributiuon-delay is set to 0 for the address in question.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)