You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/22 17:41:29 UTC
[activemq-artemis] branch master updated: ARTEMIS-2927 LVQ broken
after restart
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new f6ef285 ARTEMIS-2927 LVQ broken after restart
new aa975ad This closes #3285
f6ef285 is described below
commit f6ef285859347a57749ac4e5881d879df23e4f88
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Oct 22 13:32:19 2020 -0400
ARTEMIS-2927 LVQ broken after restart
---
.../artemis/core/server/impl/LastValueQueue.java | 18 ++++++++++++
.../artemis/tests/integration/server/LVQTest.java | 34 ++++++++++++++++++++++
2 files changed, 52 insertions(+)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 3cb3d09..a93d27c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -285,6 +285,19 @@ public class LastValueQueue extends QueueImpl {
super.acknowledge(tx, ref, reason, consumer);
}
+ @Override
+ public synchronized void reload(final MessageReference ref) {
+ // repopulate LVQ map & reload proper HolderReferences
+ SimpleString lastValueProp = ref.getLastValueProperty();
+ if (lastValueProp != null) {
+ HolderReference hr = new HolderReference(lastValueProp, ref);
+ map.put(lastValueProp, hr);
+ super.reload(hr);
+ } else {
+ super.reload(ref);
+ }
+ }
+
private synchronized void removeIfCurrent(MessageReference ref) {
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
@@ -540,6 +553,11 @@ public class LastValueQueue extends QueueImpl {
}
@Override
+ public String toString() {
+ return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
+ }
+
+ @Override
public PagingStore getOwner() {
return ref.getOwner();
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index d35a96b..b040f6b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -77,6 +77,40 @@ public class LVQTest extends ActiveMQTestBase {
}
@Test
+ public void testSimpleRestart() throws Exception {
+ ClientProducer producer = clientSession.createProducer(address);
+ ClientMessage m1 = createTextMessage(clientSession, "m1");
+ SimpleString rh = new SimpleString("SMID1");
+ m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+ producer.send(m1);
+ ClientMessage m2 = createTextMessage(clientSession, "m2");
+ m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+ producer.send(m2);
+ assertEquals(1, server.locateQueue(qName1).getMessageCount());
+ clientSession.close();
+
+ server.stop();
+ server.start();
+
+ assertEquals(1, server.locateQueue(qName1).getMessageCount());
+ ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+ ClientSessionFactory sf = createSessionFactory(locator);
+ clientSession = addClientSession(sf.createSession(false, true, true));
+ producer = clientSession.createProducer(address);
+ ClientMessage m3 = createTextMessage(clientSession, "m3");
+ m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+ producer.send(m3);
+ assertEquals(1, server.locateQueue(qName1).getMessageCount());
+
+ ClientConsumer consumer = clientSession.createConsumer(qName1);
+ clientSession.start();
+ ClientMessage m = consumer.receive(1000);
+ Assert.assertNotNull(m);
+ m.acknowledge();
+ Assert.assertEquals("m3", m.getBodyBuffer().readString());
+ }
+
+ @Test
public void testMultipleMessages() throws Exception {
ClientProducer producer = clientSession.createProducer(address);
ClientConsumer consumer = clientSession.createConsumer(qName1);