You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/05/15 22:36:03 UTC

svn commit: r1743982 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/message/internal/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/o...

Author: rgodfrey
Date: Sun May 15 22:36:02 2016
New Revision: 1743982

URL: http://svn.apache.org/viewvc?rev=1743982&view=rev
Log:
QPID-7255 : Support delivery delay

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java Sun May 15 22:36:02 2016
@@ -43,6 +43,8 @@ public interface AMQMessageHeader
 
     long getTimestamp();
 
+    long getNotValidBefore();
+
     String getType();
 
     String getReplyTo();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Sun May 15 22:36:02 2016
@@ -94,7 +94,7 @@ public interface MessageInstance
 
     public abstract class EntryState
     {
-        private EntryState()
+        protected EntryState()
         {
         }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java Sun May 15 22:36:02 2016
@@ -43,19 +43,24 @@ public final class InternalMessageHeader
     private final String _encoding;
     private final byte _priority;
     private final long _timestamp;
+    private final long _notValidBefore;
     private final String _type;
     private final String _replyTo;
     private long _arrivalTime;
 
     public InternalMessageHeader(final Map<String, Object> headers,
-                          final String correlationId,
-                          final long expiration,
-                          final String userId,
-                          final String appId,
-                          final String messageId,
-                          final String mimeType,
-                          final String encoding,
-                          final byte priority, final long timestamp, final String type, final String replyTo)
+                                 final String correlationId,
+                                 final long expiration,
+                                 final String userId,
+                                 final String appId,
+                                 final String messageId,
+                                 final String mimeType,
+                                 final String encoding,
+                                 final byte priority,
+                                 final long timestamp,
+                                 final long notValidBefore,
+                                 final String type,
+                                 final String replyTo)
     {
         _headers = headers == null ? new LinkedHashMap<String, Object>()
                 : new LinkedHashMap<String, Object>(headers);
@@ -69,6 +74,7 @@ public final class InternalMessageHeader
         _encoding = encoding;
         _priority = priority;
         _timestamp = timestamp;
+        _notValidBefore = notValidBefore;
         _type = type;
         _replyTo = replyTo;
         _arrivalTime = System.currentTimeMillis();
@@ -85,6 +91,7 @@ public final class InternalMessageHeader
         _encoding = header.getEncoding();
         _priority = header.getPriority();
         _timestamp = header.getTimestamp();
+        _notValidBefore = header.getNotValidBefore();
         _type = header.getType();
         _replyTo = header.getReplyTo();
         _headers = new LinkedHashMap<String, Object>();
@@ -150,6 +157,12 @@ public final class InternalMessageHeader
     }
 
     @Override
+    public long getNotValidBefore()
+    {
+        return _notValidBefore;
+    }
+
+    @Override
     public String getType()
     {
         return _type;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Sun May 15 22:36:02 2016
@@ -79,6 +79,8 @@ public interface Queue<X extends Queue<X
     String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
     String DEFAULT_FILTERS = "defaultFilters";
     String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
+    String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
+
 
     String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
     @ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -192,6 +194,13 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute
     Map<String, Map<String,List<String>>> getDefaultFilters();
 
+
+    @ManagedContextDefault( name = "queue.holdOnPublishEnabled")
+    boolean DEFAULT_HOLD_ON_PUBLISH_ENABLED = false;
+
+    @ManagedAttribute( defaultValue = "${queue.holdOnPublishEnabled}")
+    boolean isHoldOnPublishEnabled();
+
     //children
     Collection<? extends Binding<?>> getBindings();
 
@@ -344,4 +353,6 @@ public interface Queue<X extends Queue<X
     void setTargetSize(long targetSize);
 
     long getPotentialMemoryFootprint();
+
+    boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun May 15 22:36:02 2016
@@ -253,6 +253,9 @@ public abstract class AbstractQueue<X ex
     private long _maximumMessageTtl;
     @ManagedAttributeField
     private boolean _ensureNondestructiveConsumers;
+    @ManagedAttributeField
+    private volatile boolean _holdOnPublishEnabled;
+
 
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
@@ -266,6 +269,12 @@ public abstract class AbstractQueue<X ex
     private final QueueRunner _queueRunner;
     private boolean _closing;
     private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
+    private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+
+    private interface HoldMethod
+    {
+        boolean isHeld(MessageReference<?> message, long evalutaionTime);
+    }
 
     protected AbstractQueue(Map<String, Object> attributes, VirtualHost<?> virtualHost)
     {
@@ -499,6 +508,18 @@ public abstract class AbstractQueue<X ex
             }
         }
 
+        if(isHoldOnPublishEnabled())
+        {
+            _holdMethods.add(new HoldMethod()
+                            {
+                                @Override
+                                public boolean isHeld(final MessageReference<?> messageReference, final long evaluationTime)
+                                {
+                                    return messageReference.getMessage().getMessageHeader().getNotValidBefore() >= evaluationTime;
+                                }
+                            });
+        }
+
         updateAlertChecks();
     }
 
@@ -633,7 +654,11 @@ public abstract class AbstractQueue<X ex
         return _ensureNondestructiveConsumers;
     }
 
-
+    @Override
+    public boolean isHoldOnPublishEnabled()
+    {
+        return _holdOnPublishEnabled;
+    }
 
     @Override
     public Collection<String> getAvailableAttributes()
@@ -2509,6 +2534,8 @@ public abstract class AbstractQueue<X ex
                 }
                 else
                 {
+                    node.checkHeld(currentTime);
+
                     // There is a chance that the node could be deleted by
                     // the time the check actually occurs. So verify we
                     // can actually get the message to perform the check.
@@ -2744,6 +2771,42 @@ public abstract class AbstractQueue<X ex
         return _persistentMessageDequeueCount.get();
     }
 
+    @Override
+    public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+    {
+        if(!_holdMethods.isEmpty())
+        {
+            ServerMessage message = queueEntry.getMessage();
+            try
+            {
+                MessageReference ref = message.newReference();
+                try
+                {
+                    for(HoldMethod method : _holdMethods)
+                    {
+                        if(method.isHeld(ref, evaluationTime))
+                        {
+                            return true;
+                        }
+                    }
+                    return false;
+                }
+                finally
+                {
+                    ref.release();
+                }
+            }
+            catch (MessageDeletedException e)
+            {
+                return false;
+            }
+        }
+        else
+        {
+            return false;
+        }
+
+    }
 
     @Override
     public String toString()

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sun May 15 22:36:02 2016
@@ -416,9 +416,8 @@ class QueueConsumerImpl
     public final boolean hasInterest(QueueEntry entry)
     {
        //check that the message hasn't been rejected
-        if (entry.isRejectedBy(this))
+        if (entry.isRejectedBy(this) || entry.checkHeld(System.currentTimeMillis()))
         {
-
             return false;
         }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun May 15 22:36:02 2016
@@ -44,4 +44,6 @@ public interface QueueEntry extends Mess
     void setExpiration(long calculatedExpiration);
 
     MessageReference newMessageReference();
+
+    boolean checkHeld(final long evaluationTime);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun May 15 22:36:02 2016
@@ -57,6 +57,21 @@ public abstract class QueueEntryImpl imp
 
     private Set<Long> _rejectedBy = null;
 
+    private static final EntryState HELD_STATE = new EntryState()
+    {
+        @Override
+        public State getState()
+        {
+            return State.AVAILABLE;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "HELD";
+        }
+    };
+
     private volatile EntryState _state = AVAILABLE_STATE;
 
     private static final
@@ -197,7 +212,7 @@ public abstract class QueueEntryImpl imp
 
     public boolean isAvailable()
     {
-        return _state == AVAILABLE_STATE;
+        return _state.getState() == State.AVAILABLE;
     }
 
     public boolean isAcquired()
@@ -269,7 +284,17 @@ public abstract class QueueEntryImpl imp
 
     private boolean acquire(final EntryState state)
     {
-        boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+        boolean acquired = false;
+
+        EntryState currentState;
+
+        while((currentState = _state).getState() == State.AVAILABLE)
+        {
+            if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
+            {
+                break;
+            }
+        }
 
         if(acquired && _stateChangeListeners != null)
         {
@@ -406,7 +431,7 @@ public abstract class QueueEntryImpl imp
         if(!getQueue().isDeleted())
         {
             getQueue().requeue(this);
-            if(_stateChangeListeners != null)
+            if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
             {
                 notifyStateChange(State.ACQUIRED, State.AVAILABLE);
             }
@@ -419,6 +444,38 @@ public abstract class QueueEntryImpl imp
     }
 
     @Override
+    public boolean checkHeld(final long evaluationTime)
+    {
+        EntryState state;
+        while((state = _state).getState() == State.AVAILABLE)
+        {
+            boolean isHeld = getQueue().isHeld(this, evaluationTime);
+            if(state == AVAILABLE_STATE && isHeld)
+            {
+                if(!_stateUpdater.compareAndSet(this, state, HELD_STATE))
+                {
+                    continue;
+                }
+            }
+            else if(state == HELD_STATE && !isHeld)
+            {
+
+                if(_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+                {
+                    postRelease(state);
+                }
+                else
+                {
+                    continue;
+                }
+            }
+            return isHeld;
+
+        }
+        return false;
+    }
+
+    @Override
     public QueueConsumer getDeliveredConsumer()
     {
         EntryState state = _state;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Sun May 15 22:36:02 2016
@@ -165,7 +165,7 @@ public class TrustStoreMessageSource ext
         InternalMessageHeader header = new InternalMessageHeader(Collections.<String,Object>emptyMap(),
                                                                  null, 0l, null, null, UUID.randomUUID().toString(),
                                                                  null, null, (byte)4, System.currentTimeMillis(),
-                                                                 null, null);
+                                                                 0L, null, null);
         return InternalMessage.createListMessage(_virtualHost.getMessageStore(), header, messageList);
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Sun May 15 22:36:02 2016
@@ -84,7 +84,7 @@ public class VirtualHostPropertiesNode e
         InternalMessageHeader header = new InternalMessageHeader(headers,
                                                                  null, 0l, null, null, UUID.randomUUID().toString(),
                                                                  null, null, (byte) 4, System.currentTimeMillis(),
-                                                                 null, null);
+                                                                 0L, null, null);
         final InternalMessage message =
                 InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), header, new byte[0]);
         message.setInitialRoutingAddress(getName());

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Sun May 15 22:36:02 2016
@@ -98,6 +98,12 @@ public class HeadersBindingTest extends
             return 0;
         }
 
+        @Override
+        public long getNotValidBefore()
+        {
+            return 0;
+        }
+
         public String getType()
         {
             return null;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun May 15 22:36:02 2016
@@ -235,6 +235,88 @@ abstract class AbstractQueueTestBase ext
                    _consumer.getQueueContext().getReleasedEntry());
     }
 
+    public void testMessageHeldIfNotYetValidWhenConsumerAdded() throws Exception
+    {
+        _queue.close();
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+        attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+        _queue = _virtualHost.createChild(Queue.class, attributes);
+
+        ServerMessage messageA = createMessage(new Long(24));
+        AMQMessageHeader messageHeader = messageA.getMessageHeader();
+        when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+        _queue.enqueue(messageA, null, null);
+        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+                                                                     ConsumerImpl.Option.SEES_REQUEUES));
+        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+        assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
+        when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+        _queue.checkMessageStatus();
+        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+        assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
+    }
+
+    public void testMessageHoldingDependentOnQueueProperty() throws Exception
+    {
+        _queue.close();
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+        attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.FALSE);
+
+        _queue = _virtualHost.createChild(Queue.class, attributes);
+
+        ServerMessage messageA = createMessage(new Long(24));
+        AMQMessageHeader messageHeader = messageA.getMessageHeader();
+        when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+        _queue.enqueue(messageA, null, null);
+        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+                                                                     ConsumerImpl.Option.SEES_REQUEUES));
+        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+        assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
+
+    }
+
+    public void testUnheldMessageOvertakesHeld() throws Exception
+    {
+        _queue.close();
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+        attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+        _queue = _virtualHost.createChild(Queue.class, attributes);
+
+        ServerMessage messageA = createMessage(new Long(24));
+        AMQMessageHeader messageHeader = messageA.getMessageHeader();
+        when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+        _queue.enqueue(messageA, null, null);
+        ServerMessage messageB = createMessage(new Long(25));
+        _queue.enqueue(messageB, null, null);
+
+        _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+                                                          EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+                                                                     ConsumerImpl.Option.SEES_REQUEUES));
+        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+        assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
+        assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
+
+        when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+        _queue.checkMessageStatus();
+        Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+        assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
+        assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
+
+    }
+
     /**
      * Tests that a released queue entry is resent to the subscriber.  Verifies also that the
      * QueueContext._releasedEntry is reset to null after the entry has been reset.

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Sun May 15 22:36:02 2016
@@ -144,6 +144,12 @@ public class MessageConverter_v0_10_to_I
         }
 
         @Override
+        public long getNotValidBefore()
+        {
+            return _delegate.getNotValidBefore();
+        }
+
+        @Override
         public String getType()
         {
             return _delegate.getType();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java Sun May 15 22:36:02 2016
@@ -34,6 +34,7 @@ class MessageTransferHeader implements A
 
     private final DeliveryProperties _deliveryProps;
     private final MessageProperties _messageProps;
+    private long _notValidBefore;
 
     public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps)
     {
@@ -100,6 +101,14 @@ class MessageTransferHeader implements A
         return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp();
     }
 
+
+    @Override
+    public long getNotValidBefore()
+    {
+        Object header = getHeader("x-qpid-not-valid-before");
+        return header instanceof Number ? ((Number)header).longValue() : 0L;
+    }
+
     public String getType()
     {
         Object type = getHeader(JMS_TYPE);

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Sun May 15 22:36:02 2016
@@ -193,6 +193,12 @@ public class MessageConverter_v0_8_to_In
         }
 
         @Override
+        public long getNotValidBefore()
+        {
+            return _delegate.getNotValidBefore();
+        }
+
+        @Override
         public String getType()
         {
             return _delegate.getType();

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Sun May 15 22:36:02 2016
@@ -247,6 +247,13 @@ public class MessageMetaData implements
             return getProperties().getReplyToAsString();
         }
 
+        @Override
+        public long getNotValidBefore()
+        {
+            Object header = getHeader("x-qpid-not-valid-before");
+            return header instanceof Number ? ((Number)header).longValue() : 0L;
+        }
+
         public Object getHeader(String name)
         {
             FieldTable ft = getProperties().getHeaders();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Sun May 15 22:36:02 2016
@@ -510,6 +510,23 @@ public class MessageMetaData_1_0 impleme
 
         }
 
+
+        @Override
+        public long getNotValidBefore()
+        {
+            long notValidBefore;
+            Object annotation;
+            if(_messageAnnotations != null && (annotation = _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"))) instanceof Number)
+            {
+                notValidBefore = ((Number)annotation).longValue();
+            }
+            else
+            {
+                notValidBefore = 0L;
+            }
+            return notValidBefore;
+        }
+
         public String getType()
         {
             String subject = getSubject();

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun May 15 22:36:02 2016
@@ -1301,6 +1301,7 @@ class ManagementNode implements MessageS
         private String _encoding;
         private byte _priority;
         private long _timestamp;
+        private long _notValidBefore;
         private String _type;
         private String _replyTo;
 
@@ -1349,6 +1350,11 @@ class ManagementNode implements MessageS
             _timestamp = timestamp;
         }
 
+        public void setNotValidBefore(final long notValidBefore)
+        {
+            _notValidBefore = notValidBefore;
+        }
+
         public void setType(final String type)
         {
             _type = type;
@@ -1404,6 +1410,12 @@ class ManagementNode implements MessageS
             return _timestamp;
         }
 
+        @Override
+        public long getNotValidBefore()
+        {
+            return _notValidBefore;
+        }
+
         public String getType()
         {
             return _type;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun May 15 22:36:02 2016
@@ -99,6 +99,7 @@ public abstract class AMQDestination imp
     public static final int UNKNOWN_TYPE = 3;
     private boolean _sendEncrypted;
     private String _encryptedRecipients;
+    private long _deliveryDelay;
 
     protected void setExclusive(boolean exclusive)
     {
@@ -130,6 +131,11 @@ public abstract class AMQDestination imp
         return _encryptedRecipients;
     }
 
+    public long getDeliveryDelay()
+    {
+        return _deliveryDelay;
+    }
+
     // ----- Fields required to support new address syntax -------
 
     public enum DestSyntax {
@@ -314,6 +320,8 @@ public abstract class AMQDestination imp
         _consumerArguments = binding.getConsumerOptions();
         _sendEncrypted = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_SEND_ENCRYPTED));
         _encryptedRecipients = binding.getOption(BindingURL.OPTION_ENCRYPTED_RECIPIENTS);
+        String deliveryDelayVal = binding.getOption(BindingURL.OPTION_DELIVERY_DELAY);
+        _deliveryDelay = deliveryDelayVal == null ? 0L : Long.parseLong(deliveryDelayVal);
     }
 
     protected AMQDestination(String exchangeName, String exchangeClass, String routingKey, String queueName)
@@ -957,7 +965,7 @@ public abstract class AMQDestination imp
         _addressType = _addrHelper.getNodeType();
         _node =  _addrHelper.getNode();
         _link = _addrHelper.getLink();
-
+        _deliveryDelay = _link.getDelay();
         _sendEncrypted = _addrHelper.getSendEncrypted();
         _encryptedRecipients = _addrHelper.getEncryptedRecipients();
     }

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun May 15 22:36:02 2016
@@ -100,6 +100,8 @@ public abstract class BasicMessageProduc
 
     private String _userID;  // ref user id used in the connection.
 
+    private long _deliveryDelay;
+
 
     /**
      * The default value for immediate flag used this producer is false. That is, a consumer does
@@ -150,6 +152,11 @@ public abstract class BasicMessageProduc
                 : mandatory;
 
         _userID = connection.isPopulateUserId() ? connection.getUsername() : null;
+
+        if(destination != null && destination.getDeliveryDelay() != 0L)
+        {
+            _deliveryDelay = destination.getDeliveryDelay();
+        }
         setPublishMode();
     }
 
@@ -323,7 +330,8 @@ public abstract class BasicMessageProduc
 
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+                     _deliveryDelay);
         }
     }
 
@@ -334,7 +342,8 @@ public abstract class BasicMessageProduc
 
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+                     _deliveryDelay);
         }
     }
 
@@ -344,7 +353,8 @@ public abstract class BasicMessageProduc
         checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate,
+                     _deliveryDelay);
         }
     }
 
@@ -354,7 +364,7 @@ public abstract class BasicMessageProduc
         checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate, _deliveryDelay);
         }
     }
 
@@ -365,13 +375,15 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+            AMQDestination amqDestination = (AMQDestination) destination;
+            sendImpl(amqDestination, message, _deliveryMode, _messagePriority, _timeToLive,
                     _mandatory == null
                             ? destination instanceof Topic
                                 ? _defaultMandatoryTopicValue
                                 : _defaultMandatoryValue
                             : _mandatory,
-                     _immediate);
+                     _immediate,
+                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
         }
     }
 
@@ -383,13 +395,15 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+            AMQDestination amqDestination = (AMQDestination) destination;
+            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive,
                     _mandatory == null
                             ? destination instanceof Topic
                                 ? _defaultMandatoryTopicValue
                                 : _defaultMandatoryValue
                             : _mandatory,
-                    _immediate);
+                     _immediate,
+                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
         }
     }
 
@@ -401,7 +415,9 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
+            AMQDestination amqDestination = (AMQDestination) destination;
+            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate,
+                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
         }
     }
 
@@ -413,7 +429,9 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
+            AMQDestination amqDestination = (AMQDestination) destination;
+            sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+                     amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
         }
     }
 
@@ -498,10 +516,17 @@ public abstract class BasicMessageProduc
      * @param mandatory
      * @param immediate
      *
+     * @param deliveryDelay
      * @throws JMSException
      */
-    protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
-                            boolean mandatory, boolean immediate) throws JMSException
+    protected void sendImpl(AMQDestination destination,
+                            Message origMessage,
+                            int deliveryMode,
+                            int priority,
+                            long timeToLive,
+                            boolean mandatory,
+                            boolean immediate,
+                            long deliveryDelay) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
@@ -521,7 +546,8 @@ public abstract class BasicMessageProduc
 
         try
         {
-            sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
+            sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate,
+                        deliveryDelay);
         }
         catch (TransportException e)
         {
@@ -549,7 +575,7 @@ public abstract class BasicMessageProduc
 
     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                               UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                              boolean immediate) throws JMSException;
+                              boolean immediate, final long deliveryDelay) throws JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws InvalidDestinationException
     {
@@ -724,6 +750,17 @@ public abstract class BasicMessageProduc
         _publishMode = publishMode;
     }
 
+    public long getDeliveryDelay()
+    {
+        return _deliveryDelay;
+    }
+
+    @Override
+    public void setDeliveryDelay(final long deliveryDelay)
+    {
+        _deliveryDelay = deliveryDelay;
+    }
+
     Logger getLogger()
     {
         return _logger;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sun May 15 22:36:02 2016
@@ -112,7 +112,7 @@ public class BasicMessageProducer_0_10 e
      */
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                     boolean immediate) throws JMSException
+                     boolean immediate, final long deliveryDelay) throws JMSException
     {
         message.prepareForSending();
 
@@ -137,7 +137,7 @@ public class BasicMessageProducer_0_10 e
         }
 
         long currentTime = 0;
-        if (timeToLive > 0 || !isDisableTimestamps())
+        if (timeToLive > 0 || !isDisableTimestamps() || deliveryDelay != 0L)
         {
             currentTime = System.currentTimeMillis();
         }        
@@ -188,12 +188,13 @@ public class BasicMessageProducer_0_10 e
             deliveryProp.setRoutingKey(routingKey);
         }
         
-        if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR && 
-           (destination.getSubject() != null || 
+        Map<String,Object> appProps = messageProps.getApplicationHeaders();
+
+        if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+           (destination.getSubject() != null ||
               (messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
            )
         {
-            Map<String,Object> appProps = messageProps.getApplicationHeaders();
             if (appProps == null)
             {
                 appProps = new HashMap<String,Object>();
@@ -213,6 +214,18 @@ public class BasicMessageProducer_0_10 e
             }
         }
 
+
+        if(deliveryDelay != 0L && (appProps == null || appProps.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null))
+        {
+            if (appProps == null)
+            {
+                appProps = new HashMap<String,Object>();
+                messageProps.setApplicationHeaders(appProps);
+            }
+
+            appProps.put(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+        }
+
         ByteBuffer data = message.getData();
         boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted();
         if(encrypt)

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sun May 15 22:36:02 2016
@@ -109,8 +109,8 @@ public class BasicMessageProducer_0_8 ex
     }
 
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
-                     UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
-                     boolean immediate) throws JMSException
+                     UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, final long deliveryDelay) throws JMSException
     {
 
 
@@ -171,9 +171,11 @@ public class BasicMessageProducer_0_8 ex
                 .getHeaders()
                 .setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
+        long currentTime;
         if (!isDisableTimestamps())
         {
-            final long currentTime = System.currentTimeMillis();
+
+            currentTime = System.currentTimeMillis();
             contentHeaderProperties.setTimestamp(currentTime);
 
             if (timeToLive > 0)
@@ -194,6 +196,20 @@ public class BasicMessageProducer_0_8 ex
                 contentHeaderProperties.setExpiration(0);
             }
         }
+        else
+        {
+            currentTime = 0L;
+        }
+
+        if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null)
+        {
+            if(currentTime == 0L)
+            {
+                currentTime = System.currentTimeMillis();
+            }
+            headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+        }
+
 
         contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
         contentHeaderProperties.setPriority((byte) priority);

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Sun May 15 22:36:02 2016
@@ -135,7 +135,12 @@ public class AMQMessageDelegate_0_10 ext
             dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
                     _deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
         }
-        
+
+        if(messageProps != null && messageProps.getApplicationHeaders() != null)
+        {
+            messageProps.getApplicationHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+        }
+
         setJMSDestination(dest);        
     }
 

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Sun May 15 22:36:02 2016
@@ -97,15 +97,20 @@ public class AMQMessageDelegate_0_8 exte
     }
 
     // Used when generating a received message object
-    protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, String exchange,
-                                     String routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
-                                                         AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
-                                    int addressType)
+    protected AMQMessageDelegate_0_8(long deliveryTag,
+                                     BasicContentHeaderProperties contentHeader,
+                                     String exchange,
+                                     String routingKey,
+                                     AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+                                     AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+                                     int addressType)
     {
         this(contentHeader, deliveryTag);
 
         Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
 
+        contentHeader.getHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+
         AMQDestination dest = null;
 
         if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java Sun May 15 22:36:02 2016
@@ -32,7 +32,10 @@ public class QpidMessageProperties
     public static final String QPID_SUBJECT = "qpid.subject";
     public static final String QPID_SUBJECT_JMS_PROPERTY = "JMS_qpid_subject";
     public static final String QPID_SUBJECT_JMS_PROPER = QPID_SUBJECT_JMS_PROPERTY.substring(4);
-    
+
+    public static final String QPID_NOT_VALID_BEFORE = "x-qpid-not-valid-before";
+
+
     // AMQP 0-10 related properties
     public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id";
     public static final String AMQP_0_10_ROUTING_KEY = "x-amqp-0-10.routing-key";

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Sun May 15 22:36:02 2016
@@ -245,7 +245,11 @@ public class AddressHelper
                             reliability + "' is not yet supported");
                 }
             }
-            
+            Long delay = _linkPropAccess.getLong("delay");
+            if(delay != null)
+            {
+                link.setDelay(delay);
+            }
             if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
             {
                 MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Sun May 15 22:36:02 2016
@@ -38,6 +38,7 @@ public class Link
     private FilterType _filterType = FilterType.SUBJECT;
     private boolean _isNoLocal;
     private boolean _isDurable;
+    private long _delay;
     private int _consumerCapacity = -1;
     private int _producerCapacity = -1;
     private Subscription subscription;
@@ -145,6 +146,16 @@ public class Link
         _bindings = bindings;
     }
 
+    public long getDelay()
+    {
+        return _delay;
+    }
+
+    public void setDelay(final long delay)
+    {
+        _delay = delay;
+    }
+
     public SubscriptionQueue getSubscriptionQueue()
     {
         return _subscriptionQueue;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Sun May 15 22:36:02 2016
@@ -36,4 +36,6 @@ public interface MessageProducer extends
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException;
 
+    void setDeliveryDelay(long delay);
+
 }

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java Sun May 15 22:36:02 2016
@@ -120,7 +120,7 @@ public interface Accessor
                 }
                 else
                 {
-                    return Long.parseLong((String)source.get(name));
+                    return Long.parseLong(source.get(name).toString());
                 }
             }
             else

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java Sun May 15 22:36:02 2016
@@ -43,6 +43,7 @@ public interface BindingURL
     String OPTION_EXCHANGE_INTERNAL = "exchangeinternal";
     String OPTION_SEND_ENCRYPTED = "sendencrypted";
     String OPTION_ENCRYPTED_RECIPIENTS = "encryptedrecipients";
+    String OPTION_DELIVERY_DELAY = "deliveryDelay";
 
 
     /**
@@ -66,7 +67,8 @@ public interface BindingURL
                                                                           OPTION_EXCHANGE_DURABLE,
                                                                           OPTION_REJECT_BEHAVIOUR,
                                                                           OPTION_SEND_ENCRYPTED,
-                                                                          OPTION_ENCRYPTED_RECIPIENTS)));
+                                                                          OPTION_ENCRYPTED_RECIPIENTS,
+                                                                          OPTION_DELIVERY_DELAY)));
 
 
     String getURL();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org