You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/22 17:39:53 UTC

[5/7] qpid-jms git commit: add support for queue+topic prefixes being defined by the server via connection properties

add support for queue+topic prefixes being defined by the server via connection properties


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c6a5498d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c6a5498d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c6a5498d

Branch: refs/heads/master
Commit: c6a5498d05a46141543c80bb6cf9618126617cd5
Parents: d4016cb
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Dec 22 14:26:57 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Dec 22 14:26:57 2014 +0000

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  12 +
 .../provider/amqp/AmqpConnectionProperties.java |  28 ++-
 .../jms/integration/MessageIntegrationTest.java | 225 +++++++++++++++++++
 3 files changed, 264 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 1bc1f53..6423365 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -125,6 +125,18 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
             this.properties = new AmqpConnectionProperties(
                 getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
 
+            String brokerQueuePrefix = properties.getQueuePrefix();
+            if (brokerQueuePrefix != null) {
+                queuePrefix = brokerQueuePrefix;
+                resource.setQueuePrefix(brokerQueuePrefix);
+            }
+
+            String brokerTopicPrefix = properties.getTopicPrefix();
+            if (brokerTopicPrefix != null) {
+                topicPrefix = brokerTopicPrefix;
+                resource.setTopicPrefix(brokerTopicPrefix);
+            }
+
             connectionSession.open(new AsyncResult() {
 
                 @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index c3b0297..de513f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -30,8 +30,12 @@ import org.apache.qpid.proton.amqp.Symbol;
 public class AmqpConnectionProperties {
 
     public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+    public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
 
     private boolean anonymousRelaySupported = false;
+    private String queuePrefix = null;
+    private String topicPrefix = null;
 
     /**
      * Creates a new instance of this class from the given remote capabilities and properties.
@@ -61,10 +65,32 @@ public class AmqpConnectionProperties {
     }
 
     protected void processProperties(Map<Symbol, Object> properties) {
-        // TODO - Inspect properties for configuration options
+        if (properties.containsKey(QUEUE_PREFIX)) {
+            Object o = properties.get(QUEUE_PREFIX);
+            if (o instanceof String) {
+                queuePrefix = (String) o;
+            }
+        }
+
+        if (properties.containsKey(TOPIC_PREFIX)) {
+            Object o = properties.get(TOPIC_PREFIX);
+            if (o instanceof String) {
+                topicPrefix = (String) o;
+            }
+        }
+
+        // TODO - Inspect properties for any other configuration options
     }
 
     public boolean isAnonymousRelaySupported() {
         return anonymousRelaySupported;
     }
+
+    public String getQueuePrefix() {
+        return queuePrefix;
+    }
+
+    public String getTopicPrefix() {
+        return topicPrefix;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 6590fc2..a36307b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -26,9 +26,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.util.Date;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
 import javax.jms.Connection;
@@ -44,6 +47,7 @@ import javax.jms.Topic;
 
 import org.apache.qpid.jms.JmsClientProperties;
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
@@ -652,6 +656,227 @@ public class MessageIntegrationTest extends QpidJmsTestCase
         }
     }
 
+    /**
+     * Tests that a connection with 'prefixes' set on it via broker-provided connection properties
+     * strips the prefix from the to/reply-to fields for incoming messages with Topic destinations.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+        Class<? extends Destination> destType = Topic.class;
+        String destPrefix = "t-broker-provided-prefix-";
+        String destName = "myTopic";
+        String replyName = "myReplyTopic";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+        Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+        String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+        Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+        doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName,
+                                                                        destAddress, replyAddress, annotationName,
+                                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    /**
+     * Tests that a connection with 'prefixes' set on it via broker-provided connection properties
+     * strips the prefix from the to/reply-to fields for incoming messages with Queue destinations.
+     */
+    @Test(timeout = 2000)
+    public void testReceivedMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+        Class<? extends Destination> destType = Queue.class;
+        String destPrefix = "q-broker-provided-prefix-";
+        String destName = "myQueue";
+        String replyName = "myReplyQueue";
+        String destAddress = destPrefix + destName;
+        String replyAddress = destPrefix + replyName;
+        String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+        Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+        String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+        Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+        doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName,
+                                                                        destAddress, replyAddress, annotationName,
+                                                                        annotationValue, replyAnnotationName, replyAnnotationValue);
+    }
+
+    private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
+                                                                                  String destPrefix,
+                                                                                  String destName,
+                                                                                  String replyName,
+                                                                                  String destAddress,
+                                                                                  String replyAddress,
+                                                                                  String annotationName,
+                                                                                  Object annotationValue,
+                                                                                  String replyAnnotationName,
+                                                                                  Object replyAnnotationValue) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            // Have the test peer provide the destination prefixes as connection properties
+            Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+            properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
+            properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);
+
+            Connection connection = testFixture.establishConnecton(testPeer, null, null, properties);
+            connection.start();
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination
+            Destination dest = null;
+            if (destType == Topic.class) {
+                dest= session.createTopic(destName);
+            } else if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+            } else {
+                fail("non-temporary destination type set");
+            }
+
+            MessageAnnotationsDescribedType msgAnnotations = null;
+            if (annotationName != null || replyAnnotationName != null) {
+                msgAnnotations = new MessageAnnotationsDescribedType();
+                if (annotationName != null) {
+                    msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
+                }
+
+                if (replyAnnotationName != null) {
+                    msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
+                }
+            }
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setTo(destAddress);
+            props.setReplyTo(replyAddress);
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            SourceMatcher sourceMatcher = new SourceMatcher();
+            sourceMatcher.withAddress(equalTo(destAddress));
+
+            testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(dest);
+            Message receivedMessage = messageConsumer.receive(1000);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            assertNotNull(receivedMessage);
+
+            Destination jmsDest = receivedMessage.getJMSDestination();
+            Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
+
+            assertNotNull("Expected JMSDestination but got null", jmsDest);
+            assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);
+
+            // Verify destination/replyto names on received message
+            String recievedName = null;
+            String recievedReplyName = null;
+            if (destType == Topic.class) {
+                recievedName = ((Topic) jmsDest).getTopicName();
+                recievedReplyName = ((Topic) jmsReplyTo).getTopicName();
+            } else if (destType == Queue.class) {
+                recievedName = ((Queue) jmsDest).getQueueName();
+                recievedReplyName = ((Queue) jmsReplyTo).getQueueName();
+            }
+
+            assertEquals("Unexpected name for JMSDestination", destName, recievedName);
+            assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName);
+        }
+    }
+
+    /**
+     * Tests that the a connection with a 'queue prefix' set on it via broker-provided connection
+     * properties adds the prefix to the content of the to/reply-to fields for outgoing messages.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+        Class<? extends Destination> destType = Queue.class;
+        String destPrefix = "q-broker-provided-prefix-";
+        String destName = "myQueue";
+        String destAddress = destPrefix + destName;
+        Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+        doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue);
+    }
+
+    /**
+     * Tests that the a connection with a 'topic prefix' set on it via broker-provided connection
+     * properties adds the prefix to the content of the to/reply-to fields for outgoing messages.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+        Class<? extends Destination> destType = Topic.class;
+        String destPrefix = "t-broker-provided-prefix-";
+        String destName = "myTopic";
+        String destAddress = destPrefix + destName;
+        Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+        doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue);
+    }
+
+    private void doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
+                                                             String destPrefix,
+                                                             String destName,
+                                                             String destAddress,
+                                                             Byte destTypeAnnotationValue) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            // Have the test peer provide the destination prefixes as connection properties
+            Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+            properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
+            properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);
+
+            Connection connection = testFixture.establishConnecton(testPeer, null, null, properties);
+
+            connection.start();
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination
+            Destination dest = null;
+            if (destType == Topic.class) {
+                dest = session.createTopic(destName);
+            } else if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+            } else {
+                fail("non-temporary destination type set");
+            }
+
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(destAddress));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+
+            MessageProducer producer = session.createProducer(dest);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue));
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+            propsMatcher.withTo(equalTo(destAddress));
+            propsMatcher.withReplyTo(equalTo(destAddress));
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+
+            //TODO: currently we aren't sending any body section, decide if this is allowed
+            //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createMessage();
+            message.setJMSReplyTo(dest);
+
+            producer.send(message);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     // --- byte type annotation values --- //
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org