You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/01/07 17:44:43 UTC

[3/7] qpid-jms git commit: make the producer targets contain a capability indicating the desired destination type/capability

make the producer targets contain a capability indicating the desired destination type/capability


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0422085a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0422085a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0422085a

Branch: refs/heads/master
Commit: 0422085ab5ab634fc719112fcd9ba9ab4029fa01
Parents: 5eca494
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Jan 7 11:41:18 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Jan 7 16:34:47 2015 +0000

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    | 12 ++--
 .../amqp/message/AmqpDestinationHelper.java     | 26 +++++++++
 .../jms/integration/SessionIntegrationTest.java | 60 ++++++++++++++++++++
 3 files changed, 92 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index d05824d..32eecd7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -235,12 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     protected void doOpen() {
-        String targetAddress = null;
-
-        if (resource.getDestination() != null) {
-            JmsDestination destination = resource.getDestination();
-            targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
-        }
+        JmsDestination destination = resource.getDestination();
+        String targetAddress = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, session.getConnection());
 
         Symbol[] outcomes = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL};
         String sourceAddress = getProducerId().toString();
@@ -251,6 +247,10 @@ public class AmqpFixedProducer extends AmqpProducer {
 
         Target target = new Target();
         target.setAddress(targetAddress);
+        Symbol typeCapability =  AmqpDestinationHelper.INSTANCE.toTypeCapability(destination);
+        if(typeCapability != null) {
+            target.setCapabilities(typeCapability);
+        }
 
         String senderName = sourceAddress + ":" + targetAddress;
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 47a7bb1..afeeb69 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -25,6 +25,8 @@ import org.apache.qpid.jms.JmsTemporaryQueue;
 import org.apache.qpid.jms.JmsTemporaryTopic;
 import org.apache.qpid.jms.JmsTopic;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
+import org.apache.qpid.proton.amqp.Symbol;
 
 /**
  * A set of static utility method useful when mapping JmsDestination types to / from the AMQP
@@ -289,4 +291,28 @@ public class AmqpDestinationHelper {
         }
     }
 
+    /** Selects the capability Symbol matching the destination's isQueue()/isTopic() and isTemporary() answers.
+     * @return the type capability, or null if the supplied destination is null or can't be classified
+     */
+    public Symbol toTypeCapability(JmsDestination destination) {
+        if (destination == null) {
+            return null;
+        }
+
+        if (destination.isQueue()) {
+            if (destination.isTemporary()) {
+                return AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY;
+            } else {
+                return Symbol.valueOf("queue"); // TODO: replace the "queue" literal with a constant
+            }
+        } else if (destination.isTopic()) {
+            if (destination.isTemporary()) {
+                return AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY;
+            } else {
+                return Symbol.valueOf("topic"); // TODO: replace the "topic" literal with a constant
+            }
+        }
+
+        return null; // neither isQueue() nor isTopic() held, so no capability is reported
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0422085a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 53f846b..ebabe4e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -33,6 +33,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -47,6 +48,7 @@ import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
+import org.apache.qpid.jms.provider.amqp.AmqpTemporaryDestination;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
@@ -193,6 +195,64 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 5000)
+    public void testCreateProducerTargetContainsQueueCapability() throws Exception {
+        doCreateProducerTargetContainsCapabilityTestImpl(Queue.class); // helper expects Symbol "queue" in the target capabilities
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateProducerTargetContainsTopicCapability() throws Exception {
+        doCreateProducerTargetContainsCapabilityTestImpl(Topic.class); // helper expects Symbol "topic" in the target capabilities
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateProducerTargetContainsTempQueueCapability() throws Exception {
+        doCreateProducerTargetContainsCapabilityTestImpl(TemporaryQueue.class); // helper expects AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY
+    }
+
+    @Test(timeout = 5000)
+    public void testCreateProducerTargetContainsTempTopicCapability() throws Exception {
+        doCreateProducerTargetContainsCapabilityTestImpl(TemporaryTopic.class); // helper expects AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY
+    }
+    /** Creates a destination of the given type, then a producer on it, expecting the sender attach target to carry the matching capability. */
+    private void doCreateProducerTargetContainsCapabilityTestImpl(Class<? extends Destination> destType) throws JMSException, Exception, IOException {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { // NOTE(review): a neighbouring test passes testFixture.getAvailablePort() here - confirm the fixed PORT is intended
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String destName = "myDest";
+            Symbol nodeTypeCapability = null; // capability the peer is expected to see; assigned per destType below
+
+            Destination dest = null;
+            if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+                nodeTypeCapability = Symbol.valueOf("queue");// TODO: constant
+            } else if (destType == Topic.class) {
+                dest = session.createTopic(destName);
+                nodeTypeCapability = Symbol.valueOf("topic");// TODO: constant
+            } else if (destType == TemporaryQueue.class) {
+                testPeer.expectTempQueueCreationAttach(destName); // peer expectation is set up before the temporary queue is created
+                dest = session.createTemporaryQueue();
+                nodeTypeCapability = AmqpTemporaryDestination.TEMP_QUEUE_CAPABILITY;
+            } else if (destType == TemporaryTopic.class) {
+                testPeer.expectTempTopicCreationAttach(destName); // peer expectation is set up before the temporary topic is created
+                dest = session.createTemporaryTopic();
+                nodeTypeCapability = AmqpTemporaryDestination.TEMP_TOPIC_CAPABILITY;
+            } else {
+                fail("unexpected type"); // destType was none of the four classes handled above
+            }
+            // Build a target matcher whose capabilities must satisfy arrayContaining(nodeTypeCapability).
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            // Action under test: creating the producer should produce the sender attach expected above.
+            session.createProducer(dest);
+        }
+    }
+
+    @Test(timeout = 5000)
     public void testCreateDurableTopicSubscriber() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer(testFixture.getAvailablePort());) {
             Connection connection = testFixture.establishConnecton(testPeer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org