You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Lauri Keel (Jira)" <ji...@apache.org> on 2022/10/15 14:44:00 UTC
[jira] [Updated] (ARTEMIS-4050) Last value queue not working as expected
[ https://issues.apache.org/jira/browse/ARTEMIS-4050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lauri Keel updated ARTEMIS-4050:
--------------------------------
Description:
Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
I wrote the following test which is working fine:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected StompClientConnection producerConn;
protected StompClientConnection consumerConn;
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = producerConn != null && producerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
producerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
producerConn.closeTransport();
}
try {
boolean connected = consumerConn != null && consumerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
consumerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
consumerConn.closeTransport();
}
}
@Test
public void testLVQ() throws Exception {
final String name = "lvq";
producerConn.connect(defUser, defPass);
consumerConn.connect(defUser, defPass);
subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
Thread producer = new Thread() {
@Override
public void run() {
try {
for (int i = 1; i <= 100; i++) {
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, name)
.addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
// .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
.setBody(String.valueOf(i));
frame = producerConn.sendFrame(frame);
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
} catch(Exception e) {
logger.error(null, e);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
try {
List<ClientStompFrame> messages = new ArrayList<>();
ClientStompFrame frame;
while((frame = consumerConn.receiveFrame(10000)) != null)
{
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
ack(consumerConn, null, frame);
messages.add(frame);
}
logger.info("Received messages: {}", messages);
Assert.assertEquals(2, messages.size());
Assert.assertEquals("1", messages.get(0).getBody());
Assert.assertEquals("100", messages.get(1).getBody());
} catch(Exception e) {
logger.error(null, e);
}
}
};
producer.start();
producer.join();
consumer.start();
consumer.join();
}
} {code}
The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.
In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
The queue with the issue is created by defining:
{code:java}
<address name="queue">
<anycast>
<queue name="queue" last-value="true">
<durable>true</durable>
</queue>
</anycast>
</address>{code}
which should be the equivalent of the one in the test.
The issue seems to be in QueueImpl::deliver:
{code:java}
ConsumerHolder<? extends Consumer> holder;
if (consumers.hasNext()) {
holder = consumers.next();
} else {
pruneLastValues();
break;
} {code}
where pruneLastValues() should always be called.
If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.
was:
Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
I wrote the following test which is working fine:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.stomp;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@RunWith(Parameterized.class)
public class StompLVQTest extends StompTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected StompClientConnection producerConn;
protected StompClientConnection consumerConn;
@Override
protected ActiveMQServer createServer() throws Exception {
ActiveMQServer server = super.createServer();
server.getConfiguration().setAddressQueueScanPeriod(100);
return server;
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
producerConn = StompClientConnectionFactory.createClientConnection(uri);
consumerConn = StompClientConnectionFactory.createClientConnection(uri);
}
@Override
@After
public void tearDown() throws Exception {
try {
boolean connected = producerConn != null && producerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
producerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
producerConn.closeTransport();
}
try {
boolean connected = consumerConn != null && consumerConn.isConnected();
logger.debug("Connection 1.0 connected: {}", connected);
if (connected) {
try {
consumerConn.disconnect();
} catch (Exception e) {
// ignore
}
}
} finally {
super.tearDown();
consumerConn.closeTransport();
}
}
@Test
public void testLVQ() throws Exception {
final String name = "lvq";
producerConn.connect(defUser, defPass);
consumerConn.connect(defUser, defPass);
subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
Thread producer = new Thread() {
@Override
public void run() {
try {
for (int i = 1; i <= 100; i++) {
String uuid = UUID.randomUUID().toString();
ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
.addHeader(Stomp.Headers.Send.DESTINATION, name)
.addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
// .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
.setBody(String.valueOf(i));
frame = producerConn.sendFrame(frame);
assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
} catch(Exception e) {
logger.error(null, e);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
try {
List<ClientStompFrame> messages = new ArrayList<>();
ClientStompFrame frame;
while((frame = consumerConn.receiveFrame(10000)) != null)
{
assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
ack(consumerConn, null, frame);
messages.add(frame);
}
logger.info("Received messages: {}", messages);
Assert.assertEquals(2, messages.size());
Assert.assertEquals("1", messages.get(0).getBody());
Assert.assertEquals("100", messages.get(1).getBody());
} catch(Exception e) {
logger.error(null, e);
}
}
};
producer.start();
producer.join();
consumer.start();
consumer.join();
}
} {code}
The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.
In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
The queue with the issue is created by defining:
{code:java}
<address name="queue">
<anycast>
<queue name="queue" last-value="true">
<durable>true</durable>
</queue>
</anycast>
</address>{code}
which should be the equivalent of the one in the test.
The issue seems to be in QueueImpl::deliver:
{code:java}
ConsumerHolder<? extends Consumer> holder;
if (consumers.hasNext()) {
holder = consumers.next();
} else {
pruneLastValues();
break;
} {code}
where pruneLastValues() should always be called.
If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.
> Last value queue not working as expected
> ----------------------------------------
>
> Key: ARTEMIS-4050
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4050
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: STOMP
> Affects Versions: 2.26.0
> Reporter: Lauri Keel
> Assignee: Justin Bertram
> Priority: Major
>
> Currently in some cases last value queues deliver all messages to consumers as opposed to only the last one.
> I wrote the following test which is working fine:
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.activemq.artemis.tests.integration.stomp;
> import org.apache.activemq.artemis.api.core.Message;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.lang.invoke.MethodHandles;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> @RunWith(Parameterized.class)
> public class StompLVQTest extends StompTestBase {
> private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> protected StompClientConnection producerConn;
> protected StompClientConnection consumerConn;
> @Override
> protected ActiveMQServer createServer() throws Exception {
> ActiveMQServer server = super.createServer();
> server.getConfiguration().setAddressQueueScanPeriod(100);
> return server;
> }
> @Override
> @Before
> public void setUp() throws Exception {
> super.setUp();
> server.createQueue(new QueueConfiguration("lvq").setAddress("lvq").setLastValue(true).setDurable(true));
> producerConn = StompClientConnectionFactory.createClientConnection(uri);
> consumerConn = StompClientConnectionFactory.createClientConnection(uri);
> }
> @Override
> @After
> public void tearDown() throws Exception {
> try {
> boolean connected = producerConn != null && producerConn.isConnected();
> logger.debug("Connection 1.0 connected: {}", connected);
> if (connected) {
> try {
> producerConn.disconnect();
> } catch (Exception e) {
> // ignore
> }
> }
> } finally {
> super.tearDown();
> producerConn.closeTransport();
> }
> try {
> boolean connected = consumerConn != null && consumerConn.isConnected();
> logger.debug("Connection 1.0 connected: {}", connected);
> if (connected) {
> try {
> consumerConn.disconnect();
> } catch (Exception e) {
> // ignore
> }
> }
> } finally {
> super.tearDown();
> consumerConn.closeTransport();
> }
> }
> @Test
> public void testLVQ() throws Exception {
> final String name = "lvq";
> producerConn.connect(defUser, defPass);
> consumerConn.connect(defUser, defPass);
> subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, name, true, 0);
> Thread producer = new Thread() {
> @Override
> public void run() {
> try {
> for (int i = 1; i <= 100; i++) {
> String uuid = UUID.randomUUID().toString();
> ClientStompFrame frame = producerConn.createFrame(Stomp.Commands.SEND)
> .addHeader(Stomp.Headers.Send.DESTINATION, name)
> .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
> .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
> // .addHeader(Stomp.Headers.Send.PERSISTENT, "true")
> .setBody(String.valueOf(i));
> frame = producerConn.sendFrame(frame);
> assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
> assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
> }
> } catch(Exception e) {
> logger.error(null, e);
> }
> }
> };
> Thread consumer = new Thread() {
> @Override
> public void run() {
> try {
> List<ClientStompFrame> messages = new ArrayList<>();
> ClientStompFrame frame;
> while((frame = consumerConn.receiveFrame(10000)) != null)
> {
> assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
> ack(consumerConn, null, frame);
> messages.add(frame);
> }
> logger.info("Received messages: {}", messages);
> Assert.assertEquals(2, messages.size());
> Assert.assertEquals("1", messages.get(0).getBody());
> Assert.assertEquals("100", messages.get(1).getBody());
> } catch(Exception e) {
> logger.error(null, e);
> }
> }
> };
> producer.start();
> producer.join();
> consumer.start();
> consumer.join();
> }
> } {code}
> The client subscribes before the producer starts sending the messages and receives only the first and the last message which is expected – the first one is in delivery and for the next ones the very last one will be kept.
> In the actual application (via stomp-php) it is not working: all messages get delivered to the consumer.
> The queue with the issue is created by defining:
> {code:java}
> <address name="queue">
> <anycast>
> <queue name="queue" last-value="true">
> <durable>true</durable>
> </queue>
> </anycast>
> </address>{code}
> which should be the equivalent of the one in the test.
> The issue seems to be in QueueImpl::deliver:
> {code:java}
> ConsumerHolder<? extends Consumer> holder;
> if (consumers.hasNext()) {
> holder = consumers.next();
> } else {
> pruneLastValues();
> break;
> } {code}
> where pruneLastValues() should always be called.
> If that is the expected behaviour, then I suggest to add an optional feature to always deduplicate the messages.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)