Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2022/10/17 22:09:00 UTC

[jira] [Comment Edited] (ARTEMIS-4050) Last value queue not working as expected

    [ https://issues.apache.org/jira/browse/ARTEMIS-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17619126#comment-17619126 ] 

Justin Bertram edited comment on ARTEMIS-4050 at 10/17/22 10:08 PM:
--------------------------------------------------------------------

Providing a test that works as expected is not sufficient when fixing a bug. We need a test that fails _before_ the fix and succeeds _after_ the fix.

In any case, without a way to reproduce the behavior observed by the stomp-php client, it's not clear whether that behavior is actually a bug.

Additionally, in my opinion your test code is not representative of traditional producer/consumer behavior. The consumer subscribes but doesn't attempt to receive any messages until the producer has sent all of them. Normally, a consumer that subscribed before any messages were sent would attempt to receive them immediately, so the producer and consumer would operate _concurrently_. Although the test creates one thread for the consumer and one for the producer, the threads don't execute concurrently because the producer thread is started and joined before the consumer thread is started. The test could be rewritten without any threads and would be 100% equivalent, e.g.:
{code:java}
@Test
public void testLVQNoThreads() throws Exception {
   final String name = "lvq";

   producerConn.connect(defUser, defPass);
   consumerConn.connect(defUser, defPass);

   subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);

   try {
      for (int i = 1; i <= 100; i++) {
         String uuid = UUID.randomUUID().toString();

         ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
                                              .addHeader(Stomp.Headers.Send.DESTINATION, name)
                                              .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
                                              .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
                                              // .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
                                              .setBody(String.valueOf(i));

         frame = producerConn.sendFrame(frame);

         assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
         assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
      }
   } catch (Exception e) {
      logger.error(null, e);
   }

   try {
      List<ClientStompFrame> messages = new ArrayList<>();
      ClientStompFrame frame;

      while ((frame = consumerConn.receiveFrame(10000)) != null) {
         assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());

         ack(consumerConn, null, frame);

         messages.add(frame);
      }

      logger.info("Received messages: {}", messages);

      Assert.assertEquals(2, messages.size());
      Assert.assertEquals("1", messages.get(0).getBody());
      Assert.assertEquals("100", messages.get(1).getBody());
   } catch (Exception e) {
      logger.error(null, e);
   }
}{code}
It's likely that the stomp-php client _is_, in fact, actively attempting to consume messages while messages are being sent to the queue, in which case it's totally normal for it to receive all the messages. Change your test to run the consumer and producer threads concurrently and you'll see this behavior.
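
As a rough illustration of what "concurrently" means here, the following self-contained sketch contrasts the start/join order used in the test with one where both threads are started before either is joined. It is plain Java with no Artemis or STOMP classes; {{ConcurrentStartSketch}}, {{overlapped}} and the 500 ms wait are made up for this example. Each thread counts down a shared latch and then waits briefly for the other thread to do the same, so a thread only reports success if the other one was running at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustration only: hypothetical names, no Artemis or STOMP classes involved.
public class ConcurrentStartSketch {

   // Returns true only if both threads observed each other running.
   static boolean overlapped(boolean startBothBeforeJoining) throws InterruptedException {
      CountDownLatch bothRunning = new CountDownLatch(2);
      boolean[] sawOther = new boolean[2];

      Thread producer = new Thread(() -> sawOther[0] = arriveAndWait(bothRunning));
      Thread consumer = new Thread(() -> sawOther[1] = arriveAndWait(bothRunning));

      if (startBothBeforeJoining) {
         // Both threads run at the same time.
         producer.start();
         consumer.start();
         producer.join();
         consumer.join();
      } else {
         // Same order as the test in the issue: the producer has finished
         // before the consumer is even started.
         producer.start();
         producer.join();
         consumer.start();
         consumer.join();
      }

      // join() guarantees the writes made by each thread are visible here.
      return sawOther[0] && sawOther[1];
   }

   // Signals arrival, then waits up to 500 ms for the other thread to arrive.
   private static boolean arriveAndWait(CountDownLatch latch) {
      latch.countDown();
      try {
         return latch.await(500, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args) throws InterruptedException {
      System.out.println("start/join/start/join overlapped: " + overlapped(false));
      System.out.println("start/start/join/join overlapped: " + overlapped(true));
   }
}
```

In the sequential order the producer's wait always times out, because nothing else counts down the latch until the producer thread has ended. Applied to the test, the equivalent change would be to call {{start()}} on both threads before calling {{join()}} on either.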

To be clear, the main use-case for LVQ is to control which messages remain in the queue when no consumers are actually connected.
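
To make that distinction concrete, here is a minimal toy model. It is not the Artemis {{LastValueQueue}} implementation, and {{ToyLastValueQueue}} and its methods are hypothetical names for this sketch. It assumes only that a newly sent message replaces an _undelivered_ message with the same last-value key, and it ignores messages in delivery, consumer credit and acknowledgements.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT the Artemis LastValueQueue implementation.
public class ToyLastValueQueue {

   // Undelivered messages, at most one per last-value key.
   private final Map<String, String> pending = new LinkedHashMap<>();

   // A new message replaces any undelivered message with the same key.
   public void send(String lastValueKey, String body) {
      pending.put(lastValueKey, body);
   }

   // Hands the oldest undelivered message to a consumer, or null if there is none.
   public String receive() {
      Iterator<Map.Entry<String, String>> iter = pending.entrySet().iterator();
      if (!iter.hasNext()) {
         return null;
      }
      String body = iter.next().getValue();
      iter.remove();
      return body;
   }

   // Nothing is received while the producer sends; the consumer drains afterwards.
   static List<String> sendAllThenReceive(int count) {
      ToyLastValueQueue queue = new ToyLastValueQueue();
      for (int i = 1; i <= count; i++) {
         queue.send("test", String.valueOf(i));
      }
      List<String> received = new ArrayList<>();
      String body;
      while ((body = queue.receive()) != null) {
         received.add(body);
      }
      return received;
   }

   // A consumer receives after every send, i.e. it keeps up with the producer.
   static List<String> receiveWhileSending(int count) {
      ToyLastValueQueue queue = new ToyLastValueQueue();
      List<String> received = new ArrayList<>();
      for (int i = 1; i <= count; i++) {
         queue.send("test", String.valueOf(i));
         received.add(queue.receive());
      }
      return received;
   }

   public static void main(String[] args) {
      System.out.println("send all, then receive: " + sendAllThenReceive(100));
      System.out.println("receive while sending:  " + receiveWhileSending(100).size() + " messages");
   }
}
```

In this model, sending 100 messages with the same key before anything is received leaves only "100" in the queue. When a consumer receives after every send, there is never an undelivered message to replace, so all 100 messages are delivered.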

At this point I don't see the need to alter the existing behavior. Everything seems to be working as designed. Of course, I'd certainly be willing to revisit this given a test-case demonstrating erroneous behavior.


> Last value queue not working as expected
> ----------------------------------------
>
>                 Key: ARTEMIS-4050
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4050
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: STOMP
>    Affects Versions: 2.26.0
>            Reporter: Lauri Keel
>            Assignee: Justin Bertram
>            Priority: Major
>
> Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
> I wrote the following test which is working fine:
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.activemq.artemis.tests.integration.stomp;
> import org.apache.activemq.artemis.api.core.Message;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.lang.invoke.MethodHandles;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> @RunWith(Parameterized.class)
> public class StompLVQTest extends StompTestBase {
>  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>  protected StompClientConnection producerConn;
>  protected StompClientConnection consumerConn;
>  @Override
>  protected ActiveMQServer createServer() throws Exception {
>   ActiveMQServer server = super.createServer();
>   server.getConfiguration().setAddressQueueScanPeriod(100);
>   return server;
>  }
>  @Override
>  @Before
>  public void setUp() throws Exception {
>   super.setUp();
>   server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
>   producerConn = StompClientConnectionFactory.createClientConnection(uri);
>   consumerConn = StompClientConnectionFactory.createClientConnection(uri);
>  }
>  @Override
>  @After
>  public void tearDown() throws Exception {
>   try {
>    // Disconnect both clients; a failure here must not prevent the server shutdown below.
>    for (StompClientConnection conn : new StompClientConnection[] {producerConn, consumerConn}) {
>     boolean connected = conn != null && conn.isConnected();
>     logger.debug("Connection 1.0 connected: {}", connected);
>     if (connected) {
>      try {
>       conn.disconnect();
>      } catch (Exception e) {
>       // ignore
>      }
>     }
>    }
>   } finally {
>    // Call super.tearDown() once, then close the transports that were actually created.
>    super.tearDown();
>    if (producerConn != null) {
>     producerConn.closeTransport();
>    }
>    if (consumerConn != null) {
>     consumerConn.closeTransport();
>    }
>   }
>  }
>  @Test
>  public void testLVQ() throws Exception {
>   final String name = "lvq";
>   producerConn.connect(defUser, defPass);
>   consumerConn.connect(defUser, defPass);
>   subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
>   Thread producer = new Thread() {
>    @Override
>    public void run() {
>     try {
>      for (int i = 1; i <= 100; i++) {
>       String uuid = UUID.randomUUID().toString();
>       ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
>         .addHeader(Stomp.Headers.Send.DESTINATION, name)
>         .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
>         .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
>         // .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
>         .setBody(String.valueOf(i));
>       frame = producerConn.sendFrame(frame);
>       assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
>       assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
>      }
>     } catch(Exception e) {
>      logger.error(null, e);
>     }
>    }
>   };
>   Thread consumer = new Thread() {
>    @Override
>    public void run() {
>     try {
>      List<ClientStompFrame> messages = new ArrayList<>();
>      ClientStompFrame frame;
>      while((frame = consumerConn.receiveFrame(10000)) != null)
>      {
>       assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
>       ack(consumerConn, null, frame);
>       messages.add(frame);
>      }
>      logger.info("Received messages: {}", messages);
>      Assert.assertEquals(2, messages.size());
>      Assert.assertEquals("1", messages.get(0).getBody());
>      Assert.assertEquals("100", messages.get(1).getBody());
>     } catch(Exception e) {
>      logger.error(null, e);
>     }
>    }
>   };
>   producer.start();
>   producer.join();
>   consumer.start();
>   consumer.join();
>  }
> } {code}
> The client subscribes before the producer starts sending messages and receives only the first and the last message, which is expected: the first one is in delivery, and of the following ones only the very last is kept.
> In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
> The queue with the issue is created by defining:
> {code:xml}
> <address name="queue">
>     <anycast>
>         <queue name="queue" last-value="true">
>             <durable>true</durable>
>         </queue>
>     </anycast>
> </address>{code}
> which should be the equivalent of the one in the test.
> The issue seems to be in {{QueueImpl::deliver}}:
> {code:java}
> ConsumerHolder<? extends Consumer> holder;
> if (consumers.hasNext()) {
>    holder = consumers.next();
> } else {
>    pruneLastValues();
>    break;
> } {code}
> where {{pruneLastValues()}} should always be called (or rather each message checked for duplicates).
> If that is the expected behaviour, then I suggest adding an optional feature to always deduplicate the messages.
> I propose the following as the solution:
> {noformat}
> diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
> index e67ae7dab1..82bebeaf19 100644
> --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
> +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
> @@ -199,21 +199,31 @@ public class LastValueQueue extends QueueImpl {
>        // called with synchronized(this) from super.deliver()
>        try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
>           while (iter.hasNext()) {
> -            MessageReference ref = iter.next();
> -            if (!currentLastValue(ref)) {
> +            MessageReference ref = interceptMessage(iter.next());
> +            if (ref == null) {
>                 iter.remove();
> -               try {
> -                  referenceHandled(ref);
> -                  super.refRemoved(ref);
> -                  ref.acknowledge(null, AckReason.REPLACED, null);
> -               } catch (Exception e) {
> -                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
> -               }
>              }
>           }
>        }
>     }
>  
> +   @Override
> +   protected MessageReference interceptMessage(MessageReference ref) {
> +      if (!currentLastValue(ref)) {
> +         try {
> +            referenceHandled(ref);
> +            super.refRemoved(ref);
> +            ref.acknowledge(null, AckReason.REPLACED, null);
> +         } catch (Exception e) {
> +            ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
> +         }
> +
> +         return null;
> +      }
> +
> +      return super.interceptMessage(ref);
> +   }
> +
>     private boolean currentLastValue(final MessageReference ref) {
>        boolean currentLastValue = false;
>        SimpleString lastValueProp = ref.getLastValueProperty();
> diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
> index 66d6ff789b..77cdc0db90 100644
> --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
> +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
> @@ -3035,15 +3035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
>  
>              Consumer consumer = holder.consumer;
>              Consumer groupConsumer = null;
> +            ref = null;
>  
>              if (holder.iter == null) {
>                 holder.iter = messageReferences.iterator();
>              }
>  
> -            if (holder.iter.hasNext()) {
> -               ref = holder.iter.next();
> -            } else {
> -               ref = null;
> +            while (holder.iter.hasNext()) {
> +               ref = interceptMessage(holder.iter.next());
> +
> +               if (ref == null) {
> +                  holder.iter.remove();
> +                  handled++;
> +               } else {
> +                  break;
> +               }
>              }
>  
>              if (ref == null) {
> @@ -3154,6 +3160,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
>        // interception point for LVQ
>     }
>  
> +   protected MessageReference interceptMessage(MessageReference ref) {
> +      return ref;
> +   }
> +
>     protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
>        holder.iter.remove();
>        refRemoved(ref);{noformat}
> {{currentLastValue()}} is lightweight, so checking whether each message is the latest version should have no major performance impact.


