You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/22 17:41:29 UTC

[activemq-artemis] branch master updated: ARTEMIS-2927 LVQ broken after restart

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ef285  ARTEMIS-2927 LVQ broken after restart
     new aa975ad  This closes #3285
f6ef285 is described below

commit f6ef285859347a57749ac4e5881d879df23e4f88
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Oct 22 13:32:19 2020 -0400

    ARTEMIS-2927 LVQ broken after restart
---
 .../artemis/core/server/impl/LastValueQueue.java   | 18 ++++++++++++
 .../artemis/tests/integration/server/LVQTest.java  | 34 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 3cb3d09..a93d27c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -285,6 +285,19 @@ public class LastValueQueue extends QueueImpl {
       super.acknowledge(tx, ref, reason, consumer);
    }
 
+   @Override
+   public synchronized void reload(final MessageReference ref) {
+      // repopulate LVQ map & reload proper HolderReferences
+      SimpleString lastValueProp = ref.getLastValueProperty();
+      if (lastValueProp != null) {
+         HolderReference hr = new HolderReference(lastValueProp, ref);
+         map.put(lastValueProp, hr);
+         super.reload(hr);
+      } else {
+         super.reload(ref);
+      }
+   }
+
    private synchronized void removeIfCurrent(MessageReference ref) {
       SimpleString lastValueProp = ref.getLastValueProperty();
       if (lastValueProp != null) {
@@ -540,6 +553,11 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public String toString() {
+         return new StringBuilder().append("HolderReference").append("@").append(Integer.toHexString(System.identityHashCode(this))).append("[ref=").append(ref).append("]").toString();
+      }
+
+      @Override
       public PagingStore getOwner() {
          return ref.getOwner();
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index d35a96b..b040f6b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -77,6 +77,40 @@ public class LVQTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSimpleRestart() throws Exception {
+      ClientProducer producer = clientSession.createProducer(address);
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      ClientMessage m2 = createTextMessage(clientSession, "m2");
+      m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m2);
+      assertEquals(1, server.locateQueue(qName1).getMessageCount());
+      clientSession.close();
+
+      server.stop();
+      server.start();
+
+      assertEquals(1, server.locateQueue(qName1).getMessageCount());
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      clientSession = addClientSession(sf.createSession(false, true, true));
+      producer = clientSession.createProducer(address);
+      ClientMessage m3 = createTextMessage(clientSession, "m3");
+      m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m3);
+      assertEquals(1, server.locateQueue(qName1).getMessageCount());
+
+      ClientConsumer consumer = clientSession.createConsumer(qName1);
+      clientSession.start();
+      ClientMessage m = consumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals("m3", m.getBodyBuffer().readString());
+   }
+
+   @Test
    public void testMultipleMessages() throws Exception {
       ClientProducer producer = clientSession.createProducer(address);
       ClientConsumer consumer = clientSession.createConsumer(qName1);