You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/10 18:59:01 UTC

[activemq-artemis] branch main updated: ARTEMIS-4085 exclusive LVQ sending all messages to consumer

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ca580814de ARTEMIS-4085 exclusive LVQ sending all messages to consumer
ca580814de is described below

commit ca580814de2c2f4466d00e76d5cf70935a94ff81
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Nov 7 15:07:35 2022 -0600

    ARTEMIS-4085 exclusive LVQ sending all messages to consumer
---
 .../artemis/core/server/impl/QueueImpl.java        |   1 +
 .../artemis/tests/integration/server/LVQTest.java  |  24 ++++
 .../tests/integration/stomp/StompLVQTest.java      | 134 +++++++++++++++++++++
 3 files changed, 159 insertions(+)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d222d192fd..488109c83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3155,6 +3155,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             if (groupConsumer != null) {
                if (noDelivery > 0) {
+                  pruneLastValues();
                   break;
                }
                noDelivery = 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index a37b690d4d..4c8d14a34d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -86,6 +86,30 @@ public class LVQTest extends ActiveMQTestBase {
       Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
    }
 
+   @Test
+   public void testSimpleExclusive() throws Exception {
+      ServerLocator locator = createNettyNonHALocator().setConsumerWindowSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+      final String EXCLUSIVE_QUEUE = "exclusiveQueue";
+
+      clientSession.createQueue(new QueueConfiguration(EXCLUSIVE_QUEUE).setExclusive(true).setLastValue(true));
+      ClientProducer producer = clientSession.createProducer(EXCLUSIVE_QUEUE);
+      ClientConsumer consumer = clientSession.createConsumer(EXCLUSIVE_QUEUE);
+      clientSession.start();
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage(clientSession, "m2");
+      m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      producer.send(m2);
+      ClientMessage m = consumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+   }
+
    @Test
    public void testSimpleRestart() throws Exception {
       ClientProducer producer = clientSession.createProducer(address);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
new file mode 100644
index 0000000000..8004199848
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class StompLVQTest extends StompTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected StompClientConnection producerConn;
+   protected StompClientConnection consumerConn;
+
+   private final String queue = "lvq";
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server.createQueue(new QueueConfiguration(queue).setLastValue(true).setExclusive(true));
+
+      producerConn = StompClientConnectionFactory.createClientConnection(uri);
+      consumerConn = StompClientConnectionFactory.createClientConnection(uri);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (producerConn != null && producerConn.isConnected()) {
+            try {
+               producerConn.disconnect();
+            } catch (Exception e) {
+               // ignore
+            }
+         }
+      } finally {
+         producerConn.closeTransport();
+      }
+
+      try {
+         if (consumerConn != null && consumerConn.isConnected()) {
+            try {
+               consumerConn.disconnect();
+            } catch (Exception e) {
+               // ignore
+            }
+         }
+      } finally {
+         consumerConn.closeTransport();
+      }
+
+      super.tearDown();
+   }
+
+   @Test
+   public void testLVQ() throws Exception {
+
+      producerConn.connect(defUser, defPass);
+      consumerConn.connect(defUser, defPass);
+
+      subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0);
+
+      try {
+         for (int i = 1; i <= 100; i++) {
+            String uuid = UUID.randomUUID().toString();
+
+            ClientStompFrame frame = producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
+                                                       .addHeader(Stomp.Headers.Send.DESTINATION, queue)
+                                                       .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
+                                                       .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
+                                                       .setBody(String.valueOf(i)));
+
+            assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+            assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+         }
+      } catch (Exception e) {
+         logger.error(null, e);
+      }
+
+      List<ClientStompFrame> messages = new ArrayList<>();
+      try {
+         ClientStompFrame frame;
+
+         while ((frame = consumerConn.receiveFrame(10000)) != null) {
+            assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+
+            ack(consumerConn, null, frame);
+
+            messages.add(frame);
+         }
+      } catch (Exception e) {
+         logger.error(null, e);
+      }
+
+      Assert.assertEquals(2, messages.size());
+      Assert.assertEquals("1", messages.get(0).getBody());
+      Assert.assertEquals("100", messages.get(1).getBody());
+   }
+}
\ No newline at end of file