You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/22 15:50:37 UTC
[activemq-artemis] branch master updated: ARTEMIS-2459 Fix err in
the replacement of a non-destructively consumed LVQ message
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 5103394 ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message
new 777eede This closes #2806
5103394 is described below
commit 510339423eeb6054e623698433b7daa91ef8eda5
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Wed Aug 21 20:25:24 2019 +0800
ARTEMIS-2459 Fix err in the replacement of a non-destructively consumed LVQ message
---
.../artemis/core/server/impl/LastValueQueue.java | 13 ++++++-
.../artemis/core/server/impl/QueueImpl.java | 40 ++++++++++++++--------
.../artemis/core/server/impl/RefsOperation.java | 4 +++
.../integration/amqp/JMSNonDestructiveTest.java | 20 ++++++++++-
4 files changed, 60 insertions(+), 17 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 23df9a5..40222b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -187,12 +187,23 @@ public class LastValueQueue extends QueueImpl {
@Override
public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
- if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED ) {
+ if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
removeIfCurrent(ref);
}
super.acknowledge(ref, reason, consumer);
}
+ @Override
+ public void acknowledge(Transaction tx,
+ MessageReference ref,
+ AckReason reason,
+ ServerConsumer consumer) throws Exception {
+ if (isNonDestructive() && reason == AckReason.EXPIRED || reason == AckReason.KILLED) {
+ removeIfCurrent(ref);
+ }
+ super.acknowledge(tx, ref, reason, consumer);
+ }
+
private synchronized void removeIfCurrent(MessageReference ref) {
SimpleString lastValueProp = ref.getLastValueProperty();
if (lastValueProp != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 090a83e..15f76e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1593,28 +1593,35 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
RefsOperation refsOperation = getRefsOperation(tx, reason);
- if (ref.isPaged()) {
- pageSubscription.ackTx(tx, (PagedReference) ref);
-
- refsOperation.addAck(ref);
+ if (nonDestructive && reason == AckReason.NORMAL) {
+ refsOperation.addOnlyRefAck(ref);
+ if (logger.isDebugEnabled()) {
+ logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
+ }
} else {
- Message message = ref.getMessage();
+ if (ref.isPaged()) {
+ pageSubscription.ackTx(tx, (PagedReference) ref);
- boolean durableRef = message.isDurable() && isDurable();
+ refsOperation.addAck(ref);
+ } else {
+ Message message = ref.getMessage();
- if (durableRef) {
- storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+ boolean durableRef = message.isDurable() && isDurable();
- tx.setContainsPersistent();
- }
+ if (durableRef) {
+ storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
- ackAttempts.incrementAndGet();
+ tx.setContainsPersistent();
+ }
- refsOperation.addAck(ref);
- }
+ ackAttempts.incrementAndGet();
- if (server != null && server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
+ refsOperation.addAck(ref);
+ }
+
+ if (server != null && server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
+ }
}
}
@@ -3435,6 +3442,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
QueueImpl queue = (QueueImpl) ref.getQueue();
queue.decDelivering(ref);
+ if (nonDestructive && reason == AckReason.NORMAL) {
+ return;
+ }
if (reason == AckReason.EXPIRED) {
messagesExpired.incrementAndGet();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 925f439..f0b6d34 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -64,6 +64,10 @@ public class RefsOperation extends TransactionOperationAbstract {
ignoreRedeliveryCheck = true;
}
+ synchronized void addOnlyRefAck(final MessageReference ref) {
+ refsToAck.add(ref);
+ }
+
synchronized void addAck(final MessageReference ref) {
refsToAck.add(ref);
if (ref.isPaged()) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
index 7b1f155..32dcbb8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSNonDestructiveTest.java
@@ -24,6 +24,9 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -35,7 +38,10 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport {
private static final String NON_DESTRUCTIVE_QUEUE_NAME = "NON_DESTRUCTIVE_QUEUE";
@@ -46,6 +52,18 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
private ConnectionSupplier AMQPConnection = () -> createConnection();
private ConnectionSupplier CoreConnection = () -> createCoreConnection();
+ protected final boolean persistenceEnabled;
+
+ public JMSNonDestructiveTest(boolean persistenceEnabled) {
+ this.persistenceEnabled = persistenceEnabled;
+ }
+
+ @Parameterized.Parameters(name = "persistenceEnabled={0}")
+ public static Collection<Object[]> data() {
+ Object[][] params = new Object[][]{{false}, {true}};
+ return Arrays.asList(params);
+ }
+
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
@@ -53,7 +71,7 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
@Override
protected void addConfiguration(ActiveMQServer server) {
- server.getConfiguration().setPersistenceEnabled(false);
+ server.getConfiguration().setPersistenceEnabled(persistenceEnabled);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true));
server.getAddressSettingsRepository().addMatch(NON_DESTRUCTIVE_EXPIRY_QUEUE_NAME, new AddressSettings().setDefaultNonDestructive(true).setExpiryDelay(100L));