Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2020/01/11 21:06:00 UTC
[jira] [Comment Edited] (ARTEMIS-2325) SendAcknowledgementHandler
when multiple messages are sent
[ https://issues.apache.org/jira/browse/ARTEMIS-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17013525#comment-17013525 ]
Justin Bertram edited comment on ARTEMIS-2325 at 1/11/20 9:05 PM:
------------------------------------------------------------------
[~Riyafa Abdul Hameed], I totally missed that. Thanks for the clarification. That answers the first question.
However, you didn't indicate whether or not you could reproduce this with the latest release.
was (Author: jbertram):
[~Riyafa Abdul Hameed], I totally missed that. Thanks for the clarification. That answer the first question.
However, you didn't indicate whether or not you could reproduce this with the latest release.
> SendAcknowledgementHandler when multiple messages are sent
> ----------------------------------------------------------
>
> Key: ARTEMIS-2325
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2325
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Environment: Using Maven artifact artemis-core-client 2.7.0
> Reporter: Riyafa Abdul Hameed
> Priority: Major
>
> When I try to send multiple messages while using a
> SendAcknowledgementHandler, the following code fails:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
> import org.apache.activemq.artemis.api.core.client.ServerLocator;
> public class ProducerInvalid {
>     public static void main(String[] args) throws Exception {
>         ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
>         locator.setConfirmationWindowSize(10240);
>         ClientSessionFactory factory = locator.createSessionFactory();
>         ClientSession session = factory.createSession();
>         // A producer is associated with an address ...
>         ClientProducer producer = session.createProducer("example");
>         for (int i = 0; i < 1000000; i++) {
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello " + i);
>             producer.send(message, message1 -> System.out.println(message1.getBodyBuffer().readString()));
>         }
>     }
> }{code}
> The exception thrown is as follows:
> {code:java}
> Apr 29, 2019 11:08:44 AM org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler bufferReceived
> ERROR: AMQ214031: Failed to decode buffer, disconnect immediately.
> java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
> at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381)
> at org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191)
> at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
> at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
> at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
> at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
> at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
> at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
> at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 70, cap: 1500))
> at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428)
> at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802)
> at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571)
> at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92)
> at org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88)
> at org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23)
> at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232)
> at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220)
> at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203)
> at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755)
> at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693)
> at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399)
> at org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376)
> ... 21 more{code}
> I am not sure whether SendAcknowledgementHandler is supposed to be used in the above manner.
> The following works fine:
> {code:java}
> import org.apache.activemq.artemis.api.core.client.*;
> public class Producer {
>     public static void main(String[] args) throws Exception {
>         ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61616");
>         locator.setConfirmationWindowSize(10240);
>         ClientSessionFactory factory = locator.createSessionFactory();
>         ClientSession session = factory.createSession();
>         session.setSendAcknowledgementHandler(message1 -> System.out.println(message1.getBodyBuffer().readString()));
>         // A producer is associated with an address ...
>         ClientProducer producer = session.createProducer("example");
>         for (int i = 0; i < 1000000; i++) {
>             ClientMessage message = session.createMessage(true);
>             message.getBodyBuffer().writeString("Hello " + i);
>             producer.send(message);
>         }
>     }
> }{code}
> But I would like to have an acknowledgment handler per send. Is it not possible?
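
The stack trace above shows readInt() failing with readerIndex(22) equal to writerIndex(22), i.e. there was nothing left to read when the handler called readString(). The following is only a rough, library-independent sketch of that kind of failure, not Artemis or Netty code: it uses java.nio.ByteBuffer as a stand-in for the message body buffer, and the class and helper names (ConsumedBufferSketch, writeString, readString, secondReadFails, readWithoutConsuming) are hypothetical. Whether an already consumed buffer is the actual cause of ARTEMIS-2325 is an assumption, not something this thread confirms.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ConsumedBufferSketch {

    // Writes a length-prefixed string and prepares the buffer for reading.
    static ByteBuffer writeString(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + bytes.length);
        buf.putInt(bytes.length).put(bytes);
        buf.flip(); // position = 0, limit = number of bytes written
        return buf;
    }

    // Reads a length-prefixed string; this advances the buffer's position.
    static String readString(ByteBuffer buf) {
        int length = buf.getInt(); // underflows if fewer than 4 bytes remain
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Returns true if a second read on the same, already consumed buffer fails.
    static boolean secondReadFails(ByteBuffer buf) {
        readString(buf); // first read moves the position to the limit
        try {
            readString(buf);
            return false;
        } catch (BufferUnderflowException e) {
            return true;
        }
    }

    // Reads through a duplicate so the original buffer's position is untouched.
    static String readWithoutConsuming(ByteBuffer buf) {
        return readString(buf.duplicate());
    }

    public static void main(String[] args) {
        ByteBuffer buf = writeString("Hello 0");
        System.out.println(readWithoutConsuming(buf)); // buf itself is not consumed
        System.out.println(secondReadFails(buf));      // second direct read underflows
    }
}
```

In this sketch, reading through a duplicate leaves the shared buffer's position alone, while reading the same buffer twice runs past the end. It says nothing about how ClientMessage manages its body buffer internally.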