You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/10/07 08:57:54 UTC
activemq git commit: AMQ-6454 - ensure message.acknowledge throws if
consumer has closed and message has been released broker side
Repository: activemq
Updated Branches:
refs/heads/master 1316b57ed -> e91f5c806
AMQ-6454 - ensure message.acknowledge throws if consumer has closed and message has been released broker side
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e91f5c80
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e91f5c80
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e91f5c80
Branch: refs/heads/master
Commit: e91f5c8062f81a76e6983c489bfd092ce4071480
Parents: 1316b57
Author: gtully <ga...@gmail.com>
Authored: Fri Oct 7 09:57:14 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Oct 7 09:57:28 2016 +0100
----------------------------------------------------------------------
.../activemq/ActiveMQMessageConsumer.java | 2 ++
.../org/apache/activemq/JMSConsumerTest.java | 28 ++++++++++++++++++++
.../org/apache/activemq/JMSXAConsumerTest.java | 4 +++
3 files changed, 34 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index 83ce137..a52e2d4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -602,6 +602,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
m.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
+ checkClosed();
session.checkClosed();
session.acknowledge();
}
@@ -610,6 +611,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
m.setAcknowledgeCallback(new Callback() {
@Override
public void execute() throws Exception {
+ checkClosed();
session.checkClosed();
acknowledge(md);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 4f02d47..8785acb 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
+import javax.jms.JMSException;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -859,6 +860,33 @@ public class JMSConsumerTest extends JmsTestSupport {
redispatchSession.close();
}
+ public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+
+ connection.start();
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
+
+ sendMessages(connection, destination, 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ consumer.close();
+
+ try {
+ message.acknowledge();
+ fail("Expect exception on ack after close - consumer gone so message available again");
+ } catch (JMSException expected) {}
+
+ Session redispatchSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
+ Message msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+
+ redispatchSession.close();
+ }
+
+
public void initCombosForTestAckOfExpired() {
addCombinationValues("destinationType",
new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
http://git-wip-us.apache.org/repos/asf/activemq/blob/e91f5c80/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
index 7deff27..89fb25b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSXAConsumerTest.java
@@ -47,4 +47,8 @@ public class JMSXAConsumerTest extends JMSConsumerTest {
public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
}
+
+ // Requires CLIENT_ACKNOWLEDGE; an XA session is auto-ack when there is no transaction, so this test is overridden as a no-op.
+ public void testExceptionOnClientAckAfterConsumerClose() throws Exception {
+ }
}