You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/17 20:59:15 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5401
Repository: activemq
Updated Branches:
refs/heads/trunk 226e012d8 -> 004568234
https://issues.apache.org/jira/browse/AMQ-5401
Ensure that the sender is closed on error and add some tests for
unsubscribe failures.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/00456823
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/00456823
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/00456823
Branch: refs/heads/trunk
Commit: 004568234b3964441fa122b04bdb509e754585f4
Parents: 226e012
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 17 14:58:56 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 17 14:58:56 2014 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 4 +-
.../activemq/transport/amqp/JMSClientTest.java | 58 ++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/00456823/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index f4df997..fa49665 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -1277,8 +1277,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
}
+ sender.close();
+ } else {
+ sender.open();
}
- sender.open();
pumpProtonToSocket();
}
});
http://git-wip-us.apache.org/repos/asf/activemq/blob/00456823/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 5a29ee2..64a4f3c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -49,6 +49,7 @@ import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@@ -894,4 +895,61 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}));
}
+
+ @Test(timeout=30000)
+ public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+
+ final BrokerViewMBean broker = getProxyToBroker();
+
+ connection = createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker.getInactiveDurableTopicSubscribers().length == 0 &&
+ broker.getDurableTopicSubscribers().length == 0;
+ }
+ }));
+
+ try {
+ session.unsubscribe("DurbaleTopic");
+ fail("Should have thrown as subscription is in use.");
+ } catch (JMSException ex) {
+ }
+ }
+
+ @Ignore("Requires version 0.30 or higher to work.") // TODO
+ @Test(timeout=30000)
+ public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+
+ final BrokerViewMBean broker = getProxyToBroker();
+
+ connection = createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(getDestinationName());
+ session.createDurableSubscriber(topic, "DurbaleTopic");
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker.getInactiveDurableTopicSubscribers().length == 0 &&
+ broker.getDurableTopicSubscribers().length == 1;
+ }
+ }));
+
+ try {
+ session.unsubscribe("DurbaleTopic");
+ fail("Should have thrown as subscription is in use.");
+ } catch (JMSException ex) {
+ }
+ }
}