You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Riyafa Abdul Hameed (JIRA)" <ji...@apache.org> on 2019/04/29 07:09:00 UTC
[jira] [Created] (ARTEMIS-2325) SendAcknowledgementHandler when
multiple mesages are sent
Riyafa Abdul Hameed created ARTEMIS-2325:
--------------------------------------------
Summary: SendAcknowledgementHandler when multiple mesages are sent
Key: ARTEMIS-2325
URL: https://issues.apache.org/jira/browse/ARTEMIS-2325
Project: ActiveMQ Artemis
Issue Type: Bug
Environment: Using maven artifact version {color:#6a8759}artemis-core-client 2.7.0
{color}
Reporter: Riyafa Abdul Hameed
When I try to send multiple message while using a
SendAcknowledgementHandler the following code fails:
{code:java}
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
public class ProducerInvalid {
public static void main(String[] args) throws Exception {
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
locator.setConfirmationWindowSize(10240);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
// A producer is associated with an address ...
ClientProducer producer = session.createProducer("example");
for (int i = 0; i < 1000000; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Hello " + i);
producer.send(message, message1 -> System.out.println(message1.getBodyBuffer().readString()));
}
}
}{code}
The exception thrown is as follows:
Apr 29, 2019 11:08:44 AM
{code:java}
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler bufferReceived
ERROR: AMQ214031: Failed to decode buffer, disconnect immediately.
java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381)
at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191)
at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428)
at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802)
at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92)
at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
at org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23)
at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232)
at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220)
at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203)
at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755)
at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693)
at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399)
at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376)
... 21 more{code}
I am not sure if SendAcknowledgementHandler is not supposed to be used in the above manner.
The following works fine:
{code:java}
import org.apache.activemq.artemis.api.core.client.*;
public class Producer {
public static void main(String[] args) throws Exception {
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
locator.setConfirmationWindowSize(10240);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();
session.setSendAcknowledgementHandler(message1 -> System.out.println(message1.getBodyBuffer().readString()));
// A producer is associated with an address ...
ClientProducer producer = session.createProducer("example");
for (int i = 0; i < 1000000; i++) {
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Hello " + i);
producer.send(message);
}
}
}{code}
But I would like to have an acknowledgment handler per send. Is it not possible?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)