Posted to users@activemq.apache.org by Tim Jones <ti...@abcwxy.com> on 2022/02/17 21:17:13 UTC

IndexOutOfBounds Warning Message

This seems to appear on larger messages only - I am getting a warning when
calling getDataBuffer (2.20.0 Artemis Client).  Curious if there is
something I may be missing - or if this is completely ignorable? Thanks -
Tim

2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)] WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)] readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
    at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
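
The numbers in the warning can be checked by hand: a read of `length` bytes is rejected when `readerIndex + length` would pass `writerIndex`. The sketch below is a JDK-only model of that arithmetic, not Netty's actual code; the class and method names (`ReadableBytesCheck`, `readableBytes`, `checkReadable`) are hypothetical and exist only for illustration.

```java
// Hypothetical, JDK-only model of the reader/writer index check reported
// in the warning. It is not Netty's implementation.
final class ReadableBytesCheck {

    // Bytes still available to read: everything between the two indices.
    static int readableBytes(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    // Rejects a read of `length` bytes when fewer than `length` bytes remain,
    // i.e. when readerIndex + length would pass writerIndex.
    static void checkReadable(int readerIndex, int writerIndex, int length) {
        if (length > readableBytes(readerIndex, writerIndex)) {
            throw new IndexOutOfBoundsException(
                "readerIndex(" + readerIndex + ") + length(" + length
                    + ") exceeds writerIndex(" + writerIndex + ")");
        }
    }

    public static void main(String[] args) {
        // Values from the warning above: only 832 bytes remain to be read.
        System.out.println(readableBytes(270740, 271572)); // prints 832
        try {
            checkReadable(270740, 271572, 270740);
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the values from the log, 271572 - 270740 leaves 832 readable bytes, so a request for 270740 bytes cannot be satisfied at that reader position.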

Re: IndexOutOfBounds Warning Message

Posted by Tim Jones <ti...@abcwxy.com>.
Sorry, the last part of my previous message was a bit confusing. I will use
getBodyBuffer().

To correct what I wrote: if I remember the code correctly, after the
exception is thrown it is the getDataBuffer() method (not getBodyBuffer())
that falls back to the getReadOnlyBuffer() path. For large messages it first
attempts a different path, and that is the path that throws the exception
(it goes through a Netty -> NIO internal-use-only buffer grab which has been
deprecated in Netty, so it is not clear whether something flaky is going on
there). In any case, the content does seem to be delivered in the end; after
the exception it simply goes through the same path that small buffers use.
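
The behavior described above (try one path, catch the exception, log a warning, then fall back to a read-only view) can be sketched with JDK classes only. This is an illustration of the pattern under stated assumptions, not the Artemis CoreMessage code: `FallbackBufferDemo`, `largeMessagePath`, `readOnlyPath`, and `dataBuffer` are hypothetical names, and `java.nio.ByteBuffer` stands in for the Netty buffer.

```java
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of a "try, warn, fall back" buffer accessor.
final class FallbackBufferDemo {

    private static final Logger LOG =
        Logger.getLogger(FallbackBufferDemo.class.getName());

    // Stand-in for the large-message path: copies `length` bytes out of the
    // source. ByteBuffer.get(byte[]) throws BufferUnderflowException when
    // fewer than `length` bytes remain.
    static ByteBuffer largeMessagePath(ByteBuffer source, int length) {
        byte[] copy = new byte[length];
        source.duplicate().get(copy);
        return ByteBuffer.wrap(copy);
    }

    // Stand-in for the fallback: a read-only view of the same bytes.
    static ByteBuffer readOnlyPath(ByteBuffer source) {
        return source.asReadOnlyBuffer();
    }

    // Tries the large-message path first. On failure the exception is logged
    // as a warning and the read-only view is returned instead, so the caller
    // still receives a usable buffer.
    static ByteBuffer dataBuffer(ByteBuffer source, int expectedLength) {
        try {
            return largeMessagePath(source, expectedLength);
        } catch (RuntimeException e) {
            LOG.log(Level.WARNING, "large-message path failed, falling back", e);
            return readOnlyPath(source);
        }
    }
}
```

In this model the caller cannot tell from the return value alone which path was taken; only the logged warning reveals the fallback, which matches the symptom reported in this thread.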

On Tue, Feb 22, 2022 at 5:55 PM Tim Jones <ti...@abcwxy.com> wrote:

> Thanks, yes, I am using Core...  I see this in the API for getBodyBuffer
> which is what led me down that path:  *The buffer to write the body.
> Warning: If you just want to read the content of a message, use
> getDataBuffer() or getReadOnlyBuffer();*
>
> I am reading content - so that is why I was using getDataBuffer() - should
> I still use getBodyBuffer instead?  I will take a look at getBodyBuffer.
>
> If I remember the code correctly, I think that after the exception is
> thrown, the getBodyBuffer() method defaults to the getReadOnlyBuffer path
> ... it attempts to use another path on largeMessages - and that is the path
> that yields an exception (it goes through a netty -> NIO internal use only
> buffer grab which has been deprecated in Netty... not clear if something
> flaky may be going on there).  Anyway - it seems to deliver the content in
> the end - it just goes through the same path as small buffers do to achieve
> the results on exception....
>
>
>
> On Tue, Feb 22, 2022 at 11:18 AM Clebert Suconic <
> clebert.suconic@gmail.com> wrote:
>
>> Or use the JMS API instead; that's another possibility. But if you
>> want to use the core API, use getBodyBuffer() instead.
>>
>> On Tue, Feb 22, 2022 at 1:17 PM Clebert Suconic
>> <cl...@gmail.com> wrote:
>> >
>> > Found the reason:
>> >
>> >
>> > You should use getBodyBuffer() instead.
>> >
>> >
>> > getBodyBuffer() will perform a call to checkBuffer() before returning
>> > you the large message.
>> >
>> >
>> > This should fix your test / problem.
>> >
>> > On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic
>> > <cl...@gmail.com> wrote:
>> > >
>> > > this is something done on the core API.
>> > >
>> > > I can't pinpoint the exact reason, but the JMS facade works fine with
>> > > a similar sender and producer:
>> > >
>> > >
>> https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257
>> > >
>> > >
>> > >
>> > > I created a branch with a test comparing Core and JMS API on my fork:
>> > >
>> > > https://github.com/clebertsuconic/activemq-artemis/tree/withCore
>> > >
>> > >
>> > >
>> > > I'm pasting the test here for future reference if the gist is ever
>> > > gone (or the branch is gone):
>> > >
>> > >
>> > > /**
>> > >  * Licensed to the Apache Software Foundation (ASF) under one or more
>> > >  * contributor license agreements. See the NOTICE file distributed with
>> > >  * this work for additional information regarding copyright ownership.
>> > >  * The ASF licenses this file to You under the Apache License, Version 2.0
>> > >  * (the "License"); you may not use this file except in compliance with
>> > >  * the License. You may obtain a copy of the License at
>> > >  * <p>
>> > >  * http://www.apache.org/licenses/LICENSE-2.0
>> > >  * <p>
>> > >  * Unless required by applicable law or agreed to in writing, software
>> > >  * distributed under the License is distributed on an "AS IS" BASIS,
>> > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> > >  * See the License for the specific language governing permissions and
>> > >  * limitations under the License.
>> > >  */
>> > >
>> > > package org.apache.activemq.artemis.tests.integration.paging;
>> > >
>> > > import javax.jms.Connection;
>> > > import javax.jms.ConnectionFactory;
>> > > import javax.jms.MessageConsumer;
>> > > import javax.jms.MessageProducer;
>> > > import javax.jms.Queue;
>> > > import javax.jms.Session;
>> > > import javax.jms.TextMessage;
>> > > import java.util.concurrent.atomic.AtomicInteger;
>> > >
>> > > import org.apache.activemq.artemis.api.core.QueueConfiguration;
>> > > import org.apache.activemq.artemis.api.core.RoutingType;
>> > > import org.apache.activemq.artemis.api.core.SimpleString;
>> > > import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
>> > > import org.apache.activemq.artemis.api.core.client.ClientMessage;
>> > > import org.apache.activemq.artemis.api.core.client.ClientProducer;
>> > > import org.apache.activemq.artemis.api.core.client.ClientSession;
>> > > import org.apache.activemq.artemis.core.config.Configuration;
>> > > import org.apache.activemq.artemis.core.server.ActiveMQServer;
>> > > import org.apache.activemq.artemis.core.server.impl.AddressInfo;
>> > > import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
>> > > import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
>> > > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
>> > > import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
>> > > import org.apache.activemq.artemis.tests.util.Wait;
>> > > import org.checkerframework.checker.units.qual.A;
>> > > import org.junit.Assert;
>> > > import org.junit.Before;
>> > > import org.junit.Test;
>> > >
>> > > public class Main extends ActiveMQTestBase {
>> > >
>> > >    private final static String QUEUE = "service.images.dev::service.images.dev";
>> > >
>> > >    ActiveMQServer server;
>> > >
>> > >
>> > >    @Before
>> > >    @Override
>> > >    public void setUp() throws Exception {
>> > >       super.setUp();
>> > >
>> > >       Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
>> > >
>> > >       config.setMessageExpiryScanPeriod(-1);
>> > >       server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 1024);
>> > >
>> > >       server.getAddressSettingsRepository().clear();
>> > >
>> > >       AddressSettings defaultSetting = new AddressSettings()
>> > >          .setPageSizeBytes(100 * 1024 * 1024)
>> > >          .setMaxSizeBytes(10 * 1024 * 1024)
>> > >          .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
>> > >          .setAutoCreateAddresses(false)
>> > >          .setAutoCreateQueues(false);
>> > >
>> > >       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
>> > >
>> > >       server.start();
>> > >
>> > >       server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
>> > >       server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
>> > >
>> > >    }
>> > >
>> > >    @Test
>> > >    public void testSending() throws Exception {
>> > >
>> > >       final String username = null;
>> > >       final String password = null;
>> > >
>> > >       var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616")
>> > >          .setBlockOnDurableSend(true)
>> > >          .setBlockOnNonDurableSend(true)
>> > >          .setMinLargeMessageSize(1024);
>> > >
>> > >       final var sessionFactory = serverLocator.createSessionFactory();
>> > >
>> > >       final var xa = false;
>> > >       final var autoCommitSends = true;
>> > >       final var autoCommitAcks = true;
>> > >       final var ackBatchSize = serverLocator.getAckBatchSize();
>> > >       final var preAcknowledge = serverLocator.isPreAcknowledge();
>> > >       final var clientSession = sessionFactory.createSession(username, password, xa,
>> > >          autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
>> > >
>> > >       var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
>> > >       if (!queueQueryResult.isExists()) {
>> > >          clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
>> > >       }
>> > >       }
>> > >
>> > >       final var consumer = clientSession.createConsumer(QUEUE);
>> > >
>> > >       clientSession.start();
>> > >
>> > >       AtomicInteger errors = new AtomicInteger(0);
>> > >       AtomicInteger received = new AtomicInteger(0);
>> > >
>> > >       consumer.setMessageHandler((msg) -> {
>> > >          try {
>> > >             msg.getDataBuffer();
>> > >             received.incrementAndGet();
>> > >          } catch (Throwable e) {
>> > >             e.printStackTrace();
>> > >             errors.incrementAndGet();
>> > >          }
>> > >       });
>> > >
>> > >
>> > >       try (ClientSession producerSession = sessionFactory.createSession()) {
>> > >          ClientProducer producer = producerSession.createProducer(QUEUE);
>> > >          for (int i = 0; i < 100; i++) {
>> > >             ClientMessage message = producerSession.createMessage(true);
>> > >             message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
>> > >             producer.send(message);
>> > >          }
>> > >       }
>> > >
>> > >       Wait.assertEquals(100, received::get);
>> > >       Assert.assertEquals(0, errors.get());
>> > >    }
>> > >
>> > >    @Test
>> > >    public void testWithJMSListener() throws Exception {
>> > >
>> > >       final String username = null;
>> > >       final String password = null;
>> > >
>> > >       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
>> > >       factory.setMinLargeMessageSize(1024);
>> > >       Connection connection = factory.createConnection();
>> > >       Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED);
>> > >
>> > >       Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>> > >
>> > >       Queue jmsQueue = sessionProducer.createQueue(QUEUE);
>> > >
>> > >       MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);
>> > >
>> > >
>> > >       connection.start();
>> > >
>> > >       AtomicInteger errors = new AtomicInteger(0);
>> > >       AtomicInteger received = new AtomicInteger(0);
>> > >
>> > >       consumer.setMessageListener((msg) -> {
>> > >          try {
>> > >             System.out.println("Received: " + ((TextMessage)msg).getText().length());
>> > >             received.incrementAndGet();
>> > >          } catch (Throwable e) {
>> > >             e.printStackTrace();
>> > >             errors.incrementAndGet();
>> > >          }
>> > >
>> > >       });
>> > >
>> > >       MessageProducer producer = sessionProducer.createProducer(jmsQueue);
>> > >
>> > >       StringBuffer buffer = new StringBuffer();
>> > >       while (buffer.length() < 100 * 1024) {
>> > >          buffer.append("*****");
>> > >       }
>> > >
>> > >       for (int i = 0; i < 100; i++) {
>> > >          TextMessage message = sessionProducer.createTextMessage(buffer.toString());
>> > >          producer.send(message);
>> > >       }
>> > >       sessionProducer.commit();
>> > >
>> > >       Wait.assertEquals(100, received::get);
>> > >       Assert.assertEquals(0, errors.get());
>> > >    }
>> > >
>> > >    private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
>> > >       final var config = new QueueConfiguration(queueName);
>> > >       config.setMaxConsumers(1);
>> > >       config.setPurgeOnNoConsumers(false);
>> > >       config.setDurable(false);
>> > >       config.setAutoDelete(false);
>> > >       config.setRoutingType(RoutingType.MULTICAST);
>> > >       return config;
>> > >    }
>> > > }
>> > >
>> > > On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jb...@apache.org> wrote:
>> > > >
>> > > > No. The test was super-simple. Just send one large message and then
>> > > > consume it.
>> > > >
>> > > >
>> > > > Justin
>> > > >
>> > > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <clebert.suconic@gmail.com> wrote:
>> > > >
>> > > > > Did it not involve paging?
>> > > > >
>> > > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbertram@apache.org> wrote:
>> > > > >
>> > > > > > I recreated this exception with a very simple test-case. I took the
>> > > > > > consumer code pasted earlier in the thread and just added a producer
>> > > > > > sending a large message. I lowered the minLargeMessageSize to make it
>> > > > > > faster. I thought I still had that code lying around somewhere, but I
>> > > > > > can't find it at the moment.
>> > > > > >
>> > > > > >
>> > > > > > Justin
>> > > > > >
>> > > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <clebert.suconic@gmail.com> wrote:
>> > > > > >
>> > > > > > > I have seen (and fixed) cases where the large message file is gone. I
>> > > > > > > would need a reproducer creating the issue from scratch (send and
>> > > > > > > consume).
>> > > > > > >
>> > > > > > > Typically it could be associated with paging. Did you have the
>> > > > > > > destination in page mode?
>> > > > > > >
>> > > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <ti...@abcwxy.com> wrote:
>> > > > > > >
>> > > > > > > > The client code below the stack trace will reproduce it - the
>> > > > > > > > msg.getDataBuffer() inside the handler of that code will trigger it
>> > > > > > > > when a large message is sent to the address.  The exception in
>> > > > > > > > question (IndexOutOfBoundsException) is being caught in line 249 of
>> > > > > > > > the apache CoreMessage code... and turned into a logged warning on
>> > > > > > > > line 250. The CoreMessage code then returns the buffer from
>> > > > > > > > getReadOnlyBuffer() (which appears to be fine from a quick survey -
>> > > > > > > > data also seems to be preserved and accessible from client code) - it
>> > > > > > > > is just not clear what the intent is if the exception code is
>> > > > > > > > executed... is it a "don't worry about it" - or a "something is wrong
>> > > > > > > > here" and I should concern myself with something? (the warning level
>> > > > > > > > is making me believe that it is more than a "don't worry about it" -
>> > > > > > > > but the fact that the exception was caught and a valid buffer is
>> > > > > > > > returned makes me think it is just a fallback for the other choices
>> > > > > > > > of buffers - and I should not worry about it?).
>> > > > > > > >
>> > > > > > > > Thanks for any insight you may have....
>> > > > > > > >
>> > > > > > > > logged warning from the caught exception is:
>> > > > > > > >
>> > > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN org.apache.activemq.artemis.core.message.impl.CoreMessage - readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
>> > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
>> > > > > > > >     at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
>> > > > > > > >     at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
>> > > > > > > >     at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
>> > > > > > > >     at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
>> > > > > > > >     at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
>> > > > > > > >     at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
>> > > > > > > >     at io.m45.sart.Main.lambda$main$0(Main.java:57)
>> > > > > > > >     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
>> > > > > > > >     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
>> > > > > > > >     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
>> > > > > > > >     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
>> > > > > > > >     at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
>> > > > > > > >     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> > > > > > > >     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> > > > > > > >     at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
>> > > > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - Sending 65962 from flow-control
>> > > > > > > >
>> > > > > > > > Simple Client Code below:
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > import org.apache.activemq.artemis.api.core.QueueConfiguration;
>> > > > > > > > import org.apache.activemq.artemis.api.core.RoutingType;
>> > > > > > > > import org.apache.activemq.artemis.api.core.SimpleString;
>> > > > > > > > import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
>> > > > > > > >
>> > > > > > > > public class Main {
>> > > > > > > >
>> > > > > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
>> > > > > > > >     private final static String QUEUE = "service.images.dev::service.images.dev";
>> > > > > > > >
>> > > > > > > >     public static void main(String[] args) throws Exception {
>> > > > > > > >
>> > > > > > > >         final String username = null;
>> > > > > > > >         final String password = null;
>> > > > > > > >
>> > > > > > > >         var serverLocator = ActiveMQClient
>> > > > > > > >                 .createServerLocator(ACCEPTOR)
>> > > > > > > >                 .setBlockOnDurableSend(true)
>> > > > > > > >                 .setBlockOnNonDurableSend(true);
>> > > > > > > >
>> > > > > > > >         final var sessionFactory = serverLocator.createSessionFactory();
>> > > > > > > >
>> > > > > > > >         final var xa = false;
>> > > > > > > >         final var autoCommitSends = true;
>> > > > > > > >         final var autoCommitAcks = true;
>> > > > > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
>> > > > > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
>> > > > > > > >         final var clientSession = sessionFactory.createSession(
>> > > > > > > >                 username,
>> > > > > > > >                 password,
>> > > > > > > >                 xa,
>> > > > > > > >                 autoCommitSends,
>> > > > > > > >                 autoCommitAcks,
>> > > > > > > >                 preAcknowledge,
>> > > > > > > >                 ackBatchSize
>> > > > > > > >         );
>> > > > > > > >
>> > > > > > > >         // Create the queue only if it does not exist yet.
>> > > > > > > >         var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
>> > > > > > > >         if (!queueQueryResult.isExists()) {
>> > > > > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
>> > > > > > > >         }
>> > > > > > > >
>> > > > > > > >         final var consumer = clientSession.createConsumer(QUEUE);
>> > > > > > > >
>> > > > > > > >         clientSession.start();
>> > > > > > > >
>> > > > > > > >         consumer.setMessageHandler((msg) -> {
>> > > > > > > >             System.out.println("Received: " + msg.getBodySize());
>> > > > > > > >             // For a large message, this call logs the warning shown above.
>> > > > > > > >             msg.getDataBuffer();
>> > > > > > > >         });
>> > > > > > > >
>> > > > > > > >         // Keep the JVM alive so the handler can receive messages.
>> > > > > > > >         while (true) {
>> > > > > > > >             Thread.sleep(1000);
>> > > > > > > >         }
>> > > > > > > >     }
>> > > > > > > >
>> > > > > > > >     private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
>> > > > > > > >         final var config = new QueueConfiguration(queueName);
>> > > > > > > >         config.setMaxConsumers(1);
>> > > > > > > >         config.setPurgeOnNoConsumers(false);
>> > > > > > > >         config.setDurable(false);
>> > > > > > > >         config.setAutoDelete(false);
>> > > > > > > >         config.setRoutingType(RoutingType.MULTICAST);
>> > > > > > > >         return config;
>> > > > > > > >     }
>> > > > > > > > }
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jbertram@apache.org> wrote:
>> > > > > > > >
>> > > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you have a
>> > > > > > > > > way to reproduce this?
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > Justin
>> > > > > > > > >
>> > > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > This seems to appear on larger messages only - I am getting a warning
>> > > > > > > > > > when calling getDataBuffer (2.20.0 Artemis Client).  Curious if there
>> > > > > > > > > > is something I may be missing - or if this is completely ignorable?
>> > > > > > > > > > Thanks - Tim
>> > > > > > > > > >
>> > > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)] WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)] readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
>> > > > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
>> > > > > > > > > >     at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
>> > > > > > > > > >     at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
>> > > > > > > > > >     at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
>> > > > > > > > > >     at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
>> > > > > > > > > >     at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
>> > > > > > > > > >     at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > --
>> > > > > > > Clebert Suconic
>> > > > > > >
>> > > > > >
>> > > > > --
>> > > > > Clebert Suconic
>> > > > >
>> > >
>> > >
>> > >
>> > > --
>> > > Clebert Suconic
>> >
>> >
>> >
>> > --
>> > Clebert Suconic
>>
>>
>>
>> --
>> Clebert Suconic
>>
>

Re: IndexOutOfBounds Warning Message

Posted by Tim Jones <ti...@abcwxy.com>.
Thanks, yes, I am using Core...  I see this in the API for getBodyBuffer
which is what led me down that path:  *The buffer to write the body.
Warning: If you just want to read the content of a message, use
getDataBuffer() or getReadOnlyBuffer();*

I am reading content - so that is why I was using getDataBuffer() - should
I still use getBodyBuffer instead?  I will take a look at getBodyBuffer.

If I remember the code correctly, I think that after the exception is
thrown, the getBodyBuffer() method defaults to the getReadOnlyBuffer path
... it attempts to use another path on largeMessages - and that is the path
that yields an exception (it goes through a netty -> NIO internal use only
buffer grab which has been deprecated in Netty... not clear if something
flaky may be going on there).  Anyway - it seems to deliver the content in
the end - it just goes through the same path as small buffers do to achieve
the results on exception....



On Tue, Feb 22, 2022 at 11:18 AM Clebert Suconic <cl...@gmail.com>
wrote:

> Or use the JMS API instead; that's another possibility. But if you
> want to use the core API, use getBodyBuffer() instead.
>
> On Tue, Feb 22, 2022 at 1:17 PM Clebert Suconic
> <cl...@gmail.com> wrote:
> >
> > Found the reason:
> >
> >
> > You should use getBodyBuffer() instead.
> >
> >
> > getBodyBuffer() will perform a call to checkBuffer() before returning
> > you the large message.
> >
> >
> > This should fix your test / problem.
> >
> > On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic
> > <cl...@gmail.com> wrote:
> > >
> > > this is something done on the core API.
> > >
> > > I can't pinpoint the exact reason, but the JMS facade works fine with
> > > a similar sender and producer:
> > >
> > >
> https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257
> > >
> > >
> > >
> > > I created a branch with a test comparing Core and JMS API on my fork:
> > >
> > > https://github.com/clebertsuconic/activemq-artemis/tree/withCore
> > >
> > >
> > >
> > > I'm pasting the test here for future reference if the gist is ever
> > > gone (or the branch is gone):
> > >
> > >
> > > /**
> > >  * Licensed to the Apache Software Foundation (ASF) under one or more
> > >  * contributor license agreements. See the NOTICE file distributed with
> > >  * this work for additional information regarding copyright ownership.
> > >  * The ASF licenses this file to You under the Apache License, Version
> 2.0
> > >  * (the "License"); you may not use this file except in compliance with
> > >  * the License. You may obtain a copy of the License at
> > >  * <p>
> > >  * http://www.apache.org/licenses/LICENSE-2.0
> > >  * <p>
> > >  * Unless required by applicable law or agreed to in writing, software
> > >  * distributed under the License is distributed on an "AS IS" BASIS,
> > >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
> implied.
> > >  * See the License for the specific language governing permissions and
> > >  * limitations under the License.
> > >  */
> > >
> > > package org.apache.activemq.artemis.tests.integration.paging;
> > >
> > > import javax.jms.Connection;
> > > import javax.jms.ConnectionFactory;
> > > import javax.jms.MessageConsumer;
> > > import javax.jms.MessageProducer;
> > > import javax.jms.Queue;
> > > import javax.jms.Session;
> > > import javax.jms.TextMessage;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > >
> > > import org.apache.activemq.artemis.api.core.QueueConfiguration;
> > > import org.apache.activemq.artemis.api.core.RoutingType;
> > > import org.apache.activemq.artemis.api.core.SimpleString;
> > > import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> > > import org.apache.activemq.artemis.api.core.client.ClientMessage;
> > > import org.apache.activemq.artemis.api.core.client.ClientProducer;
> > > import org.apache.activemq.artemis.api.core.client.ClientSession;
> > > import org.apache.activemq.artemis.core.config.Configuration;
> > > import org.apache.activemq.artemis.core.server.ActiveMQServer;
> > > import org.apache.activemq.artemis.core.server.impl.AddressInfo;
> > > > import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
> > > > import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
> > > > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> > > import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
> > > import org.apache.activemq.artemis.tests.util.Wait;
> > > import org.checkerframework.checker.units.qual.A;
> > > import org.junit.Assert;
> > > import org.junit.Before;
> > > import org.junit.Test;
> > >
> > > public class Main extends ActiveMQTestBase {
> > >
> > > >    private final static String QUEUE = "service.images.dev::service.images.dev";
> > >
> > >    ActiveMQServer server;
> > >
> > >
> > >    @Before
> > >    @Override
> > >    public void setUp() throws Exception {
> > >       super.setUp();
> > >
> > >       Configuration config = createDefaultConfig(0,
> > > true).setJournalSyncNonTransactional(false);
> > >
> > >       config.setMessageExpiryScanPeriod(-1);
> > >       server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024
> * 1024);
> > >
> > >       server.getAddressSettingsRepository().clear();
> > >
> > >       AddressSettings defaultSetting = new AddressSettings()
> > >          .setPageSizeBytes(100 * 1024 * 1024)
> > >          .setMaxSizeBytes(10 * 1024 * 1024)
> > >          .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
> > >          .setAutoCreateAddresses(false)
> > >          .setAutoCreateQueues(false);
> > >
> > >       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
> > >
> > >
> > >       server.start();
> > >
> > >
> > >       server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
> > >       server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
> > >
> > >    }
> > >
> > >    @Test
> > >    public void testSending() throws Exception {
> > >
> > >       final String username = null;
> > >       final String password = null;
> > >
> > >       var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616")
> > >          .setBlockOnDurableSend(true)
> > >          .setBlockOnNonDurableSend(true)
> > >          .setMinLargeMessageSize(1024);
> > >
> > >       final var sessionFactory = serverLocator.createSessionFactory();
> > >
> > >       final var xa = false;
> > >       final var autoCommitSends = true;
> > >       final var autoCommitAcks = true;
> > >       final var ackBatchSize = serverLocator.getAckBatchSize();
> > >       final var preAcknowledge = serverLocator.isPreAcknowledge();
> > >       final var clientSession = sessionFactory.createSession(username, password, xa,
> > >          autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
> > >
> > >       var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > >       if (!queueQueryResult.isExists()) {
> > >          clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> > >       }
> > >
> > >       final var consumer = clientSession.createConsumer(QUEUE);
> > >
> > >       clientSession.start();
> > >
> > >       AtomicInteger errors = new AtomicInteger(0);
> > >       AtomicInteger received = new AtomicInteger(0);
> > >
> > >       consumer.setMessageHandler((msg) -> {
> > >          try {
> > >             msg.getDataBuffer();
> > >             received.incrementAndGet();
> > >          } catch (Throwable e) {
> > >             e.printStackTrace();
> > >             errors.incrementAndGet();
> > >          }
> > >       });
> > >
> > >
> > >       try (ClientSession producerSession = sessionFactory.createSession()) {
> > >          ClientProducer producer = producerSession.createProducer(QUEUE);
> > >          for (int i = 0; i < 100; i++) {
> > >             ClientMessage message = producerSession.createMessage(true);
> > >             message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
> > >             producer.send(message);
> > >          }
> > >       }
> > >
> > >       Wait.assertEquals(100, received::get);
> > >       Assert.assertEquals(0, errors.get());
> > >    }
> > >
> > >    @Test
> > >    public void testWithJMSListener() throws Exception {
> > >
> > >       final String username = null;
> > >       final String password = null;
> > >
> > >       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
> > >       factory.setMinLargeMessageSize(1024);
> > >       Connection connection = factory.createConnection();
> > >       Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED);
> > >
> > >       Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> > >
> > >       Queue jmsQueue = sessionProducer.createQueue(QUEUE);
> > >
> > >       MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);
> > >
> > >
> > >       connection.start();
> > >
> > >       AtomicInteger errors = new AtomicInteger(0);
> > >       AtomicInteger received = new AtomicInteger(0);
> > >
> > >       consumer.setMessageListener((msg) -> {
> > >          try {
> > >             System.out.println("Received: " + ((TextMessage)msg).getText().length());
> > >             received.incrementAndGet();
> > >          } catch (Throwable e) {
> > >             e.printStackTrace();
> > >             errors.incrementAndGet();
> > >          }
> > >
> > >       });
> > >
> > >       MessageProducer producer = sessionProducer.createProducer(jmsQueue);
> > >
> > >       StringBuffer buffer = new StringBuffer();
> > >       while (buffer.length() < 100 * 1024) {
> > >          buffer.append("*****");
> > >       }
> > >
> > >       for (int i = 0; i < 100; i++) {
> > >          TextMessage message = sessionProducer.createTextMessage(buffer.toString());
> > >          producer.send(message);
> > >       }
> > >       sessionProducer.commit();
> > >
> > >       Wait.assertEquals(100, received::get);
> > >       Assert.assertEquals(0, errors.get());
> > >    }
> > >
> > >    private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> > >       final var config = new QueueConfiguration(queueName);
> > >       config.setMaxConsumers(1);
> > >       config.setPurgeOnNoConsumers(false);
> > >       config.setDurable(false);
> > >       config.setAutoDelete(false);
> > >       config.setRoutingType(RoutingType.MULTICAST);
> > >       return config;
> > >    }
> > > }
> > >
> > > On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jb...@apache.org> wrote:
> > > >
> > > > No. The test was super-simple. Just send one large message and then
> > > > consume it.
> > > >
> > > >
> > > > Justin
> > > >
> > > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <clebert.suconic@gmail.com>
> > > > wrote:
> > > >
> > > > > It did not involve paging?
> > > > >
> > > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbertram@apache.org>
> > > > > wrote:
> > > > >
> > > > > > I recreated this exception with a very simple test-case. I took the
> > > > > > consumer code pasted earlier in the thread and just added a producer
> > > > > > sending a large message. I lowered the minLargeMessageSize to make it
> > > > > > faster. I thought I still had that code lying around somewhere, but I
> > > > > > can't find it at the moment.
> > > > > >
> > > > > >
> > > > > > Justin
> > > > > >
> > > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <clebert.suconic@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I have seen (and fixed) cases where the large message file is gone. I
> > > > > > > would need a reproducer creating the issue from scratch (send and
> > > > > > > consume).
> > > > > > >
> > > > > > > Typically it could be associated with paging. Did you have the
> > > > > > > destination in page mode?
> > > > > > >
> > > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > > >
> > > > > > > > The client code below the stack trace will reproduce it: the
> > > > > > > > msg.getDataBuffer() call inside the handler triggers it when a
> > > > > > > > large message is sent to the address. The exception in question
> > > > > > > > (IndexOutOfBoundsException) is caught on line 249 of the Apache
> > > > > > > > CoreMessage code and turned into a logged warning on line 250. The
> > > > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer()
> > > > > > > > (which appears to be fine from a quick survey; the data also seems
> > > > > > > > to be preserved and accessible from client code). It is just not
> > > > > > > > clear what the intent is when the exception path is executed: is it
> > > > > > > > a "don't worry about it", or a "something is wrong here" that I
> > > > > > > > should concern myself with? The warning level makes me believe it
> > > > > > > > is more than a "don't worry about it", but the fact that the
> > > > > > > > exception was caught and a valid buffer is returned makes me think
> > > > > > > > it is just a fallback for the other choices of buffers, and that I
> > > > > > > > should not worry about it.
> > > > > > > >
> > > > > > > > Thanks for any insight you may have.
> > > > > > > >
> > > > > > > > The logged warning from the caught exception is:
> > > > > > > >
> > > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN org.apache.activemq.artemis.core.message.impl.CoreMessage - readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
> > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57)
> > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> > > > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> > > > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> > > > > > > > at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> > > > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > > > > > > at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> > > > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - Sending 65962 from flow-control
> > > > > > > >
> > > > > > > > Simple Client Code below:
> > > > > > > >
> > > > > > > >
> > > > > > > > public class Main {
> > > > > > > >
> > > > > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
> > > > > > > >     private final static String QUEUE = "service.images.dev::service.images.dev";
> > > > > > > >
> > > > > > > >     public static void main(String[] args) throws Exception {
> > > > > > > >
> > > > > > > >         final String username = null;
> > > > > > > >         final String password = null;
> > > > > > > >
> > > > > > > >         var serverLocator = ActiveMQClient
> > > > > > > >                 .createServerLocator(ACCEPTOR)
> > > > > > > >                 .setBlockOnDurableSend(true)
> > > > > > > >                 .setBlockOnNonDurableSend(true);
> > > > > > > >
> > > > > > > >         final var sessionFactory = serverLocator.createSessionFactory();
> > > > > > > >
> > > > > > > >         final var xa = false;
> > > > > > > >         final var autoCommitSends = true;
> > > > > > > >         final var autoCommitAcks = true;
> > > > > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
> > > > > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
> > > > > > > >         final var clientSession = sessionFactory.createSession(
> > > > > > > >                 username,
> > > > > > > >                 password,
> > > > > > > >                 xa,
> > > > > > > >                 autoCommitSends,
> > > > > > > >                 autoCommitAcks,
> > > > > > > >                 preAcknowledge,
> > > > > > > >                 ackBatchSize
> > > > > > > >         );
> > > > > > > >
> > > > > > > >         var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > > > > > > >         if (!queueQueryResult.isExists()) {
> > > > > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> > > > > > > >         }
> > > > > > > >
> > > > > > > >         final var consumer = clientSession.createConsumer(QUEUE);
> > > > > > > >
> > > > > > > >         clientSession.start();
> > > > > > > >
> > > > > > > >         consumer.setMessageHandler((msg) -> {
> > > > > > > >
> > > > > > > >             System.out.println("Received: " + msg.getBodySize());
> > > > > > > >             msg.getDataBuffer();
> > > > > > > >
> > > > > > > >         });
> > > > > > > >
> > > > > > > >         while(true) {
> > > > > > > >             Thread.sleep(1000);
> > > > > > > >         }
> > > > > > > >
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> > > > > > > >         final var config = new QueueConfiguration(queueName);
> > > > > > > >         config.setMaxConsumers(1);
> > > > > > > >         config.setPurgeOnNoConsumers(false);
> > > > > > > >         config.setDurable(false);
> > > > > > > >         config.setAutoDelete(false);
> > > > > > > >         config.setRoutingType(RoutingType.MULTICAST);
> > > > > > > >         return config;
> > > > > > > >     }
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jbertram@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you have a
> > > > > > > > > way to reproduce this?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Justin
> > > > > > > > >
> > > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > > > > >
> > > > > > > > > > This seems to appear on larger messages only - I am getting a
> > > > > > > > > > warning when calling getDataBuffer (2.20.0 Artemis Client). Curious
> > > > > > > > > > if there is something I may be missing - or if this is completely
> > > > > > > > > > ignorable? Thanks -
> > > > > > > > > > Tim
> > > > > > > > > >
> > > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)] WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)] readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
> > > > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740) exceeds writerIndex(271572): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 270740, widx: 271572, cap: 271572)
> > > > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > --
> > > > > > > Clebert Suconic
> > > > > > >
> > > > > >
> > > > > --
> > > > > Clebert Suconic
> > > > >
> > >
> > >
> > >
> > > --
> > > Clebert Suconic
> >
> >
> >
> > --
> > Clebert Suconic
>
>
>
> --
> Clebert Suconic
>

Re: IndexOutOfBounds Warning Message

Posted by Clebert Suconic <cl...@gmail.com>.
Or use the JMS API instead; that's another possibility. But if you
want to use the core API, use getBodyBuffer() instead.
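
The advice in this thread (getBodyBuffer() performs a checkBuffer() call
before returning the large message, while the getDataBuffer() path hit the
"readerIndex + length exceeds writerIndex" check) can be sketched with a
small self-contained Java model. This is only an illustration under stated
assumptions: LazyBodySketch, IndexedBuffer, LazyMessage, readUnchecked and
readChecked are hypothetical names, not Artemis or Netty types, and the body
of checkBuffer() here is a guess at the general idea, not the library's code.

```java
import java.util.Arrays;

/** Hypothetical model of a message whose body is loaded lazily; not Artemis code. */
final class LazyBodySketch {

    /** Minimal stand-in for a buffer with separate reader and writer indexes. */
    static final class IndexedBuffer {
        private final byte[] data;
        private int readerIndex;
        private int writerIndex;

        IndexedBuffer(int capacity) {
            this.data = new byte[capacity];
        }

        void write(byte[] src) {
            System.arraycopy(src, 0, data, writerIndex, src.length);
            writerIndex += src.length;
        }

        byte[] read(int length) {
            // Same shape of check as in the logged warning:
            // readerIndex + length must not exceed writerIndex.
            if (readerIndex + length > writerIndex) {
                throw new IndexOutOfBoundsException("readerIndex(" + readerIndex
                    + ") + length(" + length + ") exceeds writerIndex(" + writerIndex + ")");
            }
            byte[] out = Arrays.copyOfRange(data, readerIndex, readerIndex + length);
            readerIndex += length;
            return out;
        }
    }

    /** Hypothetical large message: the body reaches the buffer only on demand. */
    static final class LazyMessage {
        private final byte[] pendingBody; // stands in for bytes not yet delivered
        private final IndexedBuffer buffer;
        private boolean loaded;

        LazyMessage(byte[] pendingBody) {
            this.pendingBody = pendingBody;
            this.buffer = new IndexedBuffer(pendingBody.length);
        }

        // Named after the checkBuffer() call mentioned in the thread; the logic is an assumption.
        private void checkBuffer() {
            if (!loaded) {
                buffer.write(pendingBody);
                loaded = true;
            }
        }

        /** Reads without loading first, so it fails while the buffer is still empty. */
        byte[] readUnchecked() {
            return buffer.read(pendingBody.length);
        }

        /** Loads the body first, then reads it. */
        byte[] readChecked() {
            checkBuffer();
            return buffer.read(pendingBody.length);
        }
    }

    public static void main(String[] args) {
        LazyMessage msg = new LazyMessage(new byte[270740]);
        try {
            msg.readUnchecked();
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unchecked read failed: " + e.getMessage());
        }
        System.out.println("checked read returned " + msg.readChecked().length + " bytes");
    }
}
```

In the reproducer's message handler, the corresponding change is to call
msg.getBodyBuffer() where the code currently calls msg.getDataBuffer().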

On Tue, Feb 22, 2022 at 1:17 PM Clebert Suconic
<cl...@gmail.com> wrote:
>
> Found the reason:
>
>
> You should use getBodyBuffer() instead.
>
>
> getBodyBuffer() will perform a call to checkBuffer() before returning
> you the large message.
>
>
> This should fix your test / problem.
>
> On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic
> <cl...@gmail.com> wrote:
> >
> > This is something that happens on the core API.
> >
> > I can't pinpoint the exact reason, but the JMS facade works fine with
> > a similar producer and consumer:
> >
> > https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257
> >
> >
> >
> > I created a branch with a test comparing Core and JMS API on my fork:
> >
> > https://github.com/clebertsuconic/activemq-artemis/tree/withCore
> >
> >
> >
> > I'm pasting the test here for future reference in case the gist
> > (or the branch) is ever gone:
> >
> >
> > /**
> >  * Licensed to the Apache Software Foundation (ASF) under one or more
> >  * contributor license agreements. See the NOTICE file distributed with
> >  * this work for additional information regarding copyright ownership.
> >  * The ASF licenses this file to You under the Apache License, Version 2.0
> >  * (the "License"); you may not use this file except in compliance with
> >  * the License. You may obtain a copy of the License at
> >  * <p>
> >  * http://www.apache.org/licenses/LICENSE-2.0
> >  * <p>
> >  * Unless required by applicable law or agreed to in writing, software
> >  * distributed under the License is distributed on an "AS IS" BASIS,
> >  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >  * See the License for the specific language governing permissions and
> >  * limitations under the License.
> >  */
> >
> > package org.apache.activemq.artemis.tests.integration.paging;
> >
> > import javax.jms.Connection;
> > import javax.jms.ConnectionFactory;
> > import javax.jms.MessageConsumer;
> > import javax.jms.MessageProducer;
> > import javax.jms.Queue;
> > import javax.jms.Session;
> > import javax.jms.TextMessage;
> > import java.util.concurrent.atomic.AtomicInteger;
> >
> > import org.apache.activemq.artemis.api.core.QueueConfiguration;
> > import org.apache.activemq.artemis.api.core.RoutingType;
> > import org.apache.activemq.artemis.api.core.SimpleString;
> > import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> > import org.apache.activemq.artemis.api.core.client.ClientMessage;
> > import org.apache.activemq.artemis.api.core.client.ClientProducer;
> > import org.apache.activemq.artemis.api.core.client.ClientSession;
> > import org.apache.activemq.artemis.core.config.Configuration;
> > import org.apache.activemq.artemis.core.server.ActiveMQServer;
> > import org.apache.activemq.artemis.core.server.impl.AddressInfo;
> > import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
> > import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
> > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> > import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
> > import org.apache.activemq.artemis.tests.util.Wait;
> > import org.junit.Assert;
> > import org.junit.Before;
> > import org.junit.Test;
> >
> > public class Main extends ActiveMQTestBase {
> >
> >    private final static String QUEUE = "service.images.dev::service.images.dev";
> >
> >    ActiveMQServer server;
> >
> >
> >    @Before
> >    @Override
> >    public void setUp() throws Exception {
> >       super.setUp();
> >
> >       Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
> >
> >       config.setMessageExpiryScanPeriod(-1);
> >       server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 1024);
> >
> >       server.getAddressSettingsRepository().clear();
> >
> >       AddressSettings defaultSetting = new AddressSettings()
> >          .setPageSizeBytes(100 * 1024 * 1024)
> >          .setMaxSizeBytes(10 * 1024 * 1024)
> >          .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
> >          .setAutoCreateAddresses(false)
> >          .setAutoCreateQueues(false);
> >
> >       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
> >
> >
> >       server.start();
> >
> >
> >       server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
> >       server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
> >
> >    }
> >
> >    @Test
> >    public void testSending() throws Exception {
> >
> >       final String username = null;
> >       final String password = null;
> >
> >       var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616")
> >          .setBlockOnDurableSend(true)
> >          .setBlockOnNonDurableSend(true)
> >          .setMinLargeMessageSize(1024);
> >
> >       final var sessionFactory = serverLocator.createSessionFactory();
> >
> >       final var xa = false;
> >       final var autoCommitSends = true;
> >       final var autoCommitAcks = true;
> >       final var ackBatchSize = serverLocator.getAckBatchSize();
> >       final var preAcknowledge = serverLocator.isPreAcknowledge();
> >       final var clientSession = sessionFactory.createSession(username, password, xa,
> >          autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
> >
> >       var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> >       if (!queueQueryResult.isExists()) {
> >          clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> >       }
> >
> >       final var consumer = clientSession.createConsumer(QUEUE);
> >
> >       clientSession.start();
> >
> >       AtomicInteger errors = new AtomicInteger(0);
> >       AtomicInteger received = new AtomicInteger(0);
> >
> >       consumer.setMessageHandler((msg) -> {
> >          try {
> >             msg.getDataBuffer();
> >             received.incrementAndGet();
> >          } catch (Throwable e) {
> >             e.printStackTrace();
> >             errors.incrementAndGet();
> >          }
> >       });
> >
> >
> >       try (ClientSession producerSession = sessionFactory.createSession()) {
> >          ClientProducer producer = producerSession.createProducer(QUEUE);
> >          for (int i = 0; i < 100; i++) {
> >             ClientMessage message = producerSession.createMessage(true);
> >             message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
> >             producer.send(message);
> >          }
> >       }
> >
> >       Wait.assertEquals(100, received::get);
> >       Assert.assertEquals(0, errors.get());
> >    }
> >
> >    @Test
> >    public void testWithJMSListener() throws Exception {
> >
> >       final String username = null;
> >       final String password = null;
> >
> >       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
> >       factory.setMinLargeMessageSize(1024);
> >       Connection connection = factory.createConnection();
> >       Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED);
> >
> >       Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >
> >       Queue jmsQueue = sessionProducer.createQueue(QUEUE);
> >
> >       MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);
> >
> >
> >       connection.start();
> >
> >       AtomicInteger errors = new AtomicInteger(0);
> >       AtomicInteger received = new AtomicInteger(0);
> >
> >       consumer.setMessageListener((msg) -> {
> >          try {
> >             System.out.println("Received: " + ((TextMessage)msg).getText().length());
> >             received.incrementAndGet();
> >          } catch (Throwable e) {
> >             e.printStackTrace();
> >             errors.incrementAndGet();
> >          }
> >
> >       });
> >
> >       MessageProducer producer = sessionProducer.createProducer(jmsQueue);
> >
> >       StringBuffer buffer = new StringBuffer();
> >       while (buffer.length() < 100 * 1024) {
> >          buffer.append("*****");
> >       }
> >
> >       for (int i = 0; i < 100; i++) {
> >          TextMessage message = sessionProducer.createTextMessage(buffer.toString());
> >          producer.send(message);
> >       }
> >       sessionProducer.commit();
> >
> >       Wait.assertEquals(100, received::get);
> >       Assert.assertEquals(0, errors.get());
> >    }
> >
> >    private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> >       final var config = new QueueConfiguration(queueName);
> >       config.setMaxConsumers(1);
> >       config.setPurgeOnNoConsumers(false);
> >       config.setDurable(false);
> >       config.setAutoDelete(false);
> >       config.setRoutingType(RoutingType.MULTICAST);
> >       return config;
> >    }
> > }
> >
> > On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jb...@apache.org> wrote:
> > >
> > > No. The test was super-simple. Just send one large message and then consume
> > > it.
> > >
> > >
> > > Justin
> > >
> > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <cl...@gmail.com>
> > > wrote:
> > >
> > > > It did not involve paging?
> > > >
> > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jb...@apache.org>
> > > > wrote:
> > > >
> > > > > I recreated this exception with a very simple test-case. I took the
> > > > > consumer code pasted earlier in the thread and just added a producer
> > > > > sending a large message. I lowered the minLargeMessageSize to make it
> > > > > faster. I thought I still had that code lying around somewhere, but I
> > > > > can't find it at the moment.
> > > > >
> > > > >
> > > > > Justin
> > > > >
> > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <clebert.suconic@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > I have seen (and fixed) cases where the large message file is gone. I
> > > > > > would need a reproducer creating the issue from scratch (send and
> > > > > > consume).
> > > > > >
> > > > > > Typically it could be associated with paging. Did you have the
> > > > > > destination in page mode?
> > > > > >
> > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > >
> > > > > > > The client code below the stack trace will reproduce it: the
> > > > > > > msg.getDataBuffer() call inside the handler triggers it when a
> > > > > > > large message is sent to the address. The exception in question
> > > > > > > (IndexOutOfBoundsException) is caught on line 249 of the Apache
> > > > > > > CoreMessage code and turned into a logged warning on line 250. The
> > > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer()
> > > > > > > (which appears to be fine from a quick survey; the data also seems
> > > > > > > to be preserved and accessible from client code). It is just not
> > > > > > > clear what the intent is when the exception path is executed: is it
> > > > > > > a "don't worry about it", or a "something is wrong here" that I
> > > > > > > should concern myself with? The warning level makes me believe it
> > > > > > > is more than a "don't worry about it", but the fact that the
> > > > > > > exception was caught and a valid buffer is returned makes me think
> > > > > > > it is just a fallback for the other choices of buffers, and that I
> > > > > > > should not worry about it.
> > > > > > >
> > > > > > > Thanks for any insight you may have.
> > > > > > >
> > > > > > > The logged warning from the caught exception is:
> > > > > > >
> > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN org.apache.activemq.artemis.core.message.impl.CoreMessage - readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
> > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
> > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57)
> > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> > > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> > > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> > > > > > > at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> > > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > > > > > at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> > > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - Sending 65962 from flow-control
> > > > > > >
> > > > > > > Simple Client Code below:
> > > > > > >
> > > > > > >
> > > > > > > public class Main {
> > > > > > >
> > > > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
> > > > > > >     private final static String QUEUE = "service.images.dev::service.images.dev";
> > > > > > >
> > > > > > >     public static void main(String[] args) throws Exception {
> > > > > > >
> > > > > > >         final String username = null;
> > > > > > >         final String password = null;
> > > > > > >
> > > > > > >         var serverLocator = ActiveMQClient
> > > > > > >                 .createServerLocator(ACCEPTOR)
> > > > > > >                 .setBlockOnDurableSend(true)
> > > > > > >                 .setBlockOnNonDurableSend(true);
> > > > > > >
> > > > > > >         final var sessionFactory = serverLocator.createSessionFactory();
> > > > > > >
> > > > > > >         final var xa = false;
> > > > > > >         final var autoCommitSends = true;
> > > > > > >         final var autoCommitAcks = true;
> > > > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
> > > > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
> > > > > > >         final var clientSession = sessionFactory.createSession(
> > > > > > >                 username,
> > > > > > >                 password,
> > > > > > >                 xa,
> > > > > > >                 autoCommitSends,
> > > > > > >                 autoCommitAcks,
> > > > > > >                 preAcknowledge,
> > > > > > >                 ackBatchSize
> > > > > > >         );
> > > > > > >
> > > > > > >         var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > > > > > >         if (!queueQueryResult.isExists()) {
> > > > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> > > > > > >         }
> > > > > > >
> > > > > > >         final var consumer = clientSession.createConsumer(QUEUE);
> > > > > > >
> > > > > > >         clientSession.start();
> > > > > > >
> > > > > > >         consumer.setMessageHandler((msg) -> {
> > > > > > >
> > > > > > >             System.out.println("Received: "+msg.getBodySize());
> > > > > > >             msg.getDataBuffer();
> > > > > > >
> > > > > > >         });
> > > > > > >
> > > > > > >         while(true) {
> > > > > > >             Thread.sleep(1000);
> > > > > > >         }
> > > > > > >
> > > > > > >     }
> > > > > > >
> > > > > > >     private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> > > > > > >         final var config = new QueueConfiguration(queueName);
> > > > > > >         config.setMaxConsumers(1);
> > > > > > >         config.setPurgeOnNoConsumers(false);
> > > > > > >         config.setDurable(false);
> > > > > > >         config.setAutoDelete(false);
> > > > > > >         config.setRoutingType(RoutingType.MULTICAST);
> > > > > > >         return config;
> > > > > > >     }
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jb...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you have a way
> > > > > > > > to reproduce this?
> > > > > > > >
> > > > > > > >
> > > > > > > > Justin
> > > > > > > >
> > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > > > >
> > > > > > > > > This seems to appear on larger messages only - I am getting a warning when
> > > > > > > > > calling getDataBuffer (2.20.0 Artemis Client).  Curious if there is
> > > > > > > > > something I may be missing - or if this is completely ignorable? Thanks -
> > > > > > > > > Tim
> > > > > > > > >
> > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > > > readerIndex(270740) + length(270740) exceeds writerIndex(271572):
> > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740)
> > > > > > > > > exceeds writerIndex(271572):
> > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > --
> > > > > > Clebert Suconic
> > > > > >
> > > > >
> > > > --
> > > > Clebert Suconic
> > > >
> >
> >
> >
> > --
> > Clebert Suconic
>
>
>
> --
> Clebert Suconic



-- 
Clebert Suconic

Re: IndexOutOfBounds Warning Message

Posted by Clebert Suconic <cl...@gmail.com>.
Found the reason:


You should use getBodyBuffer() instead of getDataBuffer().


getBodyBuffer() calls checkBuffer() before returning the large message
body to you.


This should fix your test / problem.
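
For readers wondering what the warning itself is saying: Netty rejects a
read whenever readerIndex + length would go past writerIndex. The sketch
below only reproduces that arithmetic with the numbers from the logs in
this thread. The class and method names (ReadableBytesCheck, canRead,
checkReadable) are made up for illustration and are not Netty or Artemis
code, and reading the second log as "capacity is there but the body is not
yet in the buffer" is an assumption drawn from ridx/widx/cap, not a
confirmed diagnosis.

```java
// Hypothetical stand-in for the bounds check behind the logged warning.
class ReadableBytesCheck {

    // A read of `length` bytes is allowed only if it ends at or before writerIndex.
    // The sum is done in long so large ints cannot overflow.
    static boolean canRead(int readerIndex, int writerIndex, int length) {
        return length >= 0 && (long) readerIndex + length <= writerIndex;
    }

    // Throws with a message shaped like the one in the logs when the read is not allowed.
    static void checkReadable(int readerIndex, int writerIndex, int length) {
        if (!canRead(readerIndex, writerIndex, length)) {
            throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                readerIndex, length, writerIndex));
        }
    }

    public static void main(String[] args) {
        // Values from the second log: ridx 4, widx 4, length 270740 (cap 270740).
        try {
            checkReadable(4, 4, 270740);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // If 270740 bytes had been written after index 4, the same read would pass.
        checkReadable(4, 4 + 270740, 270740);
        System.out.println("accepted once writerIndex covers the requested length");
    }
}
```

In both logs the requested length equals the full body size (270740) while
the readable region is much smaller, which fits the advice above to go
through getBodyBuffer() rather than getDataBuffer().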

On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic
<cl...@gmail.com> wrote:
>
> this is something done on the core API.
>
> I can't pinpoint the exact reason, but the JMS facade works fine with
> a similar sender and producer:
>
> https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257
>
>
>
> I created a branch with a test comparing Core and JMS API on my fork:
>
> https://github.com/clebertsuconic/activemq-artemis/tree/withCore
>
>
>
> I'm pasting the test here for future reference if the gist is ever
> gone (or the branch is gone):
>
>
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  * <p>
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * <p>
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.activemq.artemis.tests.integration.paging;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.api.core.RoutingType;
> import org.apache.activemq.artemis.api.core.SimpleString;
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.core.config.Configuration;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.core.server.impl.AddressInfo;
> import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
> import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
> import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
> import org.apache.activemq.artemis.tests.util.Wait;
> import org.checkerframework.checker.units.qual.A;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
>
> public class Main extends ActiveMQTestBase {
>
>    private final static String QUEUE = "service.images.dev::service.images.dev";
>
>    ActiveMQServer server;
>
>
>    @Before
>    @Override
>    public void setUp() throws Exception {
>       super.setUp();
>
>       Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
>
>       config.setMessageExpiryScanPeriod(-1);
>       server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 1024);
>
>       server.getAddressSettingsRepository().clear();
>
>       AddressSettings defaultSetting = new AddressSettings()
>          .setPageSizeBytes(100 * 1024 * 1024)
>          .setMaxSizeBytes(10 * 1024 * 1024)
>          .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
>          .setAutoCreateAddresses(false)
>          .setAutoCreateQueues(false);
>
>       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
>
>
>       server.start();
>
>
>       server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
>       server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
>
>    }
>
>    @Test
>    public void testSending() throws Exception {
>
>       final String username = null;
>       final String password = null;
>
>       var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616")
>          .setBlockOnDurableSend(true)
>          .setBlockOnNonDurableSend(true)
>          .setMinLargeMessageSize(1024);
>
>       final var sessionFactory = serverLocator.createSessionFactory();
>
>       final var xa = false;
>       final var autoCommitSends = true;
>       final var autoCommitAcks = true;
>       final var ackBatchSize = serverLocator.getAckBatchSize();
>       final var preAcknowledge = serverLocator.isPreAcknowledge();
>       final var clientSession = sessionFactory.createSession(username, password, xa,
>          autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);
>
>       var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
>       if (!queueQueryResult.isExists()) {
>          clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
>       }
>
>       final var consumer = clientSession.createConsumer(QUEUE);
>
>       clientSession.start();
>
>       AtomicInteger errors = new AtomicInteger(0);
>       AtomicInteger received = new AtomicInteger(0);
>
>       consumer.setMessageHandler((msg) -> {
>          try {
>             msg.getDataBuffer();
>             received.incrementAndGet();
>          } catch (Throwable e) {
>             e.printStackTrace();
>             errors.incrementAndGet();
>          }
>       });
>
>
>       try (ClientSession producerSession = sessionFactory.createSession()) {
>          ClientProducer producer = producerSession.createProducer(QUEUE);
>          for (int i = 0; i < 100; i++) {
>             ClientMessage message = producerSession.createMessage(true);
>             message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
>             producer.send(message);
>          }
>       }
>
>       Wait.assertEquals(100, received::get);
>       Assert.assertEquals(0, errors.get());
>    }
>
>    @Test
>    public void testWithJMSListener() throws Exception {
>
>       final String username = null;
>       final String password = null;
>
>       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
>       factory.setMinLargeMessageSize(1024);
>       Connection connection = factory.createConnection();
>       Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED);
>
>       Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>
>       Queue jmsQueue = sessionProducer.createQueue(QUEUE);
>
>       MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);
>
>
>       connection.start();
>
>       AtomicInteger errors = new AtomicInteger(0);
>       AtomicInteger received = new AtomicInteger(0);
>
>       consumer.setMessageListener((msg) -> {
>          try {
>             System.out.println("Received: " + ((TextMessage) msg).getText().length());
>             received.incrementAndGet();
>          } catch (Throwable e) {
>             e.printStackTrace();
>             errors.incrementAndGet();
>          }
>
>       });
>
>       MessageProducer producer = sessionProducer.createProducer(jmsQueue);
>
>       StringBuffer buffer = new StringBuffer();
>       while (buffer.length() < 100 * 1024) {
>          buffer.append("*****");
>       }
>
>       for (int i = 0; i < 100; i++) {
>          TextMessage message = sessionProducer.createTextMessage(buffer.toString());
>          producer.send(message);
>       }
>       sessionProducer.commit();
>
>       Wait.assertEquals(100, received::get);
>       Assert.assertEquals(0, errors.get());
>    }
>
>    private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
>       final var config = new QueueConfiguration(queueName);
>       config.setMaxConsumers(1);
>       config.setPurgeOnNoConsumers(false);
>       config.setDurable(false);
>       config.setAutoDelete(false);
>       config.setRoutingType(RoutingType.MULTICAST);
>       return config;
>    }
> }
>
> On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jb...@apache.org> wrote:
> >
> > No. The test was super-simple. Just send one large message and then consume
> > it.
> >
> >
> > Justin
> >
> > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <cl...@gmail.com>
> > wrote:
> >
> > > Did it not involve paging?
> > >
> > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jb...@apache.org>
> > > wrote:
> > >
> > > > I recreated this exception with a very simple test-case. I took the
> > > > consumer code pasted earlier in the thread and just added a producer
> > > > sending a large message. I lowered the minLargeMessageSize to make it
> > > > faster. I thought I still had that code laying around somewhere, but I
> > > > can't find it at the moment.
> > > >
> > > >
> > > > Justin
> > > >
> > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <clebert.suconic@gmail.com>
> > > > wrote:
> > > >
> > > > > I have seen (and fixed) cases where the large message file is gone.  I
> > > > > would need a reproducer creating the issue from scratch (send and consume)
> > > > >
> > > > > Typically it could be associated with paging ?  Did you have the
> > > > > destination in page mode ?
> > > > >
> > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > >
> > > > > > The client code below the stack trace - will reproduce it  - the
> > > > > > msg.getDataBuffer() inside the handler of that code will trigger it when a
> > > > > > large message is sent to the address.  The exception in question
> > > > > > (IndexOutOfBoundsException) is being caught in line 249 of the apache
> > > > > > CoreMessage code... and turned into a logged warning on line 250.  The
> > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer() (which
> > > > > > appears to be fine from a quick survey - data also seems to be preserved
> > > > > > and accessible from client code) - it is just not clear what the intent is
> > > > > > if the exception code is executed... is it a "don't worry about it" - or a
> > > > > > "something is wrong here" and I should concern myself with something? (the
> > > > > > warning level is making me believe that it is more than a "don't worry
> > > > > > about it" - but the fact that the exception was caught and a valid buffer
> > > > > > is returned makes me think it is just a fallback for the other choices of
> > > > > > buffers - and I should not worry about it?).
> > > > > >
> > > > > > Thanks for any insight you may have....
> > > > > >
> > > > > > logged warning from the caught exception is:
> > > > > >
> > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN
> > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage -
> > > > > > readerIndex(4) + length(270740) exceeds writerIndex(4):
> > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4,
> > > > > > widx: 4, cap: 270740)
> > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740)
> > > > > > exceeds writerIndex(4):
> > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4,
> > > > > > widx: 4, cap: 270740)
> > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57)
> > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> > > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> > > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> > > > > > at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > > > > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > > > > at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG
> > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - Sending
> > > > > > 65962 from flow-control
> > > > > >
> > > > > > Simple Client Code below:
> > > > > >
> > > > > >
> > > > > > public class Main {
> > > > > >
> > > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
> > > > > >     private final static String QUEUE = "service.images.dev::service.images.dev";
> > > > > >
> > > > > >     public static void main(String[] args) throws Exception {
> > > > > >
> > > > > >         final String username = null;
> > > > > >         final String password = null;
> > > > > >
> > > > > >         var serverLocator = ActiveMQClient
> > > > > >                 .createServerLocator(ACCEPTOR)
> > > > > >                 .setBlockOnDurableSend(true)
> > > > > >                 .setBlockOnNonDurableSend(true);
> > > > > >
> > > > > >         final var sessionFactory = serverLocator.createSessionFactory();
> > > > > >
> > > > > >         final var xa = false;
> > > > > >         final var autoCommitSends = true;
> > > > > >         final var autoCommitAcks = true;
> > > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
> > > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
> > > > > >         final var clientSession = sessionFactory.createSession(
> > > > > >                 username,
> > > > > >                 password,
> > > > > >                 xa,
> > > > > >                 autoCommitSends,
> > > > > >                 autoCommitAcks,
> > > > > >                 preAcknowledge,
> > > > > >                 ackBatchSize
> > > > > >         );
> > > > > >
> > > > > >         var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > > > > >         if (!queueQueryResult.isExists()) {
> > > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> > > > > >         }
> > > > > >
> > > > > >         final var consumer = clientSession.createConsumer(QUEUE);
> > > > > >
> > > > > >         clientSession.start();
> > > > > >
> > > > > >         consumer.setMessageHandler((msg) -> {
> > > > > >
> > > > > >             System.out.println("Received: "+msg.getBodySize());
> > > > > >             msg.getDataBuffer();
> > > > > >
> > > > > >         });
> > > > > >
> > > > > >         while(true) {
> > > > > >             Thread.sleep(1000);
> > > > > >         }
> > > > > >
> > > > > >     }
> > > > > >
> > > > > >     private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> > > > > >         final var config = new QueueConfiguration(queueName);
> > > > > >         config.setMaxConsumers(1);
> > > > > >         config.setPurgeOnNoConsumers(false);
> > > > > >         config.setDurable(false);
> > > > > >         config.setAutoDelete(false);
> > > > > >         config.setRoutingType(RoutingType.MULTICAST);
> > > > > >         return config;
> > > > > >     }
> > > > > > }
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jb...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you have a way
> > > > > > > to reproduce this?
> > > > > > >
> > > > > > >
> > > > > > > Justin
> > > > > > >
> > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > > >
> > > > > > > > This seems to appear on larger messages only - I am getting a warning when
> > > > > > > > calling getDataBuffer (2.20.0 Artemis Client).  Curious if there is
> > > > > > > > something I may be missing - or if this is completely ignorable? Thanks -
> > > > > > > > Tim
> > > > > > > >
> > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > > readerIndex(270740) + length(270740) exceeds writerIndex(271572):
> > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740)
> > > > > > > > exceeds writerIndex(271572):
> > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > --
> > > > > Clebert Suconic
> > > > >
> > > >
> > > --
> > > Clebert Suconic
> > >
>
>
>
> --
> Clebert Suconic



-- 
Clebert Suconic

Re: IndexOutOfBounds Warning Message

Posted by Clebert Suconic <cl...@gmail.com>.
This only seems to happen with the core API.

I can't pinpoint the exact reason, but the JMS facade works fine with
a similar producer and consumer:

https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257



I created a branch with a test comparing Core and JMS API on my fork:

https://github.com/clebertsuconic/activemq-artemis/tree/withCore



I'm pasting the test here for future reference if the gist is ever
gone (or the branch is gone):


/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.artemis.tests.integration.paging;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.checkerframework.checker.units.qual.A;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class Main extends ActiveMQTestBase {

   private final static String QUEUE = "service.images.dev::service.images.dev";

   ActiveMQServer server;


   @Before
   @Override
   public void setUp() throws Exception {
      super.setUp();

      Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);

      config.setMessageExpiryScanPeriod(-1);
      server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 1024);

      server.getAddressSettingsRepository().clear();

      AddressSettings defaultSetting = new AddressSettings()
         .setPageSizeBytes(100 * 1024 * 1024)
         .setMaxSizeBytes(10 * 1024 * 1024)
         .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
         .setAutoCreateAddresses(false)
         .setAutoCreateQueues(false);

      server.getAddressSettingsRepository().addMatch("#", defaultSetting);


      server.start();


      server.addAddressInfo(new AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
      server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));

   }

   @Test
   public void testSending() throws Exception {

      final String username = null;
      final String password = null;

      var serverLocator = ActiveMQClient.createServerLocator("tcp://localhost:61616")
         .setBlockOnDurableSend(true)
         .setBlockOnNonDurableSend(true)
         .setMinLargeMessageSize(1024);

      final var sessionFactory = serverLocator.createSessionFactory();

      final var xa = false;
      final var autoCommitSends = true;
      final var autoCommitAcks = true;
      final var ackBatchSize = serverLocator.getAckBatchSize();
      final var preAcknowledge = serverLocator.isPreAcknowledge();
      final var clientSession = sessionFactory.createSession(username, password, xa,
         autoCommitSends, autoCommitAcks, preAcknowledge, ackBatchSize);

      var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
      if (!queueQueryResult.isExists()) {
         clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
      }

      final var consumer = clientSession.createConsumer(QUEUE);

      clientSession.start();

      AtomicInteger errors = new AtomicInteger(0);
      AtomicInteger received = new AtomicInteger(0);

      consumer.setMessageHandler((msg) -> {
         try {
            msg.getDataBuffer();
            received.incrementAndGet();
         } catch (Throwable e) {
            e.printStackTrace();
            errors.incrementAndGet();
         }
      });


      try (ClientSession producerSession = sessionFactory.createSession()) {
         ClientProducer producer = producerSession.createProducer(QUEUE);
         for (int i = 0; i < 100; i++) {
            ClientMessage message = producerSession.createMessage(true);
            message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
            producer.send(message);
         }
      }

      Wait.assertEquals(100, received::get);
      Assert.assertEquals(0, errors.get());
   }

   @Test
   public void testWithJMSListener() throws Exception {

      final String username = null;
      final String password = null;

      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
      factory.setMinLargeMessageSize(1024);
      Connection connection = factory.createConnection();
      Session sessionProducer = connection.createSession(true, Session.SESSION_TRANSACTED);

      Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      Queue jmsQueue = sessionProducer.createQueue(QUEUE);

      MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);


      connection.start();

      AtomicInteger errors = new AtomicInteger(0);
      AtomicInteger received = new AtomicInteger(0);

      consumer.setMessageListener((msg) -> {
         try {
            System.out.println("Received: " + ((TextMessage)msg).getText().length());
            received.incrementAndGet();
         } catch (Throwable e) {
            e.printStackTrace();
            errors.incrementAndGet();
         }

      });

      MessageProducer producer = sessionProducer.createProducer(jmsQueue);

      StringBuilder buffer = new StringBuilder();
      while (buffer.length() < 100 * 1024) {
         buffer.append("*****");
      }

      for (int i = 0; i < 100; i++) {
         TextMessage message = sessionProducer.createTextMessage(buffer.toString());
         producer.send(message);
      }
      sessionProducer.commit();

      Wait.assertEquals(100, received::get);
      Assert.assertEquals(0, errors.get());
   }

   private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
      final var config = new QueueConfiguration(queueName);
      config.setMaxConsumers(1);
      config.setPurgeOnNoConsumers(false);
      config.setDurable(false);
      config.setAutoDelete(false);
      config.setRoutingType(RoutingType.MULTICAST);
      return config;
   }
}
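
For anyone reading the two warnings quoted in this thread, the arithmetic behind them can be sketched in plain Java. This is only an illustration of a reader/writer-index bounds check; the class and method names (ReadableBytesCheck, fits, checkReadable) are made up for this example and are not Netty or Artemis code. The numbers are the ones from the logged warnings.

```java
// Hypothetical stand-in for a reader/writer-index bounds check.
// Not Netty's or Artemis's implementation; names are invented for illustration.
final class ReadableBytesCheck {

    /** True when length bytes can be read from readerIndex without passing writerIndex. */
    static boolean fits(int readerIndex, int writerIndex, int length) {
        // Written as a subtraction so the comparison cannot overflow int.
        return length >= 0 && readerIndex <= writerIndex - length;
    }

    /** Throws with a message shaped like the one in the logged warnings. */
    static void checkReadable(int readerIndex, int writerIndex, int length) {
        if (!fits(readerIndex, writerIndex, length)) {
            throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                readerIndex, length, writerIndex));
        }
    }

    public static void main(String[] args) {
        // 2.20.0 warning: only 271572 - 270740 = 832 bytes remain after the reader index.
        System.out.println(fits(270740, 271572, 270740));
        // Second warning: nothing has been written past index 4.
        System.out.println(fits(4, 4, 270740));
        // The same length read from index 0 of the first buffer would fit.
        System.out.println(fits(0, 271572, 270740));
    }
}
```

In both logged cases the requested length equals the full body size (270740) while the reader index is no longer at a position that leaves that many readable bytes, which is why the check fails.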

On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jb...@apache.org> wrote:
>
> No. The test was super-simple. Just send one large message and then consume
> it.
>
>
> Justin
>
> On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <cl...@gmail.com>
> wrote:
>
> > It did not involve paging?
> >
> > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jb...@apache.org>
> > wrote:
> >
> > > I recreated this exception with a very simple test-case. I took the
> > > consumer code pasted earlier in the thread and just added a producer
> > > sending a large message. I lowered the minLargeMessageSize to make it
> > > faster. I thought I still had that code lying around somewhere, but I
> > > can't find it at the moment.
> > >
> > >
> > > Justin
> > >
> > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <clebert.suconic@gmail.com>
> > > wrote:
> > >
> > > > I have seen (and fixed) cases where the large message file is gone.  I
> > > > would need a reproducer creating the issue from scratch (send and
> > > > consume)
> > > >
> > > > Typically it could be associated with paging ?  Did you have the
> > > > destination in page mode ?
> > > >
> > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > >
> > > > > The client code below the stack trace - will reproduce it  - the
> > > > > msg.getDataBuffer() inside the handler of that code will trigger it
> > > > > when a large message is sent to the address.  The exception in
> > > > > question (IndexOutOfBoundsException) is being caught in line 249 of
> > > > > the apache CoreMessage code... and turned into a logged warning on
> > > > > line 250.  The CoreMessage code then returns the buffer from
> > > > > getReadOnlyBuffer() (which appears to be fine from a quick survey -
> > > > > data also seems to be preserved and accessible from client code) - it
> > > > > is just not clear what the intent is if the exception code is
> > > > > executed... is it a "don't worry about it" - or a "something is wrong
> > > > > here" and I should concern myself with something? (the warning level
> > > > > is making me believe that it is more than a "don't worry about it" -
> > > > > but the fact that the exception was caught and a valid buffer is
> > > > > returned makes me think it is just a fallback for the other choices
> > > > > of buffers - and I should not worry about it?).
> > > > >
> > > > > Thanks for any insight you may have....
> > > > >
> > > > > logged warning from the caught exception is:
> > > > >
> > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN
> > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage -
> > > > > readerIndex(4) + length(270740) exceeds writerIndex(4):
> > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4,
> > > > > widx: 4, cap: 270740)
> > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740)
> > > > > exceeds writerIndex(4):
> > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4,
> > > > > widx: 4, cap: 270740)
> > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57)
> > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> > > > > at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> > > > > at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> > > > > at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> > > > > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > > > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > > > at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG
> > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl -
> > > > > Sending 65962 from flow-control
> > > > >
> > > > > Simple Client Code below:
> > > > >
> > > > >
> > > > > public class Main {
> > > > >
> > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
> > > > >     private final static String QUEUE="service.images.dev::service.images.dev";
> > > > >
> > > > >     public static void main(String[] args) throws Exception {
> > > > >
> > > > >         final String username = null;
> > > > >         final String password = null;
> > > > >
> > > > >         var serverLocator = ActiveMQClient
> > > > >                 .createServerLocator(ACCEPTOR)
> > > > >                 .setBlockOnDurableSend(true)
> > > > >                 .setBlockOnNonDurableSend(true);
> > > > >
> > > > >         final var sessionFactory = serverLocator.createSessionFactory();
> > > > >
> > > > >         final var xa = false;
> > > > >         final var autoCommitSends = true;
> > > > >         final var autoCommitAcks = true;
> > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
> > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
> > > > >         final var clientSession = sessionFactory.createSession(
> > > > >                 username,
> > > > >                 password,
> > > > >                 xa,
> > > > >                 autoCommitSends,
> > > > >                 autoCommitAcks,
> > > > >                 preAcknowledge,
> > > > >                 ackBatchSize
> > > > >         );
> > > > >
> > > > >         var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > > > >         if (!queueQueryResult.isExists()) {
> > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
> > > > >         }
> > > > >
> > > > >         final var consumer = clientSession.createConsumer(QUEUE);
> > > > >
> > > > >         clientSession.start();
> > > > >
> > > > >         consumer.setMessageHandler((msg) -> {
> > > > >
> > > > >             System.out.println("Received: "+msg.getBodySize());
> > > > >             msg.getDataBuffer();
> > > > >
> > > > >         });
> > > > >
> > > > >         while(true) {
> > > > >             Thread.sleep(1000);
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >     private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
> > > > >         final var config = new QueueConfiguration(queueName);
> > > > >         config.setMaxConsumers(1);
> > > > >         config.setPurgeOnNoConsumers(false);
> > > > >         config.setDurable(false);
> > > > >         config.setAutoDelete(false);
> > > > >         config.setRoutingType(RoutingType.MULTICAST);
> > > > >         return config;
> > > > >     }
> > > > >
> > > > >
> > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jb...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you have a
> > > > > > way to reproduce this?
> > > > > >
> > > > > >
> > > > > > Justin
> > > > > >
> > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:
> > > > > >
> > > > > > > This seems to appear on larger messages only - I am getting a warning
> > > > > > > when calling getDataBuffer (2.20.0 Artemis Client).  Curious if there
> > > > > > > is something I may be missing - or if this is completely ignorable?
> > > > > > > Thanks -
> > > > > > > Tim
> > > > > > >
> > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)]
> > > > > > > readerIndex(270740) + length(270740) exceeds writerIndex(271572):
> > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740)
> > > > > > > exceeds writerIndex(271572):
> > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > >
> > > > > >
> > > > >
> > > > --
> > > > Clebert Suconic
> > > >
> > >
> > --
> > Clebert Suconic
> >



-- 
Clebert Suconic

Re: IndexOutOfBounds Warning Message

Posted by Justin Bertram <jb...@apache.org>.
No. The test was super-simple. Just send one large message and then consume
it.


Justin


Re: IndexOutOfBounds Warning Message

Posted by Clebert Suconic <cl...@gmail.com>.
It did not involve paging?

-- 
Clebert Suconic

Re: IndexOutOfBounds Warning Message

Posted by Justin Bertram <jb...@apache.org>.
I recreated this exception with a very simple test case. I took the
consumer code pasted earlier in the thread and just added a producer
sending a large message. I lowered minLargeMessageSize so that a smaller
payload is treated as a large message, which makes the test faster. I
thought I still had that code lying around somewhere, but I can't find it
at the moment.


Justin

On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <cl...@gmail.com>
wrote:

> I have seen (and fixed) cases where the large message file is gone.  I
> would need a reproducer creating the issue from scratch (send and consume)
>
> Typically it could be associated with paging ?  Did you have the
> destination in page mode ?
> --
> Clebert Suconic
>

Re: IndexOutOfBounds Warning Message

Posted by Clebert Suconic <cl...@gmail.com>.
I have seen (and fixed) cases where the large message file is gone. I
would need a reproducer that creates the issue from scratch (send and
consume).

This can typically be associated with paging. Did you have the
destination in page mode?

-- 
Clebert Suconic

Re: IndexOutOfBounds Warning Message

Posted by Tim Jones <ti...@abcwxy.com>.
The client code below the stack trace will reproduce it: the
msg.getDataBuffer() call inside the handler triggers the warning when a
large message is sent to the address. The exception in question
(IndexOutOfBoundsException) is caught on line 249 of the Apache
CoreMessage code and turned into a logged warning on line 250. The
CoreMessage code then returns the buffer from getReadOnlyBuffer(), which
appears to be fine from a quick survey (the data also seems to be
preserved and accessible from client code). What is not clear is the
intent when the exception path runs: is it a "don't worry about it", or a
"something is wrong here" that I should be concerned about? The WARN level
makes me believe it is more than a "don't worry about it", but the fact
that the exception is caught and a valid buffer is returned makes me think
it is just a fallback among the available buffer choices, and that I
should not worry about it.
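
To make the question concrete, the catch-and-fall-back behaviour described above can be modelled roughly as follows. This is only a minimal sketch and not the Artemis source: the class DataBufferFallbackSketch and the method readLargeBody are hypothetical names, java.nio.ByteBuffer stands in for the Netty buffer, and java.util.logging stands in for the client's logger.

```java
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical model of "try the large-message path, fall back on failure". */
final class DataBufferFallbackSketch {

    private static final Logger LOG =
            Logger.getLogger(DataBufferFallbackSketch.class.getName());

    /** Stand-in for the large-message path: copies {@code length} bytes out of {@code source}. */
    static ByteBuffer readLargeBody(ByteBuffer source, int length) {
        if (length > source.remaining()) {
            throw new IndexOutOfBoundsException(
                    "requested " + length + " bytes but only "
                            + source.remaining() + " are readable");
        }
        byte[] copy = new byte[length];
        // Read through a duplicate so the caller's position is left untouched.
        source.duplicate().get(copy);
        return ByteBuffer.wrap(copy);
    }

    /** Tries the large-message path first; on failure logs a warning and falls back. */
    static ByteBuffer getDataBuffer(ByteBuffer source, int declaredBodySize) {
        try {
            return readLargeBody(source, declaredBodySize);
        } catch (RuntimeException e) {
            // The caller never sees the exception, only the log entry.
            LOG.log(Level.WARNING, e.getMessage(), e);
            return source.asReadOnlyBuffer();
        }
    }

    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        // Declared size is larger than what is readable, so the fallback is used.
        ByteBuffer result = getDataBuffer(source, 10);
        System.out.println("read-only fallback used: " + result.isReadOnly());
    }
}
```

In this model the caller always gets a usable buffer, which matches what I observe, but the model cannot say whether the warning signals a real problem on the large-message path.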

Thanks for any insight you may have....

The logged warning from the caught exception is:

16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN org.apache.activemq.artemis.core.message.impl.CoreMessage - readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) exceeds writerIndex(4): UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 4, widx: 4, cap: 270740)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
    at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
    at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
    at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
    at io.m45.sart.Main.lambda$main$0(Main.java:57)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - Sending 65962 from flow-control
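
For what it is worth, the numbers in the warning can be checked by hand. The sketch below assumes the check simply compares the requested length against writerIndex minus readerIndex, which is what the wording of the message suggests; the class ReadableBytesCheckSketch and its methods are hypothetical and only reproduce the arithmetic, not Netty's code.

```java
/** Hypothetical re-creation of the arithmetic behind the logged message. */
final class ReadableBytesCheckSketch {

    /** Bytes that can still be read: the gap between writer and reader index. */
    static int readableBytes(int readerIndex, int writerIndex) {
        return writerIndex - readerIndex;
    }

    /** Throws when a read of {@code length} bytes would run past the writer index. */
    static void checkReadable(int readerIndex, int writerIndex, int length) {
        if (length > readableBytes(readerIndex, writerIndex)) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d)",
                    readerIndex, length, writerIndex));
        }
    }

    public static void main(String[] args) {
        // Values from the warning above: ridx 4, widx 4, so nothing is readable.
        System.out.println(readableBytes(4, 4));            // prints 0
        // Values from the original post: ridx 270740, widx 271572.
        System.out.println(readableBytes(270740, 271572));  // prints 832
        try {
            checkReadable(4, 4, 270740);
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In both logged cases the requested length (270740) is far larger than the readable bytes (0 and 832), so the buffer handed to the large-message path does not appear to hold the full body at the moment it is read.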

Simple Client Code below:


import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;

public class Main {

    private final static String ACCEPTOR = "tcp://localhost:9322";
    // Fully qualified queue name in address::queue form.
    private final static String QUEUE = "service.images.dev::service.images.dev";

    public static void main(String[] args) throws Exception {

        final String username = null;
        final String password = null;

        var serverLocator = ActiveMQClient
                .createServerLocator(ACCEPTOR)
                .setBlockOnDurableSend(true)
                .setBlockOnNonDurableSend(true);

        final var sessionFactory = serverLocator.createSessionFactory();

        final var xa = false;
        final var autoCommitSends = true;
        final var autoCommitAcks = true;
        final var ackBatchSize = serverLocator.getAckBatchSize();
        final var preAcknowledge = serverLocator.isPreAcknowledge();
        final var clientSession = sessionFactory.createSession(
                username,
                password,
                xa,
                autoCommitSends,
                autoCommitAcks,
                preAcknowledge,
                ackBatchSize
        );

        // Create the queue only if it does not exist yet.
        var queueQueryResult = clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
        if (!queueQueryResult.isExists()) {
            clientSession.createQueue(_ServiceQueueConfiguration(new SimpleString(QUEUE)));
        }

        final var consumer = clientSession.createConsumer(QUEUE);

        clientSession.start();

        consumer.setMessageHandler((msg) -> {

            System.out.println("Received: " + msg.getBodySize());
            // This call logs the warning when msg is a large message.
            msg.getDataBuffer();

        });

        // Keep the JVM alive so the handler can keep receiving messages.
        while (true) {
            Thread.sleep(1000);
        }

    }

    private static QueueConfiguration _ServiceQueueConfiguration(SimpleString queueName) {
        final var config = new QueueConfiguration(queueName);
        config.setMaxConsumers(1);
        config.setPurgeOnNoConsumers(false);
        config.setDurable(false);
        config.setAutoDelete(false);
        config.setRoutingType(RoutingType.MULTICAST);
        return config;
    }
}


On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jb...@apache.org> wrote:

> Typically an IndexOutOfBoundsException indicates a bug. Do you have a way
> to reproduce this?
>
>
> Justin

Re: IndexOutOfBounds Warning Message

Posted by Justin Bertram <jb...@apache.org>.
Typically an IndexOutOfBoundsException indicates a bug. Do you have a way
to reproduce this?


Justin

On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <ti...@abcwxy.com> wrote:

> This seems to appear on larger messages only - I am getting a warning when
> calling getDataBuffer (2.20.0 Artemis Client).  Curious if there is
> something I may be missing - or if this is completely ignorable? Thanks -
> Tim
>
> 2022-02-17T21:06:11,908+01:00 [Thread-2 (ActiveMQ-client-global-threads)]
> WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 (ActiveMQ-client-global-threads)]
> readerIndex(270740) + length(270740) exceeds writerIndex(271572):
> UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> 270740, widx: 271572, cap: 271572)
> java.lang.IndexOutOfBoundsException: readerIndex(270740) + length(270740)
> exceeds writerIndex(271572):
> UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> 270740, widx: 271572, cap: 271572)
> at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> at org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> at org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
>