You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/11 13:41:49 UTC

[activemq-artemis] 01/03: ARTEMIS-2478 Expired message not removed in non-destructive queue

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e43c5390cf11655a883b9f96f608e65bdd21d6fa
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Mon Sep 9 18:45:48 2019 +0800

    ARTEMIS-2478 Expired message not removed in non-destructive queue
---
 .../activemq/artemis/core/server/impl/QueueImpl.java       | 14 +++++---------
 .../tests/integration/amqp/JMSNonDestructiveTest.java      | 10 ++++++----
 2 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index dd7d9cf..d18ecbf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2745,9 +2745,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      logger.trace("Reference " + ref + " being expired");
                   }
                   removeMessageReference(holder, ref);
-
-
-
                   handled++;
                   consumers.reset();
                   continue;
@@ -2778,8 +2775,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                   deliveriesInTransit.countUp();
 
-
-                  removeMessageReference(holder, ref);
+                  if (!nonDestructive) {
+                     removeMessageReference(holder, ref);
+                  }
                   ref.setInDelivery(true);
                   handledconsumer = consumer;
                   handled++;
@@ -2836,10 +2834,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    protected void removeMessageReference(ConsumerHolder<? extends Consumer> holder, MessageReference ref) {
-      if (!nonDestructive) {
-         holder.iter.remove();
-         refRemoved(ref);
-      }
+      holder.iter.remove();
+      refRemoved(ref);
    }
 
    private void checkDepage() {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 32dcbb8..6fbb71d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -53,14 +53,16 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
    private ConnectionSupplier CoreConnection = () -> createCoreConnection();
 
    protected final boolean persistenceEnabled;
+   protected final long scanPeriod;
 
-   public JMSNonDestructiveTest(boolean persistenceEnabled) {
+   public JMSNonDestructiveTest(boolean persistenceEnabled, long scanPeriod) {
       this.persistenceEnabled = persistenceEnabled;
+      this.scanPeriod = scanPeriod;
    }
 
-   @Parameterized.Parameters(name = "persistenceEnabled={0}")
+   @Parameterized.Parameters(name = "persistenceEnabled={0}, scanPeriod={1}")
    public static Collection<Object[]> data() {
-      Object[][] params = new Object[][]{{false}, {true}};
+      Object[][] params = new Object[][]{{false, 100}, {true, 100}, {true, -1}};
       return Arrays.asList(params);
    }
 
@@ -72,7 +74,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
    @Override
    protected void addConfiguration(ActiveMQServer server) {
       server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
-      server.getConfiguration().setMessageExpiryScanPeriod(100);
+      server.getConfiguration().setMessageExpiryScanPeriod(scanPeriod);
       server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
       server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));
       server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_LVQ_QUEUE_NAME, new AddressSettings().setDefaultLastValueQueue(true).setDefaultNonDestructive(true));