You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Lauri Keel (Jira)" <ji...@apache.org> on 2022/10/15 14:42:00 UTC
[jira] [Created] (ARTEMIS-4050) Last value queue not working as expected
Lauri Keel created ARTEMIS-4050:
-----------------------------------
Summary: Last value queue not working as expected
Key: ARTEMIS-4050
URL: https://issues.apache.org/jira/browse/ARTEMIS-4050
Project: ActiveMQ Artemis
Issue Type: Bug
Components: STOMP
Affects Versions: 2.26.0
Reporter: Lauri Keel
Assignee: Justin Bertram
Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
I wrote the following test which is working fine:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected StompClientConnection producerConn;
protected StompClientConnection consumerConn;
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = producerConn != null && producerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
producerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
producerConn.closeTransport();
}
try {
boolean connected = consumerConn != null && consumerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
consumerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
consumerConn.closeTransport();
}
}
@Test
public void testLVQ() throws Exception {
final String name = "lvq";
producerConn.connect(defUser, defPass);
consumerConn.connect(defUser, defPass);
subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
Thread producer = new Thread() {
@Override
public void run() {
try {
for (int i = 1; i <= 100; i++) {
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, name)
.addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
// .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
.setBody(String.valueOf(i));
frame = producerConn.sendFrame(frame);
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
} catch(Exception e) {
logger.error(null, e);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
try {
List<ClientStompFrame> messages = new ArrayList<>();
ClientStompFrame frame;
while((frame = consumerConn.receiveFrame(10000)) != null)
{
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
ack(consumerConn, null, frame);
messages.add(frame);
}
logger.info("Received messages: {}", messages);
Assert.assertEquals(2, messages.size());
Assert.assertEquals("1", messages.get(0).getBody());
Assert.assertEquals("100", messages.get(1).getBody());
} catch(Exception e) {
logger.error(null, e);
}
}
};
producer.start();
producer.join();
consumer.start();
consumer.join();
}
} {code}
The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.
In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
The queue with the issue is created by defining:
{code:java}
<address name="queue">
<anycast>
<queue name="queue" last-value="true">
<durable>true</durable>
</queue>
</anycast>
</address>{code}
which should be the equivalent of the one in the test.
The issue seems to be in QueueImpl::deliver:
{code:java}
ConsumerHolder<? extends Consumer> holder;
if (consumers.hasNext()) {
holder = consumers.next();
} else {
pruneLastValues();
break;
} {code}
where pruneLastValues() should always be called.
If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)