You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/28 15:03:55 UTC
[activemq-artemis] 07/13: ARTEMIS-4085 exclusive LVQ sending all messages to consumer
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f1030ddc19c0b317164b4e20aa1b8af66bb3b659
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Nov 7 15:07:35 2022 -0600
ARTEMIS-4085 exclusive LVQ sending all messages to consumer
(cherry picked from commit ca580814de2c2f4466d00e76d5cf70935a94ff81)
---
.../artemis/core/server/impl/QueueImpl.java | 1 +
.../artemis/tests/integration/server/LVQTest.java | 24 ++++
.../tests/integration/stomp/StompLVQTest.java | 134 +++++++++++++++++++++
3 files changed, 159 insertions(+)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d222d192fd..488109c83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3155,6 +3155,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (groupConsumer != null) {
if (noDelivery > 0) {
+ pruneLastValues();
break;
}
noDelivery = 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index a37b690d4d..4c8d14a34d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -86,6 +86,30 @@ public class LVQTest extends ActiveMQTestBase {
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
}
+ /** Verifies that an exclusive last-value queue hands the consumer only the newest message sent for a key. */
+ @Test
+ public void testSimpleExclusive() throws Exception {
+ ServerLocator locator = createNettyNonHALocator().setConsumerWindowSize(0);
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+ final String EXCLUSIVE_QUEUE = "exclusiveQueue";
+ clientSession.createQueue(new QueueConfiguration(EXCLUSIVE_QUEUE).setExclusive(true).setLastValue(true));
+ ClientProducer producer = clientSession.createProducer(EXCLUSIVE_QUEUE);
+ ClientConsumer consumer = clientSession.createConsumer(EXCLUSIVE_QUEUE);
+ clientSession.start();
+ ClientMessage m1 = createTextMessage(clientSession, "m1");
+ SimpleString rh = new SimpleString("SMID1");
+ m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+ ClientMessage m2 = createTextMessage(clientSession, "m2");
+ m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+ producer.send(m1);
+ producer.send(m2);
+ ClientMessage m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals("m2", m.getBodyBuffer().readString()); // expected value first, per JUnit's (expected, actual) order
+ }
+
@Test
public void testSimpleRestart() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
new file mode 100644
index 0000000000..8004199848
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Checks last-value delivery over STOMP against a queue created as both last-value and exclusive. */
+@RunWith(Parameterized.class)
+public class StompLVQTest extends StompTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected StompClientConnection producerConn;
+   protected StompClientConnection consumerConn;
+
+   private final String queue = "lvq";
+
+   /** Creates the exclusive last-value queue plus one STOMP connection each for producing and consuming. */
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server.createQueue(new QueueConfiguration(queue).setLastValue(true).setExclusive(true));
+
+      producerConn = StompClientConnectionFactory.createClientConnection(uri);
+      consumerConn = StompClientConnectionFactory.createClientConnection(uri);
+   }
+
+   /** Closes both connections and always runs the base-class tear-down, even when closing one of them fails. */
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         closeConnection(producerConn);
+      } finally {
+         try {
+            closeConnection(consumerConn);
+         } finally {
+            super.tearDown();
+         }
+      }
+   }
+
+   /**
+    * Disconnects (best effort) and then closes the transport of {@code conn}. A {@code null} connection,
+    * e.g. one never assigned because {@link #setUp()} failed early, is skipped instead of causing an NPE.
+    */
+   private static void closeConnection(StompClientConnection conn) throws Exception {
+      if (conn == null) {
+         return;
+      }
+      try {
+         if (conn.isConnected()) {
+            try {
+               conn.disconnect();
+            } catch (Exception e) {
+               logger.debug("Ignoring failure while disconnecting STOMP connection", e);
+            }
+         }
+      } finally {
+         conn.closeTransport();
+      }
+   }
+
+   /**
+    * Sends 100 messages sharing one last-value key while a consumer is subscribed and expects exactly
+    * two deliveries: the first message ("1") and the last one ("100").
+    */
+   @Test
+   public void testLVQ() throws Exception {
+      producerConn.connect(defUser, defPass);
+      consumerConn.connect(defUser, defPass);
+
+      subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0);
+
+      // Exceptions are left to propagate so a failure reports its real cause, not a later count mismatch.
+      for (int i = 1; i <= 100; i++) {
+         String uuid = UUID.randomUUID().toString();
+         ClientStompFrame receipt = producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
+            .addHeader(Stomp.Headers.Send.DESTINATION, queue)
+            .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
+            .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
+            .setBody(String.valueOf(i)));
+
+         Assert.assertEquals(Stomp.Responses.RECEIPT, receipt.getCommand());
+         Assert.assertEquals(uuid, receipt.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+      }
+
+      List<ClientStompFrame> messages = new ArrayList<>();
+      ClientStompFrame received;
+      while ((received = consumerConn.receiveFrame(10000)) != null) {
+         Assert.assertEquals(Stomp.Responses.MESSAGE, received.getCommand());
+         ack(consumerConn, null, received);
+         messages.add(received);
+      }
+
+      Assert.assertEquals(2, messages.size());
+      Assert.assertEquals("1", messages.get(0).getBody());
+      Assert.assertEquals("100", messages.get(1).getBody());
+   }
+}
\ No newline at end of file