You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/22 15:50:37 UTC

[activemq-artemis] branch master updated: ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5103394  ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message
     new 777eede  This closes #2806
5103394 is described below

commit 510339423eeb6054e623698433b7daa91ef8eda5
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Wed Aug 21 20:25:24 2019 +0800

    ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message
---
 .../artemis/core/server/impl/LastValueQueue.java   | 13 ++++++-
 .../artemis/core/server/impl/QueueImpl.java        | 40 ++++++++++++++--------
 .../artemis/core/server/impl/RefsOperation.java    |  4 +++
 .../integration/amqp/JMSNonDestructiveTest.java    | 20 ++++++++++-
 4 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 23df9a5..40222b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -187,12 +187,23 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
-      if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED ) {
+      if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
          removeIfCurrent(ref);
       }
       super.acknowledge(ref, reason, consumer);
    }
 
+   @Override
+   public void acknowledge(Transaction tx,
+                           MessageReference ref,
+                           AckReason reason,
+                           ServerConsumer consumer) throws Exception {
+      if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
+         removeIfCurrent(ref);
+      }
+      super.acknowledge(tx, ref, reason, consumer);
+   }
+
    private synchronized void removeIfCurrent(MessageReference ref) {
       SimpleString lastValueProp = ref.getLastValueProperty();
       if (lastValueProp != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 090a83e..15f76e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1593,28 +1593,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
       RefsOperation refsOperation = getRefsOperation(tx, reason);
 
-      if (ref.isPaged()) {
-         pageSubscription.ackTx(tx, (PagedReference) ref);
-
-         refsOperation.addAck(ref);
+      if (nonDestructive && reason == AckReason.NORMAL) {
+         refsOperation.addOnlyRefAck(ref);
+         if (logger.isDebugEnabled()) {
+            logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
+         }
       } else {
-         Message message = ref.getMessage();
+         if (ref.isPaged()) {
+            pageSubscription.ackTx(tx, (PagedReference) ref);
 
-         boolean durableRef = message.isDurable() && isDurable();
+            refsOperation.addAck(ref);
+         } else {
+            Message message = ref.getMessage();
 
-         if (durableRef) {
-            storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+            boolean durableRef = message.isDurable() && isDurable();
 
-            tx.setContainsPersistent();
-         }
+            if (durableRef) {
+               storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
 
-         ackAttempts.incrementAndGet();
+               tx.setContainsPersistent();
+            }
 
-         refsOperation.addAck(ref);
-      }
+            ackAttempts.incrementAndGet();
 
-      if (server != null && server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
+            refsOperation.addAck(ref);
+         }
+
+         if (server != null && server.hasBrokerMessagePlugins()) {
+            server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
+         }
       }
    }
 
@@ -3435,6 +3442,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       QueueImpl queue = (QueueImpl) ref.getQueue();
 
       queue.decDelivering(ref);
+      if (nonDestructive && reason == AckReason.NORMAL) {
+         return;
+      }
 
       if (reason == AckReason.EXPIRED) {
          messagesExpired.incrementAndGet();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 925f439..f0b6d34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -64,6 +64,10 @@ public class RefsOperation extends TransactionOperationAbstract {
       ignoreRedeliveryCheck = true;
    }
 
+   synchronized void addOnlyRefAck(final MessageReference ref) {
+      refsToAck.add(ref);
+   }
+
    synchronized void addAck(final MessageReference ref) {
       refsToAck.add(ref);
       if (ref.isPaged()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 7b1f155..32dcbb8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -24,6 +24,9 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -35,7 +38,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class JMSNonDestructiveTest extends JMSClientTestSupport {
 
    private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE";
@@ -46,6 +52,18 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
    private ConnectionSupplier AMQPConnection = () -> createConnection();
    private ConnectionSupplier CoreConnection = () -> createCoreConnection();
 
+   protected final boolean persistenceEnabled;
+
+   public JMSNonDestructiveTest(boolean persistenceEnabled) {
+      this.persistenceEnabled = persistenceEnabled;
+   }
+
+   @Parameterized.Parameters(name = "persistenceEnabled={0}")
+   public static Collection<Object[]> data() {
+      Object[][] params = new Object[][]{{false}, {true}};
+      return Arrays.asList(params);
+   }
+
    @Override
    protected String getConfiguredProtocols() {
       return "AMQP,OPENWIRE,CORE";
@@ -53,7 +71,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
 
    @Override
    protected void addConfiguration(ActiveMQServer server) {
-      server.getConfiguration().setPersistenceEnabled(false);
+      server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
       server.getConfiguration().setMessageExpiryScanPeriod(100);
       server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
       server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));