You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Riyafa Abdul Hameed (JIRA)" <ji...@apache.org> on 2019/04/29 07:09:00 UTC

[jira] [Created] (ARTEMIS-2325) SendAcknowledgementHandler when multiple mesages are sent

Riyafa Abdul Hameed created ARTEMIS-2325:
--------------------------------------------

             Summary: SendAcknowledgementHandler when multiple mesages are sent
                 Key: ARTEMIS-2325
                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2325
             Project: ActiveMQ Artemis
          Issue Type: Bug
         Environment: Using maven artifact version {color:#6a8759}artemis-core-client 2.7.0
{color}
            Reporter: Riyafa Abdul Hameed


When I try to send multiple message while using a

SendAcknowledgementHandler the following code fails:
{code:java}
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;

public class ProducerInvalid {
    public static void main(String[] args) throws Exception {
        ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
        locator.setConfirmationWindowSize(10240);

        ClientSessionFactory factory = locator.createSessionFactory();
        ClientSession session = factory.createSession();
        // A producer is associated with an address ...
        ClientProducer producer = session.createProducer("example");
        for (int i = 0; i < 1000000; i++) {
            ClientMessage message = session.createMessage(true);
            message.getBodyBuffer().writeString("Hello " + i);
            producer.send(message, message1 -> System.out.println(message1.getBodyBuffer().readString()));
        }
    }
}{code}
The exception thrown is as follows:

Apr 29, 2019 11:08:44 AM
{code:java}
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler bufferReceived
ERROR: AMQ214031: Failed to decode buffer, disconnect immediately.
java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191)
    at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428)
    at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802)
    at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92)
    at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
    at org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220)
    at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203)
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755)
    at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693)
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399)
    at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376)
    ... 21 more{code}
I am not sure if SendAcknowledgementHandler is not supposed to be used in the above manner.

The following works fine:
{code:java}
import org.apache.activemq.artemis.api.core.client.*;

public class Producer {
 public static void main(String[] args) throws Exception {
 ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
 locator.setConfirmationWindowSize(10240);

 ClientSessionFactory factory = locator.createSessionFactory();
 ClientSession session = factory.createSession();
 session.setSendAcknowledgementHandler(message1 -> System.out.println(message1.getBodyBuffer().readString()));
 // A producer is associated with an address ...
 ClientProducer producer = session.createProducer("example");
 for (int i = 0; i < 1000000; i++) {
 ClientMessage message = session.createMessage(true);
 message.getBodyBuffer().writeString("Hello " + i);
 producer.send(message);
 }
 }
}{code}
But I would like to have an acknowledgment handler per send. Is it not possible?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)