You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/12 09:33:59 UTC
[1/2] activemq-artemis git commit: ARTEMIS-723 - AMQP subscriptions
aren't deleted properly
Repository: activemq-artemis
Updated Branches:
refs/heads/master abed0cd5b -> 1bcac5c36
ARTEMIS-723 - AMQP subscriptions aren't deleted properly
https://issues.apache.org/jira/browse/ARTEMIS-723
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cdb0391c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cdb0391c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cdb0391c
Branch: refs/heads/master
Commit: cdb0391c1c2b7c0e51f7bbfe0e9ec27306a735b4
Parents: abed0cd
Author: Andy Taylor <an...@gmail.com>
Authored: Fri Sep 9 10:36:01 2016 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Sep 12 10:27:49 2016 +0100
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 2 +-
.../server/ProtonServerSenderContext.java | 10 +-
.../tests/integration/proton/ProtonTest.java | 113 +++++++++++++++++++
3 files changed, 123 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index c3ac671..e422a34 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -195,7 +195,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
@Override
public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
- serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+ serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 78b1668..739f8e8 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -272,7 +272,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
-
try {
sessionSPI.closeSender(brokerConsumer);
//if this is a link close rather than a connection close or detach, we need to delete any durable resources for
@@ -285,6 +284,15 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
if (exists) {
sessionSPI.deleteQueue(address);
}
+ else {
+ String clientId = connection.getRemoteContainer();
+ String pubId = sender.getName();
+ String queue = clientId + ":" + pubId;
+ exists = sessionSPI.queueQuery(queue);
+ if (exists) {
+ sessionSPI.deleteQueue(queue);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 8da5aa2..b3d9a5f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -39,6 +39,9 @@ import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
@@ -56,6 +59,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -184,6 +189,62 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
+ public void testDurableSubscriptionUnsubscribe() throws Exception {
+ if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ Connection connection = createConnection("myClientId");
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("amqp_testtopic");
+ TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+ session.close();
+ connection.close();
+ connection = createConnection("myClientId");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+ myDurSub.close();
+ Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+ session.unsubscribe("myDurSub");
+ Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+ session.close();
+ connection.close();
+ }
+ finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
+ public void testTemporarySubscriptionDeleted() throws Exception {
+ if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ try {
+ TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("amqp_testtopic");
+ TopicSubscriber myDurSub = session.createSubscriber(topic);
+ Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+ Assert.assertEquals(2, bindingsForAddress.getBindings().size());
+ session.close();
+ final CountDownLatch latch = new CountDownLatch(1);
+ server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
+ @Override
+ public void connectionClosed() {
+ latch.countDown();
+ }
+ });
+ connection.close();
+ latch.await(5, TimeUnit.SECONDS);
+ bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+ Assert.assertEquals(1, bindingsForAddress.getBindings().size());
+ }
+ finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+
+ @Test
public void testBrokerContainerId() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
@@ -1530,6 +1591,58 @@ public class ProtonTest extends ProtonTestBase {
return connection;
}
+ private javax.jms.Connection createConnection(String clientId) throws JMSException {
+ Connection connection;
+ if (protocol == 3) {
+ factory = new JmsConnectionFactory(amqpConnectionUri);
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+ connection.setClientID(clientId);
+ connection.start();
+ }
+ else if (protocol == 0) {
+ factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
+ connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+ connection.setClientID(clientId);
+ connection.start();
+ }
+ else {
+ TransportConfiguration transport;
+
+ if (protocol == 1) {
+ transport = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
+ factory = new ActiveMQConnectionFactory("vm:/0");
+ }
+ else {
+ factory = new ActiveMQConnectionFactory();
+ }
+
+ connection = factory.createConnection(userName, password);
+ connection.setClientID(clientId);
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+ connection.start();
+ }
+
+ return connection;
+ }
+
+
private void setAddressFullBlockPolicy() {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
[2/2] activemq-artemis git commit: This closes #770
Posted by ma...@apache.org.
This closes #770
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1bcac5c3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1bcac5c3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1bcac5c3
Branch: refs/heads/master
Commit: 1bcac5c36ca23c6c6537c2864106cd561309de15
Parents: abed0cd cdb0391
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Sep 12 10:33:35 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Sep 12 10:33:35 2016 +0100
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 2 +-
.../server/ProtonServerSenderContext.java | 10 +-
.../tests/integration/proton/ProtonTest.java | 113 +++++++++++++++++++
3 files changed, 123 insertions(+), 2 deletions(-)
----------------------------------------------------------------------