You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2020/01/11 21:06:00 UTC

[jira] [Comment Edited] (ARTEMIS-2325) SendAcknowledgementHandler when multiple mesages are sent

    [ https://issues.apache.org/jira/browse/ARTEMIS-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013525#comment-17013525 ] 

Justin Bertram edited comment on ARTEMIS-2325 at 1/11/20 9:05 PM:
------------------------------------------------------------------

[~Riyafa Abdul Hameed], I totally missed that. Thanks for the clarification. That answers the first question.

However, you didn't indicate whether or not you could reproduce this with the latest release.


was (Author: jbertram):
[~Riyafa Abdul Hameed], I totally missed that. Thanks for the clarification. That answer the first question.

However, you didn't indicate whether or not you could reproduce this with the latest release.

> SendAcknowledgementHandler when multiple mesages are sent
> ---------------------------------------------------------
>
>                 Key: ARTEMIS-2325
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2325
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>         Environment: Using maven artifact version {color:#6a8759}artemis-core-client 2.7.0
> {color}
>            Reporter: Riyafa Abdul Hameed
>            Priority: Major
>
> When I try to send multiple message while using a
> SendAcknowledgementHandler the following code fails:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> public class ProducerInvalid {
>     public static void main(String[] args) throws Exception {
>         ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
>         locator.setConfirmationWindowSize(10240);
>         ClientSessionFactory factory = locator.createSessionFactory();
>         ClientSession session = factory.createSession();
>         // A producer is associated with an address ...
>         ClientProducer producer = session.createProducer("example");
>         for (int i = 0; i < 1000000; i++) {
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello " + i);
>             producer.send(message, message1 -> System.out.println(message1.getBodyBuffer().readString()));
>         }
>     }
> }{code}
> The exception thrown is as follows:
> Apr 29, 2019 11:08:44 AM
> {code:java}
> org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler bufferReceived
> ERROR: AMQ214031: Failed to decode buffer, disconnect immediately.
> java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
>     at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381)
>     at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191)
>     at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
>     at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
>     at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
>     at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
>     at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
>     at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
>     at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
>     at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
>     at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428)
>     at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802)
>     at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571)
>     at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92)
>     at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
>     at org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23)
>     at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232)
>     at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220)
>     at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203)
>     at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755)
>     at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693)
>     at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399)
>     at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376)
>     ... 21 more{code}
> I am not sure if SendAcknowledgementHandler is not supposed to be used in the above manner.
> The following works fine:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.*;
> public class Producer {
>  public static void main(String[] args) throws Exception {
>  ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
>  locator.setConfirmationWindowSize(10240);
>  ClientSessionFactory factory = locator.createSessionFactory();
>  ClientSession session = factory.createSession();
>  session.setSendAcknowledgementHandler(message1 -> System.out.println(message1.getBodyBuffer().readString()));
>  // A producer is associated with an address ...
>  ClientProducer producer = session.createProducer("example");
>  for (int i = 0; i < 1000000; i++) {
>  ClientMessage message = session.createMessage(true);
>  message.getBodyBuffer().writeString("Hello " + i);
>  producer.send(message);
>  }
>  }
> }{code}
> But I would like to have an acknowledgment handler per send. Is it not possible?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)