You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/01/07 17:44:43 UTC
[3/7] qpid-jms git commit: make the producer targets contain a
capability indicating the desired destination type/capability
make the producer targets contain a capability indicating the desired destination type/capability
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0422085a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0422085a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0422085a
Branch: refs/heads/master
Commit: 0422085ab5ab634fc719112fcd9ba9ab4029fa01
Parents: 5eca494
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Jan 7 11:41:18 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Jan 7 16:34:47 2015 +0000
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpFixedProducer.java | 12 ++--
.../amqp/message/AmqpDestinationHelper.java | 26 +++++++++
.../jms/integration/SessionIntegrationTest.java | 60 ++++++++++++++++++++
3 files changed, 92 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index d05824d..32eecd7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -235,12 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
protected void doOpen() {
- String targetAddress = null;
-
- if (resource.getDestination() != null) {
- JmsDestination destination = resource.getDestination();
- targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
- }
+ JmsDestination destination = resource.getDestination();
+ String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
String sourceAddress = getProducerId().toString();
@@ -251,6 +247,10 @@ public class AmqpFixedProducer extends AmqpProducer {
Target target = new Target();
target.setAddress(targetAddress);
+ Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
+ if(typeCapability != null) {
+ target.setCapabilities(typeCapability);
+ }
String senderName = sourceAddress + ":" + targetAddress;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 47a7bb1..afeeb69 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -25,6 +25,8 @@ import org.apache.qpid.jms.JmsTemporaryQueue;
import org.apache.qpid.jms.JmsTemporaryTopic;
import org.apache.qpid.jms.JmsTopic;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
+import org.apache.qpid.proton.amqp.Symbol;
/**
* A set of static utility method useful when mapping JmsDestination types to / from the AMQP
@@ -289,4 +291,28 @@ public class AmqpDestinationHelper {
}
}
+ /**
+ * @return the type capability, or null if the supplied destination is null or can't be classified
+ */
+ public Symbol toTypeCapability(JmsDestination destination) {
+ if (destination == null) {
+ return null;
+ }
+
+ if (destination.isQueue()) {
+ if (destination.isTemporary()) {
+ return AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY;
+ } else {
+ return Symbol.valueOf("queue");// TODO: constant;
+ }
+ } else if (destination.isTopic()) {
+ if (destination.isTemporary()) {
+ return AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY;
+ } else {
+ return Symbol.valueOf("topic");// TODO: constant;
+ }
+ }
+
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 53f846b..ebabe4e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -47,6 +48,7 @@ import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
+import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
@@ -193,6 +195,64 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 5000)
+ public void testCreateProducerTargetContainsQueueCapability() throws Exception {
+ doCreateProducerTargetContainsCapabilityTestImpl(Queue.class);
+ }
+
+ @Test(timeout = 5000)
+ public void testCreateProducerTargetContainsTopicCapability() throws Exception {
+ doCreateProducerTargetContainsCapabilityTestImpl(Topic.class);
+ }
+
+ @Test(timeout = 5000)
+ public void testCreateProducerTargetContainsTempQueueCapability() throws Exception {
+ doCreateProducerTargetContainsCapabilityTestImpl(TemporaryQueue.class);
+ }
+
+ @Test(timeout = 5000)
+ public void testCreateProducerTargetContainsTempTopicCapability() throws Exception {
+ doCreateProducerTargetContainsCapabilityTestImpl(TemporaryTopic.class);
+ }
+
+ private void doCreateProducerTargetContainsCapabilityTestImpl(Class<? extends Destination> destType) throws JMSException, Exception, IOException {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String destName = "myDest";
+ Symbol nodeTypeCapability = null;
+
+ Destination dest = null;
+ if (destType == Queue.class) {
+ dest = session.createQueue(destName);
+ nodeTypeCapability = Symbol.valueOf("queue");// TODO: constant
+ } else if (destType == Topic.class) {
+ dest = session.createTopic(destName);
+ nodeTypeCapability = Symbol.valueOf("topic");// TODO: constant
+ } else if (destType == TemporaryQueue.class) {
+ testPeer.expectTempQueueCreationAttach(destName);
+ dest = session.createTemporaryQueue();
+ nodeTypeCapability = AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY;
+ } else if (destType == TemporaryTopic.class) {
+ testPeer.expectTempTopicCreationAttach(destName);
+ dest = session.createTemporaryTopic();
+ nodeTypeCapability = AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY;
+ } else {
+ fail("unexpected type");
+ }
+
+ TargetMatcher targetMatcher = new TargetMatcher();
+ targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
+
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+
+ session.createProducer(dest);
+ }
+ }
+
+ @Test(timeout = 5000)
public void testCreateDurableTopicSubscriber() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer(testFixture.getAvailablePort());) {
Connection connection = testFixture.establishConnecton(testPeer);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org