You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/11 13:41:49 UTC
[activemq-artemis] 01/03: ARTEMIS-2478 Expired message not removed
in non destructive queue
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit e43c5390cf11655a883b9f96f608e65bdd21d6fa
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Mon Sep 9 18:45:48 2019 +0800
ARTEMIS-2478 Expired message not removed in non destructive queue
---
.../activemq/artemis/core/server/impl/QueueImpl.java | 14 +++++---------
.../tests/integration/amqp/JMSNonDestructiveTest.java | 10 ++++++----
2 files changed, 11 insertions(+), 13 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dd7d9cf..d18ecbf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2745,9 +2745,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Reference " + ref + " being expired");
}
removeMessageReference(holder, ref);
-
-
-
handled++;
consumers.reset();
continue;
@@ -2778,8 +2775,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliveriesInTransit.countUp();
-
- removeMessageReference(holder, ref);
+ if (!nonDestructive) {
+ removeMessageReference(holder, ref);
+ }
ref.setInDelivery(true);
handledconsumer = consumer;
handled++;
@@ -2836,10 +2834,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
- if (!nonDestructive) {
- holder.iter.remove();
- refRemoved(ref);
- }
+ holder.iter.remove();
+ refRemoved(ref);
}
private void checkDepage() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 32dcbb8..6fbb71d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -53,14 +53,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
protected final boolean persistenceEnabled;
+ protected final long scanPeriod;
- public JMSNonDestructiveTest(boolean persistenceEnabled) {
+ public JMSNonDestructiveTest(boolean persistenceEnabled, long scanPeriod) {
this.persistenceEnabled = persistenceEnabled;
+ this.scanPeriod = scanPeriod;
}
- @Parameterized.Parameters(name = "persistenceEnabled={0}")
+ @Parameterized.Parameters(name = "persistenceEnabled={0}, scanPeriod={1}")
public static Collection<Object[]> data() {
- Object[][] params = new Object[][]{{false}, {true}};
+ Object[][] params = new Object[][]{{false, 100}, {true, 100}, {true, -1}};
return Arrays.asList(params);
}
@@ -72,7 +74,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
@Override
protected void addConfiguration(ActiveMQServer server) {
server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
- server.getConfiguration().setMessageExpiryScanPeriod(100);
+ server.getConfiguration().setMessageExpiryScanPeriod(scanPeriod);
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));