You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2022/11/10 19:00:00 UTC
[jira] [Commented] (ARTEMIS-4085) Exclusive LVQ not working as expected
[ https://issues.apache.org/jira/browse/ARTEMIS-4085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631864#comment-17631864 ]
ASF subversion and git services commented on ARTEMIS-4085:
----------------------------------------------------------
Commit ca580814de2c2f4466d00e76d5cf70935a94ff81 in activemq-artemis's branch refs/heads/main from Justin Bertram
[ https://gitbox.apache.org/repos/asf?p=activemq-artemis.git;h=ca580814de ]
ARTEMIS-4085 exclusive LVQ sending all messages to consumer
> Exclusive LVQ not working as expected
> -------------------------------------
>
> Key: ARTEMIS-4085
> URL: https://issues.apache.org/jira/browse/ARTEMIS-4085
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: STOMP
> Affects Versions: 2.26.0
> Reporter: Lauri Keel
> Assignee: Justin Bertram
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently exclusive last value queues deliver all messages to consumers as opposed to only the last one.
> I wrote the following test:
> {code:java}
> /*
> * Licensed to the Apache Software Foundation (ASF) under one or more
> * contributor license agreements. See the NOTICE file distributed with
> * this work for additional information regarding copyright ownership.
> * The ASF licenses this file to You under the Apache License, Version 2.0
> * (the "License"); you may not use this file except in compliance with
> * the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> package org.apache.activemq.artemis.tests.integration.stomp;
> import java.lang.invoke.MethodHandles;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.UUID;
> import org.apache.activemq.artemis.api.core.Message;
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
> import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
> import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
> import org.junit.After;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> /**
>  * Reproducer for ARTEMIS-4085: sends a burst of STOMP messages that all share the same
>  * last-value key to an exclusive last-value queue and checks how many of them reach a
>  * consumer that subscribed before the burst started.
>  */
> @RunWith(Parameterized.class)
> public class StompLVQTest extends StompTestBase {
>
>    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>
>    /** Name of the exclusive last-value queue created in {@link #setUp()}. */
>    private static final String QUEUE_NAME = "lvq";
>
>    /** Number of messages sent by the test; every one carries the same last-value key. */
>    private static final int MESSAGE_COUNT = 100;
>
>    /** Timeout handed to {@code receiveFrame}; presumably milliseconds - confirm against the client API. */
>    private static final long RECEIVE_TIMEOUT = 10000;
>
>    protected StompClientConnection producerConn;
>
>    protected StompClientConnection consumerConn;
>
>    /**
>     * Creates the exclusive last-value queue and one client connection each for the
>     * producer and the consumer. The connections are not yet logged in.
>     */
>    @Override
>    @Before
>    public void setUp() throws Exception {
>       super.setUp();
>       server.createQueue(new QueueConfiguration(QUEUE_NAME).setLastValue(true).setExclusive(true));
>       producerConn = StompClientConnectionFactory.createClientConnection(uri);
>       consumerConn = StompClientConnectionFactory.createClientConnection(uri);
>    }
>
>    /**
>     * Releases both client connections and then runs the parent tear-down.
>     * The nested try/finally blocks guarantee that a failure while closing one connection
>     * neither leaks the other one nor skips {@code super.tearDown()}.
>     */
>    @Override
>    @After
>    public void tearDown() throws Exception {
>       try {
>          closeConnection(producerConn);
>       } finally {
>          try {
>             closeConnection(consumerConn);
>          } finally {
>             super.tearDown();
>          }
>       }
>    }
>
>    /**
>     * Disconnects the given connection if it is still connected and always closes its transport.
>     *
>     * @param conn the connection to release; may be {@code null} when {@link #setUp()} failed
>     *             before the connection was created, in which case nothing is done
>     * @throws Exception if checking the connection state or closing the transport fails
>     */
>    private static void closeConnection(StompClientConnection conn) throws Exception {
>       if (conn == null) {
>          return;
>       }
>       try {
>          if (conn.isConnected()) {
>             try {
>                conn.disconnect();
>             } catch (Exception ignored) {
>                // Best effort only: the transport is closed below regardless.
>                logger.debug("Ignoring failure while disconnecting STOMP client", ignored);
>             }
>          }
>       } finally {
>          conn.closeTransport();
>       }
>    }
>
>    /**
>     * Subscribes a consumer, sends {@link #MESSAGE_COUNT} messages with an identical last-value
>     * key (each confirmed by a receipt), then drains and acknowledges everything the consumer
>     * receives until a receive call times out.
>     *
>     * <p>Failures while sending or receiving propagate directly so that the test reports the
>     * real cause instead of a misleading assertion on the number of received messages.
>     *
>     * @throws Exception if connecting, sending, receiving or acknowledging fails
>     */
>    @Test
>    public void testLVQ() throws Exception {
>       producerConn.connect(defUser, defPass);
>       consumerConn.connect(defUser, defPass);
>       subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, QUEUE_NAME, true, 0);
>
>       for (int i = 1; i <= MESSAGE_COUNT; i++) {
>          String receiptId = UUID.randomUUID().toString();
>          ClientStompFrame receipt = producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
>                                                               .addHeader(Stomp.Headers.Send.DESTINATION, QUEUE_NAME)
>                                                               .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
>                                                               .addHeader(Stomp.Headers.RECEIPT_REQUESTED, receiptId)
>                                                               .setBody(String.valueOf(i)));
>          Assert.assertEquals(Stomp.Responses.RECEIPT, receipt.getCommand());
>          Assert.assertEquals(receiptId, receipt.getHeader(Stomp.Headers.Response.RECEIPT_ID));
>       }
>
>       List<ClientStompFrame> messages = new ArrayList<>();
>       ClientStompFrame frame;
>       // A null frame means nothing arrived within the timeout, i.e. the queue is drained.
>       while ((frame = consumerConn.receiveFrame(RECEIVE_TIMEOUT)) != null) {
>          Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
>          ack(consumerConn, null, frame);
>          messages.add(frame);
>       }
>
>       // Expected by the reporter: only the first and the last message are delivered.
>       // NOTE(review): presumably the first message is handed to the waiting consumer before
>       // later values can replace it - confirm against the broker's last-value semantics.
>       Assert.assertEquals(2, messages.size());
>       Assert.assertEquals("1", messages.get(0).getBody());
>       Assert.assertEquals(String.valueOf(MESSAGE_COUNT), messages.get(1).getBody());
>    }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)