You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Justin Bertram (Jira)" <ji...@apache.org> on 2022/11/07 17:49:00 UTC

[jira] [Updated] (ARTEMIS-4085) Exclusive LVQ not working as expected

     [ https://issues.apache.org/jira/browse/ARTEMIS-4085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Justin Bertram updated ARTEMIS-4085:
------------------------------------
    Description: 
Currently exclusive last value queues deliver all messages to consumers as opposed to only the last one.

I wrote the following test:
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.stomp;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {

 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

 protected StompClientConnection producerConn;
 protected StompClientConnection consumerConn;

 /**
  * Creates the broker for this test via the parent class and sets its
  * address/queue scan period to 100 before handing it back.
  *
  * @return the server created by the parent class, with the adjusted scan period
  * @throws Exception if the parent class fails to create the server
  */
 @Override
 protected ActiveMQServer createServer() throws Exception {
  final ActiveMQServer broker = super.createServer();
  // NOTE(review): 100 is presumably a period in milliseconds - confirm against the Configuration docs.
  broker.getConfiguration().setAddressQueueScanPeriod(100);
  return broker;
 }

 /**
  * Runs the parent set-up, creates the durable, exclusive last-value queue
  * "lvq" on address "lvq", and creates one STOMP client connection object
  * each for the producer and the consumer (neither is connected here).
  *
  * @throws Exception if the parent set-up, the queue creation or the connection creation fails
  */
 @Override
 @Before
 public void setUp() throws Exception {
  super.setUp();

  // Same settings, in the same order, as a single chained expression would apply them.
  final QueueConfiguration lvqConfig = new QueueConfiguration("lvq")
    .setAddress("lvq")
    .setLastValue(true)
    .setDurable(true)
    .setExclusive(true);
  server.createQueue(lvqConfig);

  producerConn = StompClientConnectionFactory.createClientConnection(uri);
  consumerConn = StompClientConnectionFactory.createClientConnection(uri);
 }

 /**
  * Disconnects and closes both STOMP connections, then runs the parent
  * tear-down exactly once.
  *
  * <p>The nested {@code finally} blocks guarantee that a failure while cleaning
  * up the producer connection neither skips the consumer clean-up nor the
  * parent tear-down.
  *
  * @throws Exception if closing a transport or the parent tear-down fails
  */
 @Override
 @After
 public void tearDown() throws Exception {
  try {
   disconnectAndClose(producerConn);
  } finally {
   try {
    disconnectAndClose(consumerConn);
   } finally {
    super.tearDown();
   }
  }
 }

 /**
  * Best-effort disconnect of a single connection followed by closing its transport.
  *
  * @param conn the connection to clean up; may be {@code null} if set-up failed before it was created
  * @throws Exception if querying the connection state or closing the transport fails
  */
 private static void disconnectAndClose(StompClientConnection conn) throws Exception {
  if (conn == null) {
   // setUp() did not get far enough to create this connection; nothing to release.
   return;
  }

  try {
   boolean connected = conn.isConnected();
   logger.debug("Connection 1.0 connected: {}", connected);
   if (connected) {
    try {
     conn.disconnect();
    } catch (Exception ignored) {
     // Deliberately ignored: the transport is closed below regardless of
     // whether the graceful disconnect succeeded.
    }
   }
  } finally {
   conn.closeTransport();
  }
 }

 /**
  * Sends 100 messages sharing the same last-value key to the "lvq" destination
  * while a client-ack subscriber is already attached, then drains the
  * subscriber and expects exactly two messages: body "1" followed by body "100".
  *
  * <p>Both phases run on the test thread. They were previously wrapped in
  * threads that were started and joined strictly one after the other, which
  * added no concurrency but hid every assertion failure and exception from the
  * JUnit runner, so the test could never fail.
  *
  * @throws Exception if connecting, sending, receiving or acknowledging fails
  */
 @Test
 public void testLVQ() throws Exception {
  final String name = "lvq";

  producerConn.connect(defUser, defPass);
  consumerConn.connect(defUser, defPass);

  subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);

  // Phase 1: produce. Every SEND requests a receipt so the loop only advances
  // once a response to the previous frame has been received.
  for (int i = 1; i <= 100; i++) {
   String uuid = UUID.randomUUID().toString();

   // The PERSISTENT header is deliberately left unset (it was commented out
   // in the original reproducer).
   ClientStompFrame sendFrame = producerConn.createFrame(Stomp.Commands.SEND)
     .addHeader(Stomp.Headers.Send.DESTINATION, name)
     .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
     .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
     .setBody(String.valueOf(i));

   ClientStompFrame receipt = producerConn.sendFrame(sendFrame);

   Assert.assertEquals(Stomp.Responses.RECEIPT, receipt.getCommand());
   Assert.assertEquals(uuid, receipt.getHeader(Stomp.Headers.Response.RECEIPT_ID));
  }

  // Phase 2: consume. Drain until receiveFrame(10000) returns null,
  // acknowledging each message as it arrives.
  List<ClientStompFrame> messages = new ArrayList<>();
  ClientStompFrame received;
  while ((received = consumerConn.receiveFrame(10000)) != null) {
   Assert.assertEquals(Stomp.Responses.MESSAGE, received.getCommand());

   ack(consumerConn, null, received);

   messages.add(received);
  }

  logger.info("Received messages: {}", messages);

  Assert.assertEquals(2, messages.size());
  Assert.assertEquals("1", messages.get(0).getBody());
  Assert.assertEquals("100", messages.get(1).getBody());
 }
} {code}

  was:
Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.

I wrote the following test which is working fine:
{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.artemis.tests.integration.stomp;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {

 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

 protected StompClientConnection producerConn;
 protected StompClientConnection consumerConn;

 /**
  * Creates the broker for this test via the parent class and sets its
  * address/queue scan period to 100 before handing it back.
  *
  * @return the server created by the parent class, with the adjusted scan period
  * @throws Exception if the parent class fails to create the server
  */
 @Override
 protected ActiveMQServer createServer() throws Exception {
  final ActiveMQServer broker = super.createServer();
  // NOTE(review): 100 is presumably a period in milliseconds - confirm against the Configuration docs.
  broker.getConfiguration().setAddressQueueScanPeriod(100);
  return broker;
 }

 /**
  * Runs the parent set-up, creates the durable last-value queue "lvq" on
  * address "lvq", and creates one STOMP client connection object each for the
  * producer and the consumer (neither is connected here).
  *
  * @throws Exception if the parent set-up, the queue creation or the connection creation fails
  */
 @Override
 @Before
 public void setUp() throws Exception {
  super.setUp();

  // Same settings, in the same order, as a single chained expression would apply them.
  final QueueConfiguration lvqConfig = new QueueConfiguration("lvq")
    .setAddress("lvq")
    .setLastValue(true)
    .setDurable(true);
  server.createQueue(lvqConfig);

  producerConn = StompClientConnectionFactory.createClientConnection(uri);
  consumerConn = StompClientConnectionFactory.createClientConnection(uri);
 }

 /**
  * Disconnects and closes both STOMP connections, then runs the parent
  * tear-down exactly once.
  *
  * <p>The nested {@code finally} blocks guarantee that a failure while cleaning
  * up the producer connection neither skips the consumer clean-up nor the
  * parent tear-down.
  *
  * @throws Exception if closing a transport or the parent tear-down fails
  */
 @Override
 @After
 public void tearDown() throws Exception {
  try {
   disconnectAndClose(producerConn);
  } finally {
   try {
    disconnectAndClose(consumerConn);
   } finally {
    super.tearDown();
   }
  }
 }

 /**
  * Best-effort disconnect of a single connection followed by closing its transport.
  *
  * @param conn the connection to clean up; may be {@code null} if set-up failed before it was created
  * @throws Exception if querying the connection state or closing the transport fails
  */
 private static void disconnectAndClose(StompClientConnection conn) throws Exception {
  if (conn == null) {
   // setUp() did not get far enough to create this connection; nothing to release.
   return;
  }

  try {
   boolean connected = conn.isConnected();
   logger.debug("Connection 1.0 connected: {}", connected);
   if (connected) {
    try {
     conn.disconnect();
    } catch (Exception ignored) {
     // Deliberately ignored: the transport is closed below regardless of
     // whether the graceful disconnect succeeded.
    }
   }
  } finally {
   conn.closeTransport();
  }
 }

 /**
  * Sends 100 messages sharing the same last-value key to the "lvq" destination
  * while a client-ack subscriber is already attached, then drains the
  * subscriber and expects exactly two messages: body "1" followed by body "100".
  *
  * <p>Both phases run on the test thread. They were previously wrapped in
  * threads that were started and joined strictly one after the other, which
  * added no concurrency but hid every assertion failure and exception from the
  * JUnit runner, so the test could never fail.
  *
  * @throws Exception if connecting, sending, receiving or acknowledging fails
  */
 @Test
 public void testLVQ() throws Exception {
  final String name = "lvq";

  producerConn.connect(defUser, defPass);
  consumerConn.connect(defUser, defPass);

  subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);

  // Phase 1: produce. Every SEND requests a receipt so the loop only advances
  // once a response to the previous frame has been received.
  for (int i = 1; i <= 100; i++) {
   String uuid = UUID.randomUUID().toString();

   // The PERSISTENT header is deliberately left unset (it was commented out
   // in the original reproducer).
   ClientStompFrame sendFrame = producerConn.createFrame(Stomp.Commands.SEND)
     .addHeader(Stomp.Headers.Send.DESTINATION, name)
     .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
     .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
     .setBody(String.valueOf(i));

   ClientStompFrame receipt = producerConn.sendFrame(sendFrame);

   Assert.assertEquals(Stomp.Responses.RECEIPT, receipt.getCommand());
   Assert.assertEquals(uuid, receipt.getHeader(Stomp.Headers.Response.RECEIPT_ID));
  }

  // Phase 2: consume. Drain until receiveFrame(10000) returns null,
  // acknowledging each message as it arrives.
  List<ClientStompFrame> messages = new ArrayList<>();
  ClientStompFrame received;
  while ((received = consumerConn.receiveFrame(10000)) != null) {
   Assert.assertEquals(Stomp.Responses.MESSAGE, received.getCommand());

   ack(consumerConn, null, received);

   messages.add(received);
  }

  logger.info("Received messages: {}", messages);

  Assert.assertEquals(2, messages.size());
  Assert.assertEquals("1", messages.get(0).getBody());
  Assert.assertEquals("100", messages.get(1).getBody());
 }
} {code}
The consumer subscribes before the producer starts sending and receives only the first and the last message, which is expected: the first message is already in delivery, and of the messages that follow only the most recent one is kept.

In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.

The queue with the issue is created by defining:
{code:xml}
<address name="queue">
    <anycast>
        <queue name="queue" last-value="true">
            <durable>true</durable>
        </queue>
    </anycast>
</address>{code}
which should be the equivalent of the one in the test.

The issue seems to be in {{QueueImpl::deliver}}:
{code:java}
ConsumerHolder<? extends Consumer> holder;
if (consumers.hasNext()) {
   holder = consumers.next();
} else {
   pruneLastValues();
   break;
} {code}
where {{pruneLastValues()}} should always be called (or rather each message checked for duplicates).

If that is the expected behaviour, then I suggest adding an optional feature to always deduplicate the messages.

I propose the following as the solution:
{noformat}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index e67ae7dab1..82bebeaf19 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -199,21 +199,31 @@ public class LastValueQueue extends QueueImpl {
       // called with synchronized(this) from super.deliver()
       try (LinkedListIterator<MessageReference> iter = messageReferences.iterator()) {
          while (iter.hasNext()) {
-            MessageReference ref = iter.next();
-            if (!currentLastValue(ref)) {
+            MessageReference ref = interceptMessage(iter.next());
+            if (ref == null) {
                iter.remove();
-               try {
-                  referenceHandled(ref);
-                  super.refRemoved(ref);
-                  ref.acknowledge(null, AckReason.REPLACED, null);
-               } catch (Exception e) {
-                  ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
-               }
             }
          }
       }
    }
 
+   @Override
+   protected MessageReference interceptMessage(MessageReference ref) {
+      if (!currentLastValue(ref)) {
+         try {
+            referenceHandled(ref);
+            super.refRemoved(ref);
+            ref.acknowledge(null, AckReason.REPLACED, null);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+         }
+
+         return null;
+      }
+
+      return super.interceptMessage(ref);
+   }
+
    private boolean currentLastValue(final MessageReference ref) {
       boolean currentLastValue = false;
       SimpleString lastValueProp = ref.getLastValueProperty();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 66d6ff789b..77cdc0db90 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3035,15 +3035,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             Consumer consumer = holder.consumer;
             Consumer groupConsumer = null;
+            ref = null;
 
             if (holder.iter == null) {
                holder.iter = messageReferences.iterator();
             }
 
-            if (holder.iter.hasNext()) {
-               ref = holder.iter.next();
-            } else {
-               ref = null;
+            while (holder.iter.hasNext()) {
+               ref = interceptMessage(holder.iter.next());
+
+               if (ref == null) {
+                  holder.iter.remove();
+                  handled++;
+               } else {
+                  break;
+               }
             }
 
             if (ref == null) {
@@ -3154,6 +3160,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       // interception point for LVQ
    }
 
+   protected MessageReference interceptMessage(MessageReference ref) {
+      return ref;
+   }
+
    protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
       holder.iter.remove();
       refRemoved(ref);{noformat}
{{currentLastValue()}} is lightweight, so checking whether each message is the latest version has no major performance impact.


> Exclusive LVQ not working as expected
> -------------------------------------
>
>                 Key: ARTEMIS-4085
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4085
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: STOMP
>    Affects Versions: 2.26.0
>            Reporter: Lauri Keel
>            Assignee: Justin Bertram
>            Priority: Major
>
> Currently exclusive last value queues deliver all messages to consumers as opposed to only the last one.
> I wrote the following test:
> {code:java}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.activemq.artemis.tests.integration.stomp;
> import org.apache.activemq.artemis.api.core.Message;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.lang.invoke.MethodHandles;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> @RunWith(Parameterized.class)
> public class StompLVQTest extends StompTestBase {
>  private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>  protected StompClientConnection producerConn;
>  protected StompClientConnection consumerConn;
>  @Override
>  protected ActiveMQServer createServer() throws Exception {
>   ActiveMQServer server = super.createServer();
>   server.getConfiguration().setAddressQueueScanPeriod(100);
>   return server;
>  }
>  @Override
>  @Before
>  public void setUp() throws Exception {
>   super.setUp();
>   server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true).setExclusive(true));
>   producerConn = StompClientConnectionFactory.createClientConnection(uri);
>   consumerConn = StompClientConnectionFactory.createClientConnection(uri);
>  }
>  @Override
>  @After
>  public void tearDown() throws Exception {
>   try {
>    boolean connected = producerConn != null && producerConn.isConnected();
>    logger.debug("Connection 1.0 connected: {}", connected);
>    if (connected) {
>     try {
>      producerConn.disconnect();
>     } catch (Exception e) {
>      // ignore
>     }
>    }
>   } finally {
>    super.tearDown();
>    producerConn.closeTransport();
>   }
>   try {
>    boolean connected = consumerConn != null && consumerConn.isConnected();
>    logger.debug("Connection 1.0 connected: {}", connected);
>    if (connected) {
>     try {
>      consumerConn.disconnect();
>     } catch (Exception e) {
>      // ignore
>     }
>    }
>   } finally {
>    super.tearDown();
>    consumerConn.closeTransport();
>   }
>  }
>  @Test
>  public void testLVQ() throws Exception {
>   final String name = "lvq";
>   producerConn.connect(defUser, defPass);
>   consumerConn.connect(defUser, defPass);
>   subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
>   Thread producer = new Thread() {
>    @Override
>    public void run() {
>     try {
>      for (int i = 1; i <= 100; i++) {
>       String uuid = UUID.randomUUID().toString();
>       ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
>         .addHeader(Stomp.Headers.Send.DESTINATION, name)
>         .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
>         .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
>         // .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
>         .setBody(String.valueOf(i));
>       frame = producerConn.sendFrame(frame);
>       assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
>       assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
>      }
>     } catch(Exception e) {
>      logger.error(null, e);
>     }
>    }
>   };
>   Thread consumer = new Thread() {
>    @Override
>    public void run() {
>     try {
>      List<ClientStompFrame> messages = new ArrayList<>();
>      ClientStompFrame frame;
>      while((frame = consumerConn.receiveFrame(10000)) != null)
>      {
>       assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
>       ack(consumerConn, null, frame);
>       messages.add(frame);
>      }
>      logger.info("Received messages: {}", messages);
>      Assert.assertEquals(2, messages.size());
>      Assert.assertEquals("1", messages.get(0).getBody());
>      Assert.assertEquals("100", messages.get(1).getBody());
>     } catch(Exception e) {
>      logger.error(null, e);
>     }
>    }
>   };
>   producer.start();
>   producer.join();
>   consumer.start();
>   consumer.join();
>  }
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)