You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/18 02:32:43 UTC

[activemq-artemis] branch master updated: ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new d1add00  ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count
     new 1bd4d45  This closes #2701
d1add00 is described below

commit d1add00b0021d0e98b6707da926b610c5a20628f
Author: gtully <ga...@gmail.com>
AuthorDate: Thu Jun 13 12:28:51 2019 +0100

    ARTEMIS-2378 respect openwire removeInfo lastSequenceId when dealing with delivery count
---
 .../core/protocol/openwire/OpenWireConnection.java | 12 +++++
 .../core/protocol/openwire/amq/AMQConsumer.java    | 27 +++--------
 .../integration/openwire/amq/JMSConsumer2Test.java |  1 +
 .../openwire/amq/RedeliveryPolicyTest.java         | 52 ++++++++++++++++++++++
 4 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 95656e9..b1f63fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -113,6 +113,7 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
@@ -866,6 +867,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
          ss.addConsumer(info);
+         info.setLastDeliveredSequenceId(RemoveInfo.LAST_DELIVERED_UNKNOWN);
 
          if (consumersList.size() == 0) {
             return;
@@ -1075,6 +1077,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
+   private void propagateLastSequenceId(SessionState sessionState, long lastDeliveredSequenceId) {
+      for (ConsumerState consumerState : sessionState.getConsumerStates()) {
+         consumerState.getInfo().setLastDeliveredSequenceId(lastDeliveredSequenceId);
+      }
+   }
+
    CommandProcessor commandProcessorInstance = new CommandProcessor();
 
    // This will listen for commands through the protocolmanager
@@ -1181,6 +1189,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             }
          }
          state.removeSession(id);
+         propagateLastSequenceId(session, lastDeliveredSequenceId);
          removeSession(context, session.getInfo());
          return null;
       }
@@ -1632,6 +1641,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       public Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception {
          //we let protocol manager to handle connection add/remove
          try {
+            for (SessionState sessionState : state.getSessionStates()) {
+               propagateLastSequenceId(sessionState, lastDeliveredSequenceId);
+            }
             protocolManager.removeConnection(state.getInfo(), null);
          } catch (Throwable e) {
             // log
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index c35bc64..f9a7520 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -386,30 +386,17 @@ public class AMQConsumer {
    }
 
    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
-      long lastDelSeqId = info.getLastDeliveredSequenceId();
 
-      //in activemq5, closing a durable subscription won't close the consumer
-      //at broker. Messages will be treated as if being redelivered to
-      //the same consumer.
-      if (this.info.isDurable() && this.getOpenwireDestination().isTopic()) {
+      if (RemoveInfo.LAST_DELIVERED_UNKNOWN == info.getLastDeliveredSequenceId()) {
+         // treat as delivered
          return true;
       }
-
-      //because delivering count is always one greater than redelivery count
-      //we adjust it down before further calculating.
-      ref.decrementDeliveryCount();
-
-      // This is a specific rule of the protocol
-      if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
-         // this takes care of un-acked messages in non-tx deliveries
-         // tx cases are handled by
-         // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
-         ref.incrementDeliveryCount();
-      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
-         ref.incrementDeliveryCount();
+      if (ref.getMessageID() <= info.getLastDeliveredSequenceId() && !isRolledBack(ref)) {
+         // treat as delivered
+         return true;
       }
-
-      return true;
+      // default behaviour
+      return false;
    }
 
    /**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
index ba1ae85..9107e9e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/JMSConsumer2Test.java
@@ -148,6 +148,7 @@ public class JMSConsumer2Test extends BasicOpenWireTest {
       m = consumer.receive(5000);
       System.out.println("m2 received: " + m);
       assertNotNull(m);
+      assertFalse("redelivered flag set", m.getJMSRedelivered());
 
       // install another consumer while message dispatch is unacked/uncommitted
       Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 3e50cc7..05625b8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -25,6 +25,8 @@ import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
@@ -685,4 +687,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
 
    }
 
+   @Test
+   public void verifyNoRedeliveryFlagAfterCloseNoReceive() throws Exception {
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         this.makeSureCoreQueueExist("TEST");
+
+         Queue queue = session.createQueue("TEST");
+
+         MessageProducer producer = session.createProducer(queue);
+
+         producer.send(session.createTextMessage("test"));
+
+      } finally {
+         connection.close();
+      }
+
+      connection = (ActiveMQConnection) factory.createConnection();
+
+      connection.start();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         Queue queue = session.createQueue("TEST");
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         TimeUnit.MILLISECONDS.sleep(500);
+         // nothing received
+         consumer.close();
+
+         // try again, expect no redelivery flag
+         consumer = session.createConsumer(queue);
+         Message message = consumer.receive(1000);
+
+         assertNotNull("Message null", message);
+
+         System.out.println("received message: " + message);
+         System.out.println("is redelivered: " + message.getJMSRedelivered());
+         assertFalse(message.getJMSRedelivered());
+
+      } finally {
+         connection.close();
+      }
+   }
+
+
 }