You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/06 18:26:37 UTC

[activemq-artemis] branch master updated: ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new da6fedf  ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message
     new 0a0b0fc  This closes #2823
da6fedf is described below

commit da6fedf6ae065bebf15cc27ac8c8f4ab8a66dcce
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Fri Sep 6 17:25:11 2019 +0800

    ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message
---
 .../artemis/core/server/impl/LastValueQueue.java     |  5 ++++-
 .../activemq/artemis/core/server/impl/QueueImpl.java |  5 +++++
 .../artemis/tests/integration/server/LVQTest.java    | 20 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index d802259..44235b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -166,7 +166,8 @@ public class LastValueQueue extends QueueImpl {
    private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
       MessageReference oldRef = hr.getReference();
 
-      referenceHandled(ref);
+      referenceHandled(oldRef);
+      super.refRemoved(oldRef);
 
       try {
          oldRef.acknowledge(null, AckReason.REPLACED, null);
@@ -175,6 +176,8 @@ public class LastValueQueue extends QueueImpl {
       }
 
       hr.setReference(ref);
+      addRefSize(ref);
+      refAdded(ref);
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a9eab4f..dd7d9cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2902,6 +2902,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   protected void addRefSize(MessageReference ref) {
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+      pendingMetrics.incrementMetrics(ref);
+   }
+
    protected void refAdded(final MessageReference ref) {
       if (ref.isPaged()) {
          pagedReferences.incrementAndGet();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index 432072c..f9df972 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -671,6 +671,26 @@ public class LVQTest extends ActiveMQTestBase {
       clientSessionTxReceives.commit();
    }
 
+   @Test
+   public void testSizeInReplace() throws Exception {
+      ClientProducer producer = clientSession.createProducer(address);
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+      ClientMessage m2 = clientSession.createMessage(true);
+      m2.setBodyInputStream(createFakeLargeStream(10 * 1024));
+      m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+      Queue queue = server.locateQueue(qName1);
+      producer.send(m1);
+      long oldSize = queue.getPersistentSize();
+      producer.send(m2);
+      assertEquals(queue.getDeliveringSize(), 0);
+      assertNotEquals(queue.getPersistentSize(), oldSize);
+      assertTrue(queue.getPersistentSize() > 10 * 1024);
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {