You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/18 02:32:43 UTC
[activemq-artemis] branch master updated: ARTEMIS-2378 respect
openwire removeInfo lastSequenceId when dealing with delivery count
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new d1add00 ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count
new 1bd4d45 This closes #2701
d1add00 is described below
commit d1add00b0021d0e98b6707da926b610c5a20628f
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 13 12:28:51 2019 +0100
ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count
---
.../core/protocol/openwire/OpenWireConnection.java | 12 +++++
.../core/protocol/openwire/amq/AMQConsumer.java | 27 +++--------
.../integration/openwire/amq/JMSConsumer2Test.java | 1 +
.../openwire/amq/RedeliveryPolicyTest.java | 52 ++++++++++++++++++++++
4 files changed, 72 insertions(+), 20 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 95656e9..b1f63fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -113,6 +113,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
@@ -866,6 +867,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
+ info.setLastDeliveredSequenceId(RemoveInfo.LAST_DELIVERED_UNKNOWN);
if (consumersList.size() == 0) {
return;
@@ -1075,6 +1077,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
+ private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) {
+ for (ConsumerState consumerState : sessionState.getConsumerStates()) {
+ consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
+ }
+ }
+
CommandProcessor commandProcessorInstance = new CommandProcessor();
// This will listen for commands through the protocolmanager
@@ -1181,6 +1189,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
state.removeSession(id);
+ propagateLastSequenceId(session, lastDeliveredSequenceId);
removeSession(context, session.getInfo());
return null;
}
@@ -1632,6 +1641,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
//we let protocol manager to handle connection add/remove
try {
+ for (SessionState sessionState : state.getSessionStates()) {
+ propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
+ }
protocolManager.removeConnection(state.getInfo(), null);
} catch (Throwable e) {
// log
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c35bc64..f9a7520 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -386,30 +386,17 @@ public class AMQConsumer {
}
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
- long lastDelSeqId = info.getLastDeliveredSequenceId();
- //in activemq5, closing a durable subscription won't close the consumer
- //at broker. Messages will be treated as if being redelivered to
- //the same consumer.
- if (this.info.isDurable() && this.getOpenwireDestination().isTopic()) {
+ if (RemoveInfo.LAST_DELIVERED_UNKNOWN == info.getLastDeliveredSequenceId()) {
+ // treat as delivered
return true;
}
-
- //because delivering count is always one greater than redelivery count
- //we adjust it down before further calculating.
- ref.decrementDeliveryCount();
-
- // This is a specific rule of the protocol
- if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
- // this takes care of un-acked messages in non-tx deliveries
- // tx cases are handled by
- // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
- ref.incrementDeliveryCount();
- } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
- ref.incrementDeliveryCount();
+ if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
+ // treat as delivered
+ return true;
}
-
- return true;
+ // default behaviour
+ return false;
}
/**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
index ba1ae85..9107e9e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
@@ -148,6 +148,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
m = consumer.receive(5000);
System.out.println("m2 received: " + m);
assertNotNull(m);
+ assertFalse("redelivered flag set", m.getJMSRedelivered());
// install another consumer while message dispatch is unacked/uncommitted
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 3e50cc7..05625b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -25,6 +25,8 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
@@ -685,4 +687,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
}
+ @Test
+ public void verifyNoRedeliveryFlagAfterCloseNoReceive() throws Exception {
+
+ try {
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ this.makeSureCoreQueueExist("TEST");
+
+ Queue queue = session.createQueue("TEST");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.send(session.createTextMessage("test"));
+
+ } finally {
+ connection.close();
+ }
+
+ connection = (ActiveMQConnection) factory.createConnection();
+
+ connection.start();
+
+ try {
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue("TEST");
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ TimeUnit.MILLISECONDS.sleep(500);
+ // nothing received
+ consumer.close();
+
+ // try again, expect no redelivery flag
+ consumer = session.createConsumer(queue);
+ Message message = consumer.receive(1000);
+
+ assertNotNull("Message null", message);
+
+ System.out.println("received message: " + message);
+ System.out.println("is redelivered: " + message.getJMSRedelivered());
+ assertFalse(message.getJMSRedelivered());
+
+ } finally {
+ connection.close();
+ }
+ }
+
+
}