You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/05/15 07:52:40 UTC
[1/2] qpid-broker-j git commit: QPID-8139: [Broker-J][AMQP 1.0] Make
sure that selector filter can handle JMSMessageID and JMSCorrelationID values
with prefixes as defined in AMQP JMS mapping specification
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 2fdbde270 -> 2d85f73e6
QPID-8139: [Broker-J][AMQP 1.0] Make sure that selector filter can handle JMSMessageID and JMSCorrelationID values with prefixes as defined in AMQP JMS mapping specification
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/86b31afb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/86b31afb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/86b31afb
Branch: refs/heads/master
Commit: 86b31afbc98bbd980c2566d0fd48f72e30fe5c69
Parents: 2fdbde2
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Apr 20 17:46:46 2018 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 15 08:38:36 2018 +0100
----------------------------------------------------------------------
.../qpid/server/exchange/HeadersBinding.java | 2 +-
.../apache/qpid/server/filter/Filterable.java | 8 +-
.../qpid/server/filter/JMSSelectorFilter.java | 11 +-
.../protocol/v1_0/SendingLinkEndpoint.java | 5 +-
.../v1_0/selector/AmqpMessageIdHelper.java | 391 +++++++++++++++++++
.../selector/JMSMessagePropertyExpression.java | 162 ++++++++
.../JMSMessagePropertyExpressionTest.java | 377 ++++++++++++++++++
.../qpid/tests/protocol/v1_0/Interaction.java | 8 +
.../tests/protocol/v1_0/MessageDecoder.java | 11 +
.../JMSMessagePropertiesFilterTest.java | 356 +++++++++++++++++
10 files changed, 1321 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
index 5fb5329..3d305d6 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
@@ -138,7 +138,7 @@ class HeadersBinding
public boolean matches(Filterable message)
{
- return matches(message.getMessageHeader()) && (_filter == null || _filter.allAllow(message));
+ return matches(message.getServerMessage().getMessageHeader()) && (_filter == null || _filter.allAllow(message));
}
private boolean and(AMQMessageHeader headers)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
index a15bdc1..ed64029 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/Filterable.java
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.filter;
-import org.apache.qpid.server.filter.FilterableMessage;
-import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
public interface Filterable extends FilterableMessage
{
- AMQMessageHeader getMessageHeader();
+ ServerMessage<?> getServerMessage();
@Override
boolean isPersistent();
@@ -50,9 +48,9 @@ public interface Filterable extends FilterableMessage
{
@Override
- public AMQMessageHeader getMessageHeader()
+ public ServerMessage<?> getServerMessage()
{
- return message.getMessageHeader();
+ return message;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
index 3e85861..5ba3da8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java
@@ -39,9 +39,16 @@ public class JMSSelectorFilter implements MessageFilter
public JMSSelectorFilter(String selector) throws ParseException, TokenMgrError, SelectorParsingException
{
+ this(selector, JMSMessagePropertyExpression.FACTORY);
+ }
+
+ public JMSSelectorFilter(String selector,
+ PropertyExpressionFactory<? extends FilterableMessage> propertyExpressionFactory)
+ throws ParseException, TokenMgrError, SelectorParsingException
+ {
_selector = selector;
- SelectorParser<FilterableMessage> selectorParser = new SelectorParser<>();
- selectorParser.setPropertyExpressionFactory(JMSMessagePropertyExpression.FACTORY);
+ SelectorParser selectorParser = new SelectorParser<>();
+ selectorParser.setPropertyExpressionFactory(propertyExpressionFactory);
_matcher = selectorParser.parse(selector);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a69b96a..0bd9b4b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -51,6 +51,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.LinkModel;
+import org.apache.qpid.server.protocol.v1_0.selector.JMSMessagePropertyExpression;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -170,8 +171,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
try
{
- messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
-
+ messageFilter = new JMSSelectorFilter(selectorFilter.getValue(),
+ JMSMessagePropertyExpression.FACTORY);
actualFilters.put(entry.getKey(), entry.getValue());
}
catch (ParseException | SelectorParsingException | TokenMgrError e)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
new file mode 100644
index 0000000..75f05ad
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/AmqpMessageIdHelper.java
@@ -0,0 +1,391 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+//
+// Based on like named file from 3c8d67a6eee38934d233f39b2d614ac4c28b39be of the Apache Qpid JMS <https://git-wip-us.apache.org/repos/asf/qpid-jms.git>
+//
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.server.protocol.converter.MessageConversionException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between
+ * the AMQP types and the Strings values used by JMS.
+ * <p>
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
+ * for interoperability with other AMQP clients, the following encoding can be used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
+ * <p>
+ * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will be thrown.
+ */
+public class AmqpMessageIdHelper
+{
+ public static final AmqpMessageIdHelper INSTANCE = new AmqpMessageIdHelper();
+
+ public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+ public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+ public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+ public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+ public static final String AMQP_NO_PREFIX = "AMQP_NO_PREFIX:";
+ public static final String JMS_ID_PREFIX = "ID:";
+
+ private static final String AMQP_PREFIX = "AMQP_";
+ private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
+ private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+ private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+ private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+ private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+ private static final int AMQP_NO_PREFIX_LENGTH = AMQP_NO_PREFIX.length();
+ private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+ /**
+ * Checks whether the given string begins with "ID:" prefix used to denote a JMSMessageID
+ *
+ * @param string the string to check
+ * @return true if and only id the string begins with "ID:"
+ */
+ public boolean hasMessageIdPrefix(String string)
+ {
+ if (string == null)
+ {
+ return false;
+ }
+
+ return string.startsWith(JMS_ID_PREFIX);
+ }
+
+ public String toMessageIdString(Object idObject)
+ {
+ if (idObject instanceof String)
+ {
+ final String stringId = (String) idObject;
+
+ boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
+ if (!hasMessageIdPrefix)
+ {
+ // For JMSMessageID, has no "ID:" prefix, we need to record
+ // that for later use as a JMSCorrelationID.
+ return JMS_ID_PREFIX + AMQP_NO_PREFIX + stringId;
+ }
+ else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH))
+ {
+ // We are for a JMSMessageID value, but have 'ID:' followed by
+ // one of the encoding prefixes. Need to escape the entire string
+ // to preserve for later re-use as a JMSCorrelationID.
+ return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
+ }
+ else
+ {
+ // It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
+ return stringId;
+ }
+ }
+ else
+ {
+ // Not a string, convert it
+ return convertToIdString(idObject);
+ }
+ }
+
+ public String toCorrelationIdString(Object idObject)
+ {
+
+ if (idObject instanceof String)
+ {
+ final String stringId = (String) idObject;
+
+ boolean hasMessageIdPrefix = hasMessageIdPrefix(stringId);
+ if (!hasMessageIdPrefix)
+ {
+ // For JMSCorrelationID, has no "ID:" prefix, use it as-is.
+ return stringId;
+ }
+ else if (hasTypeEncodingPrefix(stringId, JMS_ID_PREFIX_LENGTH))
+ {
+ // We are for a JMSCorrelationID value, but have 'ID:' followed by
+ // one of the encoding prefixes. Need to escape the entire string
+ // to preserve for later re-use as a JMSCorrelationID.
+ return JMS_ID_PREFIX + AMQP_STRING_PREFIX + stringId;
+ }
+ else
+ {
+ // It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
+ return stringId;
+ }
+ }
+ else
+ {
+ // Not a string, convert it
+ return convertToIdString(idObject);
+ }
+ }
+
+ /**
+ * Takes the provided non-String AMQP message-id/correlation-id object, and
+ * convert it it to a String usable as either a JMSMessageID or JMSCorrelationID
+ * value, encoding the type information as a prefix to convey for later use
+ * in reversing the process if used to set JMSCorrelationID on a message.
+ *
+ * @param idObject the object to process
+ * @return string to be used for the actual JMS ID.
+ */
+ private String convertToIdString(Object idObject)
+ {
+ if (idObject == null)
+ {
+ return null;
+ }
+
+ if (idObject instanceof UUID)
+ {
+ return JMS_ID_PREFIX + AMQP_UUID_PREFIX + idObject.toString();
+ }
+ else if (idObject instanceof UnsignedLong)
+ {
+ return JMS_ID_PREFIX + AMQP_ULONG_PREFIX + idObject.toString();
+ }
+ else if (idObject instanceof Binary)
+ {
+ ByteBuffer dup = ((Binary) idObject).asByteBuffer();
+
+ byte[] bytes = new byte[dup.remaining()];
+ dup.get(bytes);
+
+ String hex = convertBinaryToHexString(bytes);
+
+ return JMS_ID_PREFIX + AMQP_BINARY_PREFIX + hex;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unsupported type provided: " + idObject.getClass());
+ }
+ }
+
+ private boolean hasTypeEncodingPrefix(String stringId, int offset)
+ {
+ if (!stringId.startsWith(AMQP_PREFIX, offset))
+ {
+ return false;
+ }
+
+ return hasAmqpBinaryPrefix(stringId, offset) ||
+ hasAmqpUuidPrefix(stringId, offset) ||
+ hasAmqpUlongPrefix(stringId, offset) ||
+ hasAmqpStringPrefix(stringId, offset) ||
+ hasAmqpNoPrefix(stringId, offset);
+ }
+
+ private boolean hasAmqpStringPrefix(String stringId, int offset)
+ {
+ return stringId.startsWith(AMQP_STRING_PREFIX, offset);
+ }
+
+ private boolean hasAmqpUlongPrefix(String stringId, int offset)
+ {
+ return stringId.startsWith(AMQP_ULONG_PREFIX, offset);
+ }
+
+ private boolean hasAmqpUuidPrefix(String stringId, int offset)
+ {
+ return stringId.startsWith(AMQP_UUID_PREFIX, offset);
+ }
+
+ private boolean hasAmqpBinaryPrefix(String stringId, int offset)
+ {
+ return stringId.startsWith(AMQP_BINARY_PREFIX, offset);
+ }
+
+ private boolean hasAmqpNoPrefix(String stringId, int offset)
+ {
+ return stringId.startsWith(AMQP_NO_PREFIX, offset);
+ }
+
+ /**
+ * Takes the provided id string and return the appropriate amqp messageId style object.
+ * Converts the type based on any relevant encoding information found as a prefix.
+ *
+ * @param origId the object to be converted
+ * @return the amqp messageId style object
+ * @throws public class MessageConversionException extends RuntimeException
+ * if the provided baseId String indicates an encoded type but can't be converted to that type.
+ */
+ public Object toIdObject(final String origId) throws MessageConversionException
+ {
+ if (origId == null)
+ {
+ return null;
+ }
+
+ if (!AmqpMessageIdHelper.INSTANCE.hasMessageIdPrefix(origId))
+ {
+ // We have a string without any "ID:" prefix, it is an
+ // application-specific String, use it as-is.
+ return origId;
+ }
+
+ try
+ {
+ if (hasAmqpNoPrefix(origId, JMS_ID_PREFIX_LENGTH))
+ {
+ // Prefix telling us there was originally no "ID:" prefix,
+ // strip it and return the remainder
+ return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_NO_PREFIX_LENGTH);
+ }
+ else if (hasAmqpUuidPrefix(origId, JMS_ID_PREFIX_LENGTH))
+ {
+ String uuidString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_UUID_PREFIX_LENGTH);
+ return UUID.fromString(uuidString);
+ }
+ else if (hasAmqpUlongPrefix(origId, JMS_ID_PREFIX_LENGTH))
+ {
+ String ulongString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_ULONG_PREFIX_LENGTH);
+ return UnsignedLong.valueOf(ulongString);
+ }
+ else if (hasAmqpStringPrefix(origId, JMS_ID_PREFIX_LENGTH))
+ {
+ return origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_STRING_PREFIX_LENGTH);
+ }
+ else if (hasAmqpBinaryPrefix(origId, JMS_ID_PREFIX_LENGTH))
+ {
+ String hexString = origId.substring(JMS_ID_PREFIX_LENGTH + AMQP_BINARY_PREFIX_LENGTH);
+ byte[] bytes = convertHexStringToBinary(hexString);
+ return new Binary(bytes);
+ }
+ else
+ {
+ // We have a string without any encoding prefix needing processed,
+ // so transmit it as-is, including the "ID:"
+ return origId;
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new MessageConversionException("Unable to convert ID value", e);
+ }
+ }
+
+ /**
+ * Convert the provided hex-string into a binary representation where each byte represents
+ * two characters of the hex string.
+ * <p>
+ * The hex characters may be upper or lower case.
+ *
+ * @param hexString string to convert
+ * @return a byte array containing the binary representation
+ * @throws IllegalArgumentException if the provided String is a non-even length or contains non-hex characters
+ */
+ public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException
+ {
+ int length = hexString.length();
+
+ // As each byte needs two characters in the hex encoding, the string must be an even length.
+ if (length % 2 != 0)
+ {
+ throw new IllegalArgumentException("The provided hex String must be an even length, but was of length "
+ + length
+ + ": "
+ + hexString);
+ }
+
+ byte[] binary = new byte[length / 2];
+
+ for (int i = 0; i < length; i += 2)
+ {
+ char highBitsChar = hexString.charAt(i);
+ char lowBitsChar = hexString.charAt(i + 1);
+
+ int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+ int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+ binary[i / 2] = (byte) (highBits + lowBits);
+ }
+
+ return binary;
+ }
+
+ private int hexCharToInt(char ch, String orig) throws IllegalArgumentException
+ {
+ if (ch >= '0' && ch <= '9')
+ {
+ // subtract '0' to get difference in position as an int
+ return ch - '0';
+ }
+ else if (ch >= 'A' && ch <= 'F')
+ {
+ // subtract 'A' to get difference in position as an int
+ // and then add 10 for the offset of 'A'
+ return ch - 'A' + 10;
+ }
+ else if (ch >= 'a' && ch <= 'f')
+ {
+ // subtract 'a' to get difference in position as an int
+ // and then add 10 for the offset of 'a'
+ return ch - 'a' + 10;
+ }
+
+ throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+ }
+
+ /**
+ * Convert the provided binary into a hex-string representation where each character
+ * represents 4 bits of the provided binary, i.e each byte requires two characters.
+ * <p>
+ * The returned hex characters are upper-case.
+ *
+ * @param bytes binary to convert
+ * @return a String containing a hex representation of the bytes
+ */
+ public String convertBinaryToHexString(byte[] bytes)
+ {
+ // Each byte is represented as 2 chars
+ StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+ for (byte b : bytes)
+ {
+ // The byte will be expanded to int before shifting, replicating the
+ // sign bit, so mask everything beyond the first 4 bits afterwards
+ int highBitsInt = (b >> 4) & 0xF;
+ // We only want the first 4 bits
+ int lowBitsInt = b & 0xF;
+
+ builder.append(HEX_CHARS[highBitsInt]);
+ builder.append(HEX_CHARS[lowBitsInt]);
+ }
+
+ return builder.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
new file mode 100644
index 0000000..07b6441
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpression.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+import java.util.HashMap;
+
+import org.apache.qpid.server.filter.Expression;
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.PropertyExpression;
+import org.apache.qpid.server.filter.PropertyExpressionFactory;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+
+public class JMSMessagePropertyExpression implements PropertyExpression<Filterable>
+{
+ public static final PropertyExpressionFactory<Filterable> FACTORY = JMSMessagePropertyExpression::new;
+
+ static final String JMS_MESSAGE_ID = "JMSMessageID";
+ static final String JMS_CORRELATION_ID = "JMSCorrelationID";
+ static final String JMS_DESTINATION = "JMSDestination";
+ static final String JMS_REPLY_TO = "JMSReplyTo";
+ static final String JMS_TYPE = "JMSType";
+ static final String JMS_DELIVERY_MODE = "JMSDeliveryMode";
+ static final String JMS_PRIORITY = "JMSPriority";
+ static final String JMS_TIMESTAMP = "JMSTimestamp";
+ static final String JMS_EXPIRATION = "JMSExpiration";
+ static final String JMS_REDELIVERED = "JMSRedelivered";
+
+ private static final HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<>();
+
+ static
+ {
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_DESTINATION,
+ (Expression<Filterable>) message -> message.getServerMessage().getTo());
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_REPLY_TO, (Expression<Filterable>) Filterable::getReplyTo);
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_TYPE, (Expression<Filterable>) Filterable::getType);
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_DELIVERY_MODE,
+ (Expression<Filterable>) message -> (message.isPersistent()
+ ? JMSDeliveryMode.PERSISTENT
+ : JMSDeliveryMode.NON_PERSISTENT).toString());
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_PRIORITY, (Expression<Filterable>) message -> (int) message.getPriority());
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_MESSAGE_ID, new MessageIDExpression());
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_TIMESTAMP, (Expression<Filterable>) Filterable::getTimestamp);
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_CORRELATION_ID, new CorrelationIdExpression());
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_EXPIRATION, (Expression<Filterable>) Filterable::getExpiration);
+ JMS_PROPERTY_EXPRESSIONS.put(JMS_REDELIVERED, (Expression<Filterable>) Filterable::isRedelivered);
+ }
+
+ private final String name;
+ private final Expression jmsPropertyExpression;
+
+ private JMSMessagePropertyExpression(String name)
+ {
+ this.name = name;
+ jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Object evaluate(Filterable message)
+ {
+ if (jmsPropertyExpression != null)
+ {
+ return jmsPropertyExpression.evaluate(message);
+ }
+ else
+ {
+ return message.getHeader(name);
+ }
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return (o != null)
+ && this.getClass().equals(o.getClass())
+ && name.equals(((JMSMessagePropertyExpression) o).name);
+ }
+
+ private static class MessageIDExpression implements Expression<Filterable>
+ {
+ @Override
+ public Object evaluate(Filterable message)
+ {
+ Message_1_0 original = (Message_1_0) message.getServerMessage();
+ PropertiesSection propertiesSection = original.getPropertiesSection();
+ String id = null;
+ if (propertiesSection != null)
+ {
+ Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ id = AmqpMessageIdHelper.INSTANCE.toMessageIdString(properties.getMessageId());
+ }
+ }
+ return id;
+ }
+ }
+
+ private static class CorrelationIdExpression implements Expression<Filterable>
+ {
+ @Override
+ public Object evaluate(Filterable message)
+ {
+
+ Message_1_0 original = (Message_1_0) message.getServerMessage();
+ PropertiesSection propertiesSection = original.getPropertiesSection();
+ String id = null;
+ if (propertiesSection != null)
+ {
+ Properties properties = propertiesSection.getValue();
+ if (properties != null)
+ {
+ id = AmqpMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+ }
+ }
+ return id;
+ }
+ }
+
+ // Constants - defined the same as JMS
+ enum JMSDeliveryMode
+ {
+ NON_PERSISTENT, PERSISTENT
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
new file mode 100644
index 0000000..27c3c6e
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/selector/JMSMessagePropertyExpressionTest.java
@@ -0,0 +1,377 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.selector;
+
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_BINARY_PREFIX;
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_NO_PREFIX;
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_STRING_PREFIX;
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_ULONG_PREFIX;
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.AMQP_UUID_PREFIX;
+import static org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper.JMS_ID_PREFIX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.filter.Filterable;
+import org.apache.qpid.server.filter.PropertyExpression;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
+
+public class JMSMessagePropertyExpressionTest
+{
+
+ @Test
+ public void evaluateJMSMessageIDForStringWithIDPrefix()
+ {
+ final String id = "ID:testID";
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(id)));
+ }
+
+ @Test
+ public void evaluateJMSMessageIDForStringWithoutIDPrefix()
+ {
+ final String id = "testID";
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_NO_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSMessageIDForUUID()
+ {
+ final UUID id = UUID.randomUUID();
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSMessageIDForUnsignedLong()
+ {
+ final UnsignedLong id = UnsignedLong.valueOf(42);
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSMessageIDForBinary()
+ {
+ byte[] data = {1, 2, 3};
+ final Binary id = new Binary(data);
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_BINARY_PREFIX + AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data))));
+ }
+
+ @Test
+ public void evaluateJMSMessageIDForStringWithAMQPBindingPrefixes()
+ {
+ final String id = "ID:AMQP_ULONG:string-id";
+ final Filterable message = createFilterableWithMessageId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_MESSAGE_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_STRING_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForStringWithIDPrefix()
+ {
+ final String id = "ID:testID";
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForStringWithoutIDPrefix()
+ {
+ final String id = "testID";
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForUUID()
+ {
+ final UUID id = UUID.randomUUID();
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_UUID_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForUnsignedLong()
+ {
+ final UnsignedLong id = UnsignedLong.valueOf(42);
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_ULONG_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForBinary()
+ {
+ byte[] data = {1, 2, 3};
+ final Binary id = new Binary(data);
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value,
+ is(equalTo(JMS_ID_PREFIX
+ + AMQP_BINARY_PREFIX
+ + AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data))));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForStringWithAMQPBindingPrefixes()
+ {
+ final String id = "ID:AMQP_ULONG:string-id";
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMS_ID_PREFIX + AMQP_STRING_PREFIX + id)));
+ }
+
+ @Test
+ public void evaluateJMSCorrelationIDForStringWithAMQPTypePrefixes()
+ {
+ final String id = "AMQP_ULONG:foo";
+ final Filterable message = createFilterableWithCorrelationId(id);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_CORRELATION_ID);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(id)));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void evaluateJMSDestination()
+ {
+ final String destinationName = "testDestination";
+ final Message_1_0 message_1_0 = mock(Message_1_0.class);
+ when(message_1_0.getTo()).thenReturn(destinationName);
+ final Filterable message = mock(Filterable.class);
+ when(message.getServerMessage()).thenReturn((ServerMessage) message_1_0);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DESTINATION);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(destinationName)));
+ }
+
+ @Test
+ public void evaluateJMSReplyTo()
+ {
+ final String replyTo = "testReplyTo";
+ final Filterable message = mock(Filterable.class);
+ when(message.getReplyTo()).thenReturn(replyTo);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REPLY_TO);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(replyTo)));
+ }
+
+ @Test
+ public void evaluateJMSType()
+ {
+ final String type = "testType";
+ final Filterable message = mock(Filterable.class);
+ when(message.getType()).thenReturn(type);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TYPE);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(type)));
+ }
+
+ @Test
+ public void evaluateJMSDeliveryModeForPersistentMessage()
+ {
+ final Filterable message = mock(Filterable.class);
+ when(message.isPersistent()).thenReturn(true);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.PERSISTENT.toString())));
+ }
+
+ @Test
+ public void evaluateJMSDeliveryModeForNonPersistentMessage()
+ {
+ final Filterable message = mock(Filterable.class);
+ when(message.isPersistent()).thenReturn(false);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_DELIVERY_MODE);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(JMSMessagePropertyExpression.JMSDeliveryMode.NON_PERSISTENT.toString())));
+ }
+
+ @Test
+ public void evaluateJMSPriority()
+ {
+ int priority = 5;
+ final Filterable message = mock(Filterable.class);
+ when(message.getPriority()).thenReturn((byte) priority);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_PRIORITY);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(priority)));
+ }
+
+ @Test
+ public void evaluateJMSTimestamp()
+ {
+ long timestamp = System.currentTimeMillis();
+ final Filterable message = mock(Filterable.class);
+ when(message.getTimestamp()).thenReturn(timestamp);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_TIMESTAMP);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(timestamp)));
+ }
+
+ @Test
+ public void evaluateJMSExpiration()
+ {
+ long expiration = System.currentTimeMillis() + 10000;
+ final Filterable message = mock(Filterable.class);
+ when(message.getExpiration()).thenReturn(expiration);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_EXPIRATION);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(expiration)));
+ }
+
+ @Test
+ public void evaluateJMSRedelivered()
+ {
+ boolean redelivered = true;
+ final Filterable message = mock(Filterable.class);
+ when(message.isRedelivered()).thenReturn(redelivered);
+
+ PropertyExpression<Filterable> expression =
+ JMSMessagePropertyExpression.FACTORY.createPropertyExpression(JMSMessagePropertyExpression.JMS_REDELIVERED);
+ Object value = expression.evaluate(message);
+
+ assertThat(value, is(equalTo(redelivered)));
+ }
+
+ Filterable createFilterableWithCorrelationId(final Object id)
+ {
+ final Properties properties = mock(Properties.class);
+ when(properties.getCorrelationId()).thenReturn(id);
+ return createFilterable(properties);
+ }
+
+ Filterable createFilterableWithMessageId(final Object id)
+ {
+ final Properties properties = mock(Properties.class);
+ when(properties.getMessageId()).thenReturn(id);
+ return createFilterable(properties);
+ }
+
+ @SuppressWarnings("unchecked")
+ Filterable createFilterable(final Properties properties)
+ {
+ final Filterable message = mock(Filterable.class);
+ final Message_1_0 message_1_0 = mock(Message_1_0.class);
+ final PropertiesSection propertiesSection = mock(PropertiesSection.class);
+ when(propertiesSection.getValue()).thenReturn(properties);
+ when(message_1_0.getPropertiesSection()).thenReturn(propertiesSection);
+ when(message.getServerMessage()).thenReturn((ServerMessage) message_1_0);
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 77f20d0..0919edc 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -58,6 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -113,6 +114,7 @@ public class Interaction extends AbstractInteraction<Interaction>
private Object _decodedLatestDelivery;
private UnsignedInteger _latestDeliveryId;
private Map<String, Object> _latestDeliveryApplicationProperties;
+ private Properties _latestDeliveryProperties;
Interaction(final FrameTransport frameTransport)
{
@@ -1083,6 +1085,7 @@ public class Interaction extends AbstractInteraction<Interaction>
});
_decodedLatestDelivery = messageDecoder.getData();
_latestDeliveryApplicationProperties = messageDecoder.getApplicationProperties();
+ _latestDeliveryProperties = messageDecoder.getProperties();
_latestDelivery = null;
return this;
}
@@ -1102,6 +1105,11 @@ public class Interaction extends AbstractInteraction<Interaction>
return _latestDeliveryApplicationProperties;
}
+ public Properties getLatestDeliveryProperties()
+ {
+ return _latestDeliveryProperties;
+ }
+
private List<Transfer> receiveAllTransfers(final Class<?>... ignore) throws Exception
{
List<Transfer> transfers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
index 0df1abd..8af6f3e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSect
import org.apache.qpid.server.protocol.v1_0.type.messaging.FooterSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.HeaderSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
import org.apache.qpid.server.protocol.v1_0.type.messaging.PropertiesSection;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
@@ -213,4 +214,14 @@ public class MessageDecoder
}
return Collections.emptyMap();
}
+
+ public Properties getProperties() throws AmqpErrorException
+ {
+ parse();
+ if (_propertiesSection != null)
+ {
+ return _propertiesSection.getValue();
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/86b31afb/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
new file mode 100644
index 0000000..49a2e37
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/JMSMessagePropertiesFilterTest.java
@@ -0,0 +1,356 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
+
+import static org.apache.qpid.tests.utils.BrokerAdmin.KIND_BROKER_J;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assume.assumeThat;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.selector.AmqpMessageIdHelper;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+
+@BrokerSpecific(kind = KIND_BROKER_J)
+public class JMSMessagePropertiesFilterTest extends BrokerAdminUsingTestBase
+{
+ private static final String TEST_MESSAGE_CONTENT = "testContent";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter."
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSMessageID representation for UUID values : "
+ + "'ID:AMQP_UUID:<string representation of uuid>'")
+ public void selectorWithJMSMessageIDAsUUID() throws Exception
+ {
+ final UUID messageId = UUID.randomUUID();
+ final Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ perform(properties, String.format("JMSMessageID='ID:AMQP_UUID:%s'", messageId), "messageId", messageId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSMessageID representation for unsigned long values : "
+ + "'ID:AMQP_ULONG:<string representation of ulong>'")
+ public void selectorWithJMSMessageIDAsUnsignedLong() throws Exception
+ {
+ final UnsignedLong messageId = UnsignedLong.ONE;
+ final Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ perform(properties,
+ String.format("JMSMessageID='ID:AMQP_ULONG:%d'", messageId.longValue()),
+ "messageId",
+ messageId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSMessageID representation for Binary values : "
+ + "'ID:AMQP_BINARY:<hex representation of bytes>'")
+ public void selectorWithJMSMessageIDAsBinary() throws Exception
+ {
+ byte[] data = {1, 2, 3};
+ final Binary messageId = new Binary(data);
+ final Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ perform(properties,
+ String.format("JMSMessageID='ID:AMQP_BINARY:%s'",
+ AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
+ "messageId",
+ messageId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSMessageID representation for String values without prefix 'ID:' : "
+ + "'ID:AMQP_NO_PREFIX:<original-string>'")
+ public void selectorWithJMSMessageIDAsStringWithoutPrefix() throws Exception
+ {
+ final String messageId = "testId";
+ final Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ perform(properties, String.format("JMSMessageID='ID:AMQP_NO_PREFIX:%s'", messageId), "messageId", messageId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSMessageID representation for String values with prefix 'ID:' : "
+ + "'ID:<original-string>'")
+ public void selectorWithJMSMessageIDAsStringWithPrefix() throws Exception
+ {
+ final String messageId = "ID:testId";
+ final Properties properties = new Properties();
+ properties.setMessageId(messageId);
+ perform(properties, String.format("JMSMessageID='%s'", messageId), "messageId", messageId);
+ }
+
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter."
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSCorrelationID representation for UUID values : "
+ + "'ID:AMQP_UUID:<string representation of uuid>'")
+ public void selectorWithJMSCorrelationIDAsUUID() throws Exception
+ {
+ final UUID correlationId = UUID.randomUUID();
+ final Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ perform(properties,
+ String.format("JMSCorrelationID='ID:AMQP_UUID:%s'", correlationId),
+ "correlationId",
+ correlationId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSCorrelationID representation for unsigned long values : "
+ + "'ID:AMQP_ULONG:<string representation of ulong>'")
+ public void selectorWithJMSCorrelationIDAsUnsignedLong() throws Exception
+ {
+ final UnsignedLong correlationId = UnsignedLong.ONE;
+ final Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ perform(properties,
+ String.format("JMSCorrelationID='ID:AMQP_ULONG:%d'", correlationId.longValue()),
+ "correlationId",
+ correlationId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSCorrelationID representation for Binary values : "
+ + "'ID:AMQP_BINARY:<hex representation of bytes>'")
+ public void selectorWithJMSCorrelationIDAsBinary() throws Exception
+ {
+ byte[] data = {1, 2, 3};
+ final Binary correlationId = new Binary(data);
+ final Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ perform(properties,
+ String.format("JMSCorrelationID='ID:AMQP_BINARY:%s'",
+ AmqpMessageIdHelper.INSTANCE.convertBinaryToHexString(data)),
+ "correlationId",
+ correlationId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSCorrelationID representation for String values without prefix 'ID:' : "
+ + "'<original-string>'")
+ public void selectorWithJMSCorrelationIDAsStringWithoutPrefix() throws Exception
+ {
+ final String correlationId = "testId";
+ final Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ perform(properties,
+ String.format("JMSCorrelationID='%s'", correlationId),
+ "correlationId",
+ correlationId);
+ }
+
+ @Test
+ @BrokerSpecific(kind = KIND_BROKER_J)
+ @SpecificationTest(section = "3.5.1",
+ description = "A source can restrict the messages transferred from a source by specifying a filter.\n"
+ + ""
+ + "AMQP JMS Mapping\n"
+ + ""
+ + "3.2.1.1 JMSMessageID And JMSCorrelationID Handling\n"
+ + "JMSCorrelationID representation for String values with prefix 'ID:' : "
+ + "'ID:<original-string>'")
+ public void selectorWithJMSCorrelationIDAsStringWithPrefix() throws Exception
+ {
+ final String correlationId = "ID:testId";
+ final Properties properties = new Properties();
+ properties.setCorrelationId(correlationId);
+ perform(properties, String.format("JMSCorrelationID='%s'", correlationId), "correlationId", correlationId);
+ }
+
+ private void perform(final Properties properties,
+ final String selector,
+ final String propertyName,
+ final Object propertyValue) throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+ Flow flow = interaction.getLatestResponse(Flow.class);
+ assumeThat("insufficient credit for the test", flow.getLinkCredit().intValue(), is(greaterThan(1)));
+
+ for (int i = 0; i < 2; i++)
+ {
+
+ QpidByteBuffer payload =
+ generateMessagePayloadWithMessageProperties(properties,
+ TEST_MESSAGE_CONTENT);
+ interaction.transferPayload(payload)
+ .transferSettled(true)
+ .transfer();
+ }
+ interaction.detachClose(true).detach().close().sync();
+ }
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attachSourceFilter(Collections.singletonMap(Symbol.valueOf("selector-filter"),
+ new JMSSelectorFilter(selector)))
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ Properties deliveryProperties = interaction.getLatestDeliveryProperties();
+ assertThat(deliveryProperties, is(notNullValue()));
+
+ Method getter = Properties.class.getMethod("get"
+ + Character.toUpperCase(propertyName.charAt(0))
+ + propertyName.substring(1));
+ assertThat(getter.invoke(deliveryProperties), is(equalTo(propertyValue)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Accepted())
+ .disposition();
+ interaction.close().sync();
+ }
+ }
+
+ private QpidByteBuffer generateMessagePayloadWithMessageProperties(final Properties properties, String content)
+ {
+ MessageEncoder messageEncoder = new MessageEncoder();
+ messageEncoder.setProperties(properties);
+ messageEncoder.addData(content);
+ return messageEncoder.getPayload();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-8167: [Broker-J] Fix
documentation and the name of a constant.
Posted by kw...@apache.org.
QPID-8167: [Broker-J] Fix documentation and the name of a constant.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2d85f73e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2d85f73e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2d85f73e
Branch: refs/heads/master
Commit: 2d85f73e6704f3b69d40611fd6d52895067fc093
Parents: 86b31af
Author: Keith Wall <kw...@apache.org>
Authored: Tue May 15 08:52:07 2018 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Tue May 15 08:52:07 2018 +0100
----------------------------------------------------------------------
broker/src/main/java/org/apache/qpid/server/Main.java | 13 +++++++------
.../src/docbkx/Java-Broker-Getting-Started.xml | 2 +-
2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2d85f73e/broker/src/main/java/org/apache/qpid/server/Main.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/qpid/server/Main.java b/broker/src/main/java/org/apache/qpid/server/Main.java
index 446cf8b..407eea2 100644
--- a/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -132,10 +132,11 @@ public class Main
.longOpt("management-mode")
.build();
- private static final Option OPTION_MM_QUIESCE_VHOST = Option.builder("mmqv")
- .desc("make virtual host nodes stay in the quiesced state during management mode")
- .longOpt("management-mode-quiesce-virtualhostnodes")
- .build();
+ private static final Option OPTION_MM_QUIESCE_VHOST_NODE = Option.builder("mmqv")
+ .desc("make virtual host nodes stay in the "
+ + "quiesced state during management mode")
+ .longOpt("management-mode-quiesce-virtualhostnodes")
+ .build();
private static final Option OPTION_MM_HTTP_PORT = Option.builder("mmhttp")
.argName("port")
@@ -170,7 +171,7 @@ public class Main
OPTIONS.addOption(OPTION_CREATE_INITIAL_CONFIG);
OPTIONS.addOption(OPTION_INITIAL_CONFIGURATION_PATH);
OPTIONS.addOption(OPTION_MANAGEMENT_MODE);
- OPTIONS.addOption(OPTION_MM_QUIESCE_VHOST);
+ OPTIONS.addOption(OPTION_MM_QUIESCE_VHOST_NODE);
OPTIONS.addOption(OPTION_MM_HTTP_PORT);
OPTIONS.addOption(OPTION_MM_PASSWORD);
OPTIONS.addOption(OPTION_CONFIGURATION_PROPERTY);
@@ -276,7 +277,7 @@ public class Main
attributes.put(SystemConfig.MANAGEMENT_MODE_HTTP_PORT_OVERRIDE, httpPort);
}
- boolean quiesceVhosts = _commandLine.hasOption(OPTION_MM_QUIESCE_VHOST.getOpt());
+ boolean quiesceVhosts = _commandLine.hasOption(OPTION_MM_QUIESCE_VHOST_NODE.getOpt());
attributes.put(SystemConfig.MANAGEMENT_MODE_QUIESCE_VIRTUAL_HOSTS, quiesceVhosts);
String password = _commandLine.getOptionValue(OPTION_MM_PASSWORD.getOpt());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2d85f73e/doc/java-broker/src/docbkx/Java-Broker-Getting-Started.xml
----------------------------------------------------------------------
diff --git a/doc/java-broker/src/docbkx/Java-Broker-Getting-Started.xml b/doc/java-broker/src/docbkx/Java-Broker-Getting-Started.xml
index 3d7a95e..54fd58a 100644
--- a/doc/java-broker/src/docbkx/Java-Broker-Getting-Started.xml
+++ b/doc/java-broker/src/docbkx/Java-Broker-Getting-Started.xml
@@ -126,7 +126,7 @@
-mmpass,--management-mode-password <password> Set the password for
the management mode
user mm_admin
- -mmqv,--management-mode-quiesce-virtualhosts make virtualhosts
+ -mmqv,--management-mode-quiesce-virtualhostnodes make virtualhost nodes
stay in the quiesced
state during
management mode.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org