You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/01/29 14:14:22 UTC
[5/5] qpid-jms git commit: flesh out approach to deletion, needs work to complete
flesh out approach to deletion, needs work to complete
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c8eb2fe6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c8eb2fe6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c8eb2fe6
Branch: refs/heads/master
Commit: c8eb2fe64145fde3147b46c7a16809d373fdca00
Parents: 0da3808
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jan 29 13:04:50 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jan 29 13:04:50 2015 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 7 +--
.../qpid/jms/JmsTemporaryDestination.java | 2 +-
.../qpid/jms/provider/amqp/AmqpConnection.java | 4 ++
.../qpid/jms/provider/amqp/AmqpProvider.java | 8 +++-
.../provider/amqp/AmqpTemporaryDestination.java | 4 +-
.../jms/integration/SessionIntegrationTest.java | 45 ++++++++++++++++++++
6 files changed, 60 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 77ce545..01c0edd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -551,21 +551,18 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
return topic;
}
- protected void deleteDestination(JmsTemporaryDestination destination) throws JMSException {
+ protected void deleteTemporaryDestination(JmsTemporaryDestination destination) throws JMSException {
checkClosedOrFailed();
connect();
try {
-
for (JmsSession session : this.sessions) {
if (session.isDestinationInUse(destination)) {
throw new IllegalStateException("A consumer is consuming from the temporary destination");
}
}
- if (destination.isTemporary()) {
- tempDestinations.remove(destination);
- }
+ tempDestinations.remove(destination);
destroyResource(destination);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
index 5afb030..f640164 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryDestination.java
@@ -52,7 +52,7 @@ public abstract class JmsTemporaryDestination extends JmsDestination implements
*/
protected void tryDelete() throws JMSException {
if (connection != null) {
- connection.deleteDestination(this);
+ connection.deleteTemporaryDestination(this);
}
deleted = true;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 4c1d60e..81bdd21 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -98,6 +98,10 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
return temporary;
}
+ public AmqpTemporaryDestination getTemporaryDestination(JmsTemporaryDestination destination) {
+ return tempDests.get(destination);
+ }
+
public void unsubscribe(String subscriptionName, AsyncResult request) {
for (AmqpSession session : sessions.values()) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index e3117a9..8ecf3fd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -406,8 +406,12 @@ public class AmqpProvider implements Provider, TransportListener {
@Override
public void processDestination(JmsTemporaryDestination destination) throws Exception {
- // No current way to request a destination gets deleted, would need
- // some management mechanism to invoke this sort of operation.
+ // AmqpTemporaryDestination td = connection.getTemporaryDestination(destination);
+ // if(td != null) {
+ // td.close(request);
+ // } else {
+ // TODO: complete and test above. Signals deletion to peer by closing the creating link if present.
+
request.onSuccess();
}
});
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 7a14e0e..e41b038 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -90,6 +90,8 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD
LOG.trace("Updated temp destination to: {} from: {}", resource, oldDestinationName);
+ this.connection.addTemporaryDestination(this);
+
super.opened();
}
@@ -133,8 +135,6 @@ public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsTemporaryD
setEndpoint(sender);
- this.connection.addTemporaryDestination(this);
-
super.doOpen();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c8eb2fe6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 2610b97..16cf2a7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -76,6 +76,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Ignore;
import org.junit.Test;
public class SessionIntegrationTest extends QpidJmsTestCase {
@@ -175,6 +176,28 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
+ @Ignore // Need to complete implementation and update test peer link handle behaviour
+ @Test(timeout = 5000)
+ public void testCreateAndDeleteTemporaryQueue() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String dynamicAddress = "myTempQueueAddress";
+ testPeer.expectTempQueueCreationAttach(dynamicAddress);
+ TemporaryQueue tempQueue = session.createTemporaryQueue();
+
+ // Deleting the TemporaryQueue will be achieved by closing its creating link.
+ testPeer.expectDetach(true, true, true);
+ tempQueue.delete();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
@Test(timeout = 5000)
public void testCreateTemporaryTopic() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -196,6 +219,28 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
+ @Ignore // Need to complete implementation and update test peer link handle behaviour
+ @Test(timeout = 5000)
+ public void testCreateAndDeleteTemporaryTopic() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String dynamicAddress = "myTempTopicAddress";
+ testPeer.expectTempTopicCreationAttach(dynamicAddress);
+ TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+ // Deleting the TemporaryTopic will be achieved by closing its creating link.
+ testPeer.expectDetach(true, true, true);
+ tempTopic.delete();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
@Test(timeout = 5000)
public void testCreateConsumerSourceContainsQueueCapability() throws Exception {
doCreateConsumerSourceContainsCapabilityTestImpl(Queue.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org