You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/12/22 17:39:53 UTC
[5/7] qpid-jms git commit: add support for queue+topic prefixes being
defined by the server via connection properties
add support for queue+topic prefixes being defined by the server via connection properties
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c6a5498d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c6a5498d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c6a5498d
Branch: refs/heads/master
Commit: c6a5498d05a46141543c80bb6cf9618126617cd5
Parents: d4016cb
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Dec 22 14:26:57 2014 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Dec 22 14:26:57 2014 +0000
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConnection.java | 12 +
.../provider/amqp/AmqpConnectionProperties.java | 28 ++-
.../jms/integration/MessageIntegrationTest.java | 225 +++++++++++++++++++
3 files changed, 264 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 1bc1f53..6423365 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -125,6 +125,18 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
this.properties = new AmqpConnectionProperties(
getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
+ String brokerQueuePrefix = properties.getQueuePrefix();
+ if (brokerQueuePrefix != null) {
+ queuePrefix = brokerQueuePrefix;
+ resource.setQueuePrefix(brokerQueuePrefix);
+ }
+
+ String brokerTopicPrefix = properties.getTopicPrefix();
+ if (brokerTopicPrefix != null) {
+ topicPrefix = brokerTopicPrefix;
+ resource.setTopicPrefix(brokerTopicPrefix);
+ }
+
connectionSession.open(new AsyncResult() {
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index c3b0297..de513f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -30,8 +30,12 @@ import org.apache.qpid.proton.amqp.Symbol;
public class AmqpConnectionProperties {
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ public static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
+ public static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
private boolean anonymousRelaySupported = false;
+ private String queuePrefix = null;
+ private String topicPrefix = null;
/**
* Creates a new instance of this class from the given remote capabilities and properties.
@@ -61,10 +65,32 @@ public class AmqpConnectionProperties {
}
protected void processProperties(Map<Symbol, Object> properties) {
- // TODO - Inspect properties for configuration options
+ if (properties.containsKey(QUEUE_PREFIX)) {
+ Object o = properties.get(QUEUE_PREFIX);
+ if (o instanceof String) {
+ queuePrefix = (String) o;
+ }
+ }
+
+ if (properties.containsKey(TOPIC_PREFIX)) {
+ Object o = properties.get(TOPIC_PREFIX);
+ if (o instanceof String) {
+ topicPrefix = (String) o;
+ }
+ }
+
+ // TODO - Inspect properties for any other configuration options
}
public boolean isAnonymousRelaySupported() {
return anonymousRelaySupported;
}
+
+ public String getQueuePrefix() {
+ return queuePrefix;
+ }
+
+ public String getTopicPrefix() {
+ return topicPrefix;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c6a5498d/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 6590fc2..a36307b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -26,9 +26,12 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
import java.util.Date;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import javax.jms.Connection;
@@ -44,6 +47,7 @@ import javax.jms.Topic;
import org.apache.qpid.jms.JmsClientProperties;
import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageIdHelper;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
@@ -652,6 +656,227 @@ public class MessageIntegrationTest extends QpidJmsTestCase
}
}
+ /**
+ * Tests that a connection with 'prefixes' set on it via broker-provided connection properties
+ * strips the prefix from the to/reply-to fields for incoming messages with Topic destinations.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+ Class<? extends Destination> destType = Topic.class;
+ String destPrefix = "t-broker-provided-prefix-";
+ String destName = "myTopic";
+ String replyName = "myReplyTopic";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+ Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+ String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+ Byte replyAnnotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+ doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ /**
+ * Tests that a connection with 'prefixes' set on it via broker-provided connection properties
+ * strips the prefix from the to/reply-to fields for incoming messages with Queue destinations.
+ */
+ @Test(timeout = 2000)
+ public void testReceivedMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+ Class<? extends Destination> destType = Queue.class;
+ String destPrefix = "q-broker-provided-prefix-";
+ String destName = "myQueue";
+ String replyName = "myReplyQueue";
+ String destAddress = destPrefix + destName;
+ String replyAddress = destPrefix + replyName;
+ String annotationName = AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+ Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+ String replyAnnotationName = AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME;
+ Byte replyAnnotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+ doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, replyName,
+ destAddress, replyAddress, annotationName,
+ annotationValue, replyAnnotationName, replyAnnotationValue);
+ }
+
+ private void doReceivedMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
+ String destPrefix,
+ String destName,
+ String replyName,
+ String destAddress,
+ String replyAddress,
+ String annotationName,
+ Object annotationValue,
+ String replyAnnotationName,
+ Object replyAnnotationValue) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ // Have the test peer provide the destination prefixes as connection properties
+ Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+ properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
+ properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);
+
+ Connection connection = testFixture.establishConnecton(testPeer, null, null, properties);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the destination
+ Destination dest = null;
+ if (destType == Topic.class) {
+ dest= session.createTopic(destName);
+ } else if (destType == Queue.class) {
+ dest = session.createQueue(destName);
+ } else {
+ fail("non-temporary destination type set");
+ }
+
+ MessageAnnotationsDescribedType msgAnnotations = null;
+ if (annotationName != null || replyAnnotationName != null) {
+ msgAnnotations = new MessageAnnotationsDescribedType();
+ if (annotationName != null) {
+ msgAnnotations.setSymbolKeyedAnnotation(annotationName, annotationValue);
+ }
+
+ if (replyAnnotationName != null) {
+ msgAnnotations.setSymbolKeyedAnnotation(replyAnnotationName, replyAnnotationValue);
+ }
+ }
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setTo(destAddress);
+ props.setReplyTo(replyAddress);
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ SourceMatcher sourceMatcher = new SourceMatcher();
+ sourceMatcher.withAddress(equalTo(destAddress));
+
+ testPeer.expectReceiverAttach(notNullValue(), sourceMatcher);
+ testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, props, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ MessageConsumer messageConsumer = session.createConsumer(dest);
+ Message receivedMessage = messageConsumer.receive(1000);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ assertNotNull(receivedMessage);
+
+ Destination jmsDest = receivedMessage.getJMSDestination();
+ Destination jmsReplyTo = receivedMessage.getJMSReplyTo();
+
+ assertNotNull("Expected JMSDestination but got null", jmsDest);
+ assertNotNull("Expected JMSReplyTo but got null", jmsReplyTo);
+
+ // Verify destination/replyto names on received message
+ String recievedName = null;
+ String recievedReplyName = null;
+ if (destType == Topic.class) {
+ recievedName = ((Topic) jmsDest).getTopicName();
+ recievedReplyName = ((Topic) jmsReplyTo).getTopicName();
+ } else if (destType == Queue.class) {
+ recievedName = ((Queue) jmsDest).getQueueName();
+ recievedReplyName = ((Queue) jmsReplyTo).getQueueName();
+ }
+
+ assertEquals("Unexpected name for JMSDestination", destName, recievedName);
+ assertEquals("Unexpected name for JMSReplyTo", replyName, recievedReplyName);
+ }
+ }
+
+ /**
+ * Tests that a connection with a 'queue prefix' set on it via broker-provided connection
+ * properties adds the prefix to the content of the to/reply-to fields for outgoing messages.
+ */
+ @Test(timeout = 2000)
+ public void testSendMessageWithQueueDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+ Class<? extends Destination> destType = Queue.class;
+ String destPrefix = "q-broker-provided-prefix-";
+ String destName = "myQueue";
+ String destAddress = destPrefix + destName;
+ Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+ doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue);
+ }
+
+ /**
+ * Tests that a connection with a 'topic prefix' set on it via broker-provided connection
+ * properties adds the prefix to the content of the to/reply-to fields for outgoing messages.
+ */
+ @Test(timeout = 2000)
+ public void testSendMessageWithTopicDestinationsOnConnectionWithBrokerDefinedPrefixProperties() throws Exception {
+ Class<? extends Destination> destType = Topic.class;
+ String destPrefix = "t-broker-provided-prefix-";
+ String destName = "myTopic";
+ String destAddress = destPrefix + destName;
+ Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+ doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(destType, destPrefix, destName, destAddress, annotationValue);
+ }
+
+ private void doSendMessageOnConnectionWithBrokerDefinedPrefixPropertiesTestImpl(Class<? extends Destination> destType,
+ String destPrefix,
+ String destName,
+ String destAddress,
+ Byte destTypeAnnotationValue) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+ // Have the test peer provide the destination prefixes as connection properties
+ Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+ properties.put(AmqpConnectionProperties.QUEUE_PREFIX, destPrefix);
+ properties.put(AmqpConnectionProperties.TOPIC_PREFIX, destPrefix);
+
+ Connection connection = testFixture.establishConnecton(testPeer, null, null, properties);
+
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create the destination
+ Destination dest = null;
+ if (destType == Topic.class) {
+ dest = session.createTopic(destName);
+ } else if (destType == Queue.class) {
+ dest = session.createQueue(destName);
+ } else {
+ fail("non-temporary destination type set");
+ }
+
+ TargetMatcher targetMatcher = new TargetMatcher();
+ targetMatcher.withAddress(equalTo(destAddress));
+
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+
+ MessageProducer producer = session.createProducer(dest);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_DEST_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue));
+ msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpDestinationHelper.JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME), equalTo(destTypeAnnotationValue));
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ propsMatcher.withTo(equalTo(destAddress));
+ propsMatcher.withReplyTo(equalTo(destAddress));
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+
+ //TODO: currently we aren't sending any body section, decide if this is allowed
+ //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createMessage();
+ message.setJMSReplyTo(dest);
+
+ producer.send(message);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
// --- byte type annotation values --- //
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org