You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/10/07 08:57:54 UTC

activemq git commit: AMQ-6454 - ensure message.acknowledge throws if consumer has closed and message has been released broker side

Repository: activemq
Updated Branches:
  refs/heads/master 1316b57ed -> e91f5c806


AMQ-6454 - ensure message.acknowledge throws if consumer has closed and message has been released broker side


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e91f5c80
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e91f5c80
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e91f5c80

Branch: refs/heads/master
Commit: e91f5c8062f81a76e6983c489bfd092ce4071480
Parents: 1316b57
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 7 09:57:14 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 7 09:57:28 2016 +0100

----------------------------------------------------------------------
 .../activemq/ActiveMQMessageConsumer.java       |  2 ++
 .../org/apache/activemq/JMSConsumerTest.java    | 28 ++++++++++++++++++++
 .../org/apache/activemq/JMSXAConsumerTest.java  |  4 +++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 83ce137..a52e2d4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -602,6 +602,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             m.setAcknowledgeCallback(new Callback() {
                 @Override
                 public void execute() throws Exception {
+                    checkClosed();
                     session.checkClosed();
                     session.acknowledge();
                 }
@@ -610,6 +611,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             m.setAcknowledgeCallback(new Callback() {
                 @Override
                 public void execute() throws Exception {
+                    checkClosed();
                     session.checkClosed();
                     acknowledge(md);
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 4f02d47..8785acb 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
+import javax.jms.JMSException;
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -859,6 +860,33 @@ public class JMSConsumerTest extends JmsTestSupport {
         redispatchSession.close();
     }
 
+    public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+        sendMessages(connection, destination, 1);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        Message message = consumer.receive(1000);
+        assertNotNull(message);
+        consumer.close();
+
+        try {
+            message.acknowledge();
+            fail("Expect exception on ack after close - consumer gone so message available again");
+        } catch (JMSException expected) {}
+
+        Session redispatchSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+        Message msg = redispatchConsumer.receive(1000);
+        assertNotNull(msg);
+
+        redispatchSession.close();
+    }
+
+
     public void initCombosForTestAckOfExpired() {
         addCombinationValues("destinationType",
             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});

http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
index 7deff27..89fb25b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -47,4 +47,8 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
 
     public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
     }
+
+    // needs client ack, xa is auto ack if no transaction
+    public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+    }
 }