You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/06 18:26:37 UTC
[activemq-artemis] branch master updated: ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new da6fedf ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message
new 0a0b0fc This closes #2823
da6fedf is described below
commit da6fedf6ae065bebf15cc27ac8c8f4ab8a66dcce
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Fri Sep 6 17:25:11 2019 +0800
ARTEMIS-2472 Persistent and delivering size not right in replacement of lvq message
---
.../artemis/core/server/impl/LastValueQueue.java | 5 ++++-
.../activemq/artemis/core/server/impl/QueueImpl.java | 5 +++++
.../artemis/tests/integration/server/LVQTest.java | 20 ++++++++++++++++++++
3 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index d802259..44235b0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -166,7 +166,8 @@ public class LastValueQueue extends QueueImpl {
private void replaceLVQMessage(MessageReference ref, HolderReference hr) {
MessageReference oldRef = hr.getReference();
- referenceHandled(ref);
+ referenceHandled(oldRef);
+ super.refRemoved(oldRef);
try {
oldRef.acknowledge(null, AckReason.REPLACED, null);
@@ -175,6 +176,8 @@ public class LastValueQueue extends QueueImpl {
}
hr.setReference(ref);
+ addRefSize(ref);
+ refAdded(ref);
}
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a9eab4f..dd7d9cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2902,6 +2902,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
+ protected void addRefSize(MessageReference ref) {
+ queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+ pendingMetrics.incrementMetrics(ref);
+ }
+
protected void refAdded(final MessageReference ref) {
if (ref.isPaged()) {
pagedReferences.incrementAndGet();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index 432072c..f9df972 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -671,6 +671,26 @@ public class LVQTest extends ActiveMQTestBase {
clientSessionTxReceives.commit();
}
+ @Test
+ public void testSizeInReplace() throws Exception {
+ ClientProducer producer = clientSession.createProducer(address);
+ ClientMessage m1 = createTextMessage(clientSession, "m1");
+ SimpleString rh = new SimpleString("SMID1");
+ m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+ ClientMessage m2 = clientSession.createMessage(true);
+ m2.setBodyInputStream(createFakeLargeStream(10 * 1024));
+ m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+
+ Queue queue = server.locateQueue(qName1);
+ producer.send(m1);
+ long oldSize = queue.getPersistentSize();
+ producer.send(m2);
+ assertEquals(queue.getDeliveringSize(), 0);
+ assertNotEquals(queue.getPersistentSize(), oldSize);
+ assertTrue(queue.getPersistentSize() > 10 * 1024);
+ }
+
@Override
@Before
public void setUp() throws Exception {