You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/04 13:20:58 UTC
svn commit: r1772532 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/message/internal/
broker-core/src/main/jav...
Author: rgodfrey
Date: Sun Dec 4 13:20:58 2016
New Revision: 1772532
URL: http://svn.apache.org/viewvc?rev=1772532&view=rev
Log:
QPID-7568 : Update support for delayed delivery in AMQP 1.0
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Sun Dec 4 13:20:58 2016
@@ -461,6 +461,7 @@ public abstract class AbstractExchange<T
}
List<? extends BaseQueue> queues = route(message, routingAddress, instanceProperties);
+
if(queues == null || queues.isEmpty())
{
Exchange altExchange = getAlternateExchange();
@@ -475,6 +476,13 @@ public abstract class AbstractExchange<T
}
else
{
+ for(BaseQueue q : queues)
+ {
+ if(!message.isResourceAcceptable(q))
+ {
+ return 0;
+ }
+ }
final BaseQueue[] baseQueues;
if(message.isReferenced())
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java Sun Dec 4 13:20:58 2016
@@ -49,4 +49,6 @@ public interface ServerMessage<T extends
long getArrivalTime();
Object getConnectionReference();
+
+ boolean isResourceAcceptable(TransactionLogResource resource); // false means this message must not be placed on the given resource
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java Sun Dec 4 13:20:58 2016
@@ -38,6 +38,7 @@ import org.apache.qpid.server.message.Ab
import org.apache.qpid.server.store.MessageHandle;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
import org.apache.qpid.util.ByteBufferUtils;
@@ -104,6 +105,12 @@ public class InternalMessage extends Abs
return _header.getArrivalTime();
}
+ @Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource)
+ {
+ return true; // no per-resource restriction here: the resource argument is not inspected
+ }
+
public Object getMessageBody()
{
return _messageBody;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun Dec 4 13:20:58 2016
@@ -2604,8 +2604,7 @@ public abstract class AbstractQueue<X ex
{
throw new VirtualHostUnavailableException(this._virtualHost);
}
-
- if(!message.isReferenced(this))
+ if(message.isResourceAcceptable(this) && !message.isReferenced(this))
{
txn.enqueue(this, message, new ServerTransaction.EnqueueAction()
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java Sun Dec 4 13:20:58 2016
@@ -112,6 +112,12 @@ public class TestMessageMetaDataType imp
}
@Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource)
+ {
+ return true; // test stub: every resource is reported as acceptable, the argument is not inspected
+ }
+
+ @Override
public long getExpiration()
{
return 0;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/txn/MockServerMessage.java Sun Dec 4 13:20:58 2016
@@ -121,6 +121,12 @@ class MockServerMessage implements Serve
}
@Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource)
+ {
+ return true; // mock: every resource is reported as acceptable, the argument is not inspected
+ }
+
+ @Override
public long getArrivalTime()
{
throw new UnsupportedOperationException();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java Sun Dec 4 13:20:58 2016
@@ -26,6 +26,7 @@ import org.apache.qpid.bytebuffer.QpidBy
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.transport.Header;
@@ -72,6 +73,12 @@ public class MessageTransferMessage exte
return getMetaData().getArrivalTime();
}
+ @Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource)
+ {
+ return true; // no per-resource restriction here: the resource argument is not inspected
+ }
+
public Header getHeader()
{
return getMetaData().getHeader();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Sun Dec 4 13:20:58 2016
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.MessagePu
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
/**
* A deliverable message.
@@ -86,6 +87,12 @@ public class AMQMessage extends Abstract
return getMessageMetaData().getArrivalTime();
}
+ @Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource)
+ {
+ return true; // no per-resource restriction here: the resource argument is not inspected
+ }
+
public boolean isImmediate()
{
return getMessagePublishInfo().isImmediate();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Sun Dec 4 13:20:58 2016
@@ -215,8 +215,9 @@ public class ExchangeDestination impleme
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[1];
+ Symbol[] capabilities = new Symbol[2];
capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE; // exactly one of the two unroutable-handling capabilities is reported
+ capabilities[1] = DELAYED_DELIVERY; // added unconditionally: no check of any queue is made here
return capabilities;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Sun Dec 4 13:20:58 2016
@@ -59,10 +59,13 @@ import org.apache.qpid.server.util.Conne
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
private static final Logger _logger = LoggerFactory.getLogger(MessageMetaData_1_0.class);
- // TODO move to somewhere more useful
- public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
- public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
+ public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+
+ // TODO move to somewhere more useful
+ private static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ private static final Symbol DELIVERY_TIME = Symbol.valueOf("x-opt-delivery-time");
+ private static final Symbol NOT_VALID_BEFORE = Symbol.valueOf("x-qpid-not-valid-before");
private Header _header;
@@ -529,7 +532,12 @@ public class MessageMetaData_1_0 impleme
{
long notValidBefore;
Object annotation;
- if(_messageAnnotations != null && (annotation = _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"))) instanceof Number)
+
+ if(_messageAnnotations != null && (annotation = _messageAnnotations.get(DELIVERY_TIME)) instanceof Number)
+ {
+ notValidBefore = ((Number)annotation).longValue();
+ }
+ else if(_messageAnnotations != null && (annotation = _messageAnnotations.get(NOT_VALID_BEFORE)) instanceof Number)
{
notValidBefore = ((Number)annotation).longValue();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Sun Dec 4 13:20:58 2016
@@ -24,12 +24,14 @@ package org.apache.qpid.server.protocol.
import java.util.Collection;
import java.util.Collections;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.Section;
import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.message.AbstractServerMessageImpl;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageMetaData_1_0>
{
@@ -96,6 +98,19 @@ public class Message_1_0 extends Abstrac
return _arrivalTime;
}
+
+ @Override
+ public boolean isResourceAcceptable(final TransactionLogResource resource) // true when this message may be placed on the given resource
+ {
+ return getMessageHeader().getNotValidBefore() == 0L || resourceSupportsDeliveryDelay(resource); // acceptable if no not-valid-before time is set, or the resource supports delivery delay (was inverted, which refused every message without a delay)
+ }
+
+ private boolean resourceSupportsDeliveryDelay(final TransactionLogResource resource)
+ {
+ return resource instanceof Queue && ((Queue<?>)resource).isHoldOnPublishEnabled(); // only a Queue with hold-on-publish enabled qualifies; any other resource type yields false
+ }
+
+
public Collection<QpidByteBuffer> getFragments()
{
return getContent(0, (int) getSize());
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Sun Dec 4 13:20:58 2016
@@ -192,8 +192,9 @@ public class NodeReceivingDestination im
@Override
public Symbol[] getCapabilities()
{
- Symbol[] capabilities = new Symbol[1];
+ Symbol[] capabilities = new Symbol[2];
capabilities[0] = _discardUnroutable ? DISCARD_UNROUTABLE : REJECT_UNROUTABLE; // exactly one of the two unroutable-handling capabilities is reported
+ capabilities[1] = DELAYED_DELIVERY; // added unconditionally: no check of the underlying node is made here
return capabilities;
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Sun Dec 4 13:20:58 2016
@@ -20,11 +20,15 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -112,4 +116,15 @@ public class QueueDestination extends Me
{
return _address;
}
+
+ @Override
+ public Symbol[] getCapabilities()
+ {
+ Set<Symbol> capabilities = new HashSet<>(Arrays.asList(super.getCapabilities())); // start from the superclass capabilities; a HashSet drops duplicates and does not keep their order
+ if(_queue.isHoldOnPublishEnabled()) // DELAYED_DELIVERY is advertised only when the queue has hold-on-publish enabled
+ {
+ capabilities.add(DELAYED_DELIVERY);
+ }
+ return capabilities.toArray(new Symbol[capabilities.size()]);
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Sun Dec 4 13:20:58 2016
@@ -31,6 +31,8 @@ public interface ReceivingDestination ex
Symbol REJECT_UNROUTABLE = Symbol.valueOf("REJECT_UNROUTABLE");
Symbol DISCARD_UNROUTABLE = Symbol.valueOf("DISCARD_UNROUTABLE");
+ Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
+
Outcome[] getOutcomes();
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1772532&r1=1772531&r2=1772532&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun Dec 4 13:20:58 2016
@@ -332,31 +332,37 @@ class ManagementNode implements MessageS
final ServerTransaction txn,
final Action<? super MessageInstance> postEnqueueAction)
{
-
- @SuppressWarnings("unchecked")
- MessageConverter<M, InternalMessage> converter =
- MessageConverterRegistry.getConverter(((Class<M>)message.getClass()), InternalMessage.class);
+ if(message.isResourceAcceptable(this))
+ {
+ @SuppressWarnings("unchecked")
+ MessageConverter<M, InternalMessage> converter =
+ MessageConverterRegistry.getConverter(((Class<M>) message.getClass()), InternalMessage.class);
- if(converter != null)
- {
- final InternalMessage msg = converter.convert(message, _addressSpace);
- txn.addPostTransactionAction(new ServerTransaction.Action()
+ if (converter != null)
{
- @Override
- public void postCommit()
- {
- enqueue(msg, instanceProperties, postEnqueueAction);
- }
-
- @Override
- public void onRollback()
+ final InternalMessage msg = converter.convert(message, _addressSpace);
+ txn.addPostTransactionAction(new ServerTransaction.Action()
{
-
- }
- });
-
- return 1;
+ @Override
+ public void postCommit()
+ {
+ enqueue(msg, instanceProperties, postEnqueueAction);
+ }
+
+ @Override
+ public void onRollback()
+ {
+
+ }
+ });
+
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org