You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/24 20:29:19 UTC

[04/33] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47b31b53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47b31b53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47b31b53

Branch: refs/heads/2.6.x
Commit: 47b31b53d608f762b9c38e924c5a9f8b92f384b5
Parents: 9eed307
Author: Howard Gao <ho...@gmail.com>
Authored: Wed May 16 11:14:48 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:00:08 2018 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  1 +
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 +-
 .../core/protocol/openwire/amq/AMQSession.java  | 14 ++++++
 .../openwire/amq/RedeliveryPolicyTest.java      | 52 ++++++++++++++++++++
 4 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6a10de7..f666785 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                   referenceIterator.remove();
                   ref.incrementDeliveryCount();
                   consumer.backToDelivering(ref);
+                  session.addRolledback(ref.getMessageID());
                }
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index e0b02ae..0b7eff5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -353,7 +353,7 @@ public class AMQConsumer {
    }
 
    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
-      long seqId = ref.getMessage().getMessageID();
+      long seqId = ref.getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
 
       //in activemq5, closing a durable subscription won't close the consumer
@@ -373,6 +373,8 @@ public class AMQConsumer {
          // tx cases are handled by
          // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
          ref.incrementDeliveryCount();
+      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+         ref.incrementDeliveryCount();
       }
 
       return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index c3b1a20..34e2c0f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -52,6 +53,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -95,6 +97,8 @@ public class AMQSession implements SessionCallback {
 
    private final SimpleString clientId;
 
+   private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -308,6 +312,8 @@ public class AMQSession implements SessionCallback {
                           ServerConsumer consumer,
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
+      //clear up possible rolledback ids.
+      rollbackedIds.remove(message.getMessageID());
       // TODO: use encoders and proper conversions here
       return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
@@ -542,4 +548,12 @@ public class AMQSession implements SessionCallback {
    public boolean isInternal() {
       return sessInfo.getSessionId().getValue() == -1;
    }
+
+   public void addRolledback(long messageID) {
+      this.rollbackedIds.add(messageID);
+   }
+
+   public boolean isRolledBack(long mid) {
+      return rollbackedIds.remove(mid);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 7ee0eb9..3e50cc7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -21,9 +21,11 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
@@ -633,4 +635,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
       session.commit();
    }
 
+   @Test
+   public void testClientRedlivery() throws Exception {
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         this.makeSureCoreQueueExist("TEST");
+
+         Queue queue = session.createQueue("TEST");
+
+         MessageProducer producer = session.createProducer(queue);
+
+         producer.send(session.createTextMessage("test"));
+
+      } finally {
+         connection.close();
+      }
+
+      for (int i = 0; i < 10; ++i) {
+
+         connection = (ActiveMQConnection) factory.createConnection();
+
+         connection.start();
+
+         try {
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            Queue queue = session.createQueue("TEST");
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            Message message = consumer.receive(1000);
+
+            assertNotNull("Message null on iteration " + i, message);
+
+            System.out.println("received message: " + i);
+            System.out.println("is redelivered: " + message.getJMSRedelivered());
+            if (i > 0) {
+               assertTrue(message.getJMSRedelivered());
+            }
+
+         } finally {
+            connection.close();
+         }
+      }
+
+   }
+
 }