You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/05/15 22:36:03 UTC
svn commit: r1743982 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/message/internal/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/o...
Author: rgodfrey
Date: Sun May 15 22:36:02 2016
New Revision: 1743982
URL: http://svn.apache.org/viewvc?rev=1743982&view=rev
Log:
QPID-7255 : Support delivery delay
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java Sun May 15 22:36:02 2016
@@ -43,6 +43,8 @@ public interface AMQMessageHeader
long getTimestamp();
+ long getNotValidBefore();
+
String getType();
String getReplyTo();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java Sun May 15 22:36:02 2016
@@ -94,7 +94,7 @@ public interface MessageInstance
public abstract class EntryState
{
- private EntryState()
+ protected EntryState()
{
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java Sun May 15 22:36:02 2016
@@ -43,19 +43,24 @@ public final class InternalMessageHeader
private final String _encoding;
private final byte _priority;
private final long _timestamp;
+ private final long _notValidBefore;
private final String _type;
private final String _replyTo;
private long _arrivalTime;
public InternalMessageHeader(final Map<String, Object> headers,
- final String correlationId,
- final long expiration,
- final String userId,
- final String appId,
- final String messageId,
- final String mimeType,
- final String encoding,
- final byte priority, final long timestamp, final String type, final String replyTo)
+ final String correlationId,
+ final long expiration,
+ final String userId,
+ final String appId,
+ final String messageId,
+ final String mimeType,
+ final String encoding,
+ final byte priority,
+ final long timestamp,
+ final long notValidBefore,
+ final String type,
+ final String replyTo)
{
_headers = headers == null ? new LinkedHashMap<String, Object>()
: new LinkedHashMap<String, Object>(headers);
@@ -69,6 +74,7 @@ public final class InternalMessageHeader
_encoding = encoding;
_priority = priority;
_timestamp = timestamp;
+ _notValidBefore = notValidBefore;
_type = type;
_replyTo = replyTo;
_arrivalTime = System.currentTimeMillis();
@@ -85,6 +91,7 @@ public final class InternalMessageHeader
_encoding = header.getEncoding();
_priority = header.getPriority();
_timestamp = header.getTimestamp();
+ _notValidBefore = header.getNotValidBefore();
_type = header.getType();
_replyTo = header.getReplyTo();
_headers = new LinkedHashMap<String, Object>();
@@ -150,6 +157,12 @@ public final class InternalMessageHeader
}
@Override
+ public long getNotValidBefore()
+ {
+ return _notValidBefore;
+ }
+
+ @Override
public String getType()
{
return _type;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Sun May 15 22:36:02 2016
@@ -79,6 +79,8 @@ public interface Queue<X extends Queue<X
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
String DEFAULT_FILTERS = "defaultFilters";
String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
+ String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
+
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@ManagedContextDefault( name = QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT)
@@ -192,6 +194,13 @@ public interface Queue<X extends Queue<X
@ManagedAttribute
Map<String, Map<String,List<String>>> getDefaultFilters();
+
+ @ManagedContextDefault( name = "queue.holdOnPublishEnabled")
+ boolean DEFAULT_HOLD_ON_PUBLISH_ENABLED = false;
+
+ @ManagedAttribute( defaultValue = "${queue.holdOnPublishEnabled}")
+ boolean isHoldOnPublishEnabled();
+
//children
Collection<? extends Binding<?>> getBindings();
@@ -344,4 +353,6 @@ public interface Queue<X extends Queue<X
void setTargetSize(long targetSize);
long getPotentialMemoryFootprint();
+
+ boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sun May 15 22:36:02 2016
@@ -253,6 +253,9 @@ public abstract class AbstractQueue<X ex
private long _maximumMessageTtl;
@ManagedAttributeField
private boolean _ensureNondestructiveConsumers;
+ @ManagedAttributeField
+ private volatile boolean _holdOnPublishEnabled;
+
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
@@ -266,6 +269,12 @@ public abstract class AbstractQueue<X ex
private final QueueRunner _queueRunner;
private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
+ private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+
+ private interface HoldMethod
+ {
+ boolean isHeld(MessageReference<?> message, long evaluationTime);
+ }
protected AbstractQueue(Map<String, Object> attributes, VirtualHost<?> virtualHost)
{
@@ -499,6 +508,18 @@ public abstract class AbstractQueue<X ex
}
}
+ if(isHoldOnPublishEnabled())
+ {
+ _holdMethods.add(new HoldMethod()
+ {
+ @Override
+ public boolean isHeld(final MessageReference<?> messageReference, final long evaluationTime)
+ {
+ return messageReference.getMessage().getMessageHeader().getNotValidBefore() >= evaluationTime;
+ }
+ });
+ }
+
updateAlertChecks();
}
@@ -633,7 +654,11 @@ public abstract class AbstractQueue<X ex
return _ensureNondestructiveConsumers;
}
-
+ @Override
+ public boolean isHoldOnPublishEnabled()
+ {
+ return _holdOnPublishEnabled;
+ }
@Override
public Collection<String> getAvailableAttributes()
@@ -2509,6 +2534,8 @@ public abstract class AbstractQueue<X ex
}
else
{
+ node.checkHeld(currentTime);
+
// There is a chance that the node could be deleted by
// the time the check actually occurs. So verify we
// can actually get the message to perform the check.
@@ -2744,6 +2771,42 @@ public abstract class AbstractQueue<X ex
return _persistentMessageDequeueCount.get();
}
+ @Override
+ public boolean isHeld(final QueueEntry queueEntry, final long evaluationTime)
+ {
+ if(!_holdMethods.isEmpty())
+ {
+ ServerMessage message = queueEntry.getMessage();
+ try
+ {
+ MessageReference ref = message.newReference();
+ try
+ {
+ for(HoldMethod method : _holdMethods)
+ {
+ if(method.isHeld(ref, evaluationTime))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+ finally
+ {
+ ref.release();
+ }
+ }
+ catch (MessageDeletedException e)
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+
+ }
@Override
public String toString()
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sun May 15 22:36:02 2016
@@ -416,9 +416,8 @@ class QueueConsumerImpl
public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
- if (entry.isRejectedBy(this))
+ if (entry.isRejectedBy(this) || entry.checkHeld(System.currentTimeMillis()))
{
-
return false;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun May 15 22:36:02 2016
@@ -44,4 +44,6 @@ public interface QueueEntry extends Mess
void setExpiration(long calculatedExpiration);
MessageReference newMessageReference();
+
+ boolean checkHeld(final long evaluationTime);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun May 15 22:36:02 2016
@@ -57,6 +57,21 @@ public abstract class QueueEntryImpl imp
private Set<Long> _rejectedBy = null;
+ private static final EntryState HELD_STATE = new EntryState()
+ {
+ @Override
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HELD";
+ }
+ };
+
private volatile EntryState _state = AVAILABLE_STATE;
private static final
@@ -197,7 +212,7 @@ public abstract class QueueEntryImpl imp
public boolean isAvailable()
{
- return _state == AVAILABLE_STATE;
+ return _state.getState() == State.AVAILABLE;
}
public boolean isAcquired()
@@ -269,7 +284,17 @@ public abstract class QueueEntryImpl imp
private boolean acquire(final EntryState state)
{
- boolean acquired = _stateUpdater.compareAndSet(this, AVAILABLE_STATE, state);
+ boolean acquired = false;
+
+ EntryState currentState;
+
+ while((currentState = _state).getState() == State.AVAILABLE)
+ {
+ if(acquired = _stateUpdater.compareAndSet(this, currentState, state))
+ {
+ break;
+ }
+ }
if(acquired && _stateChangeListeners != null)
{
@@ -406,7 +431,7 @@ public abstract class QueueEntryImpl imp
if(!getQueue().isDeleted())
{
getQueue().requeue(this);
- if(_stateChangeListeners != null)
+ if(_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
{
notifyStateChange(State.ACQUIRED, State.AVAILABLE);
}
@@ -419,6 +444,38 @@ public abstract class QueueEntryImpl imp
}
@Override
+ public boolean checkHeld(final long evaluationTime)
+ {
+ EntryState state;
+ while((state = _state).getState() == State.AVAILABLE)
+ {
+ boolean isHeld = getQueue().isHeld(this, evaluationTime);
+ if(state == AVAILABLE_STATE && isHeld)
+ {
+ if(!_stateUpdater.compareAndSet(this, state, HELD_STATE))
+ {
+ continue;
+ }
+ }
+ else if(state == HELD_STATE && !isHeld)
+ {
+
+ if(_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+ {
+ postRelease(state);
+ }
+ else
+ {
+ continue;
+ }
+ }
+ return isHeld;
+
+ }
+ return false;
+ }
+
+ @Override
public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java Sun May 15 22:36:02 2016
@@ -165,7 +165,7 @@ public class TrustStoreMessageSource ext
InternalMessageHeader header = new InternalMessageHeader(Collections.<String,Object>emptyMap(),
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte)4, System.currentTimeMillis(),
- null, null);
+ 0L, null, null);
return InternalMessage.createListMessage(_virtualHost.getMessageStore(), header, messageList);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java Sun May 15 22:36:02 2016
@@ -84,7 +84,7 @@ public class VirtualHostPropertiesNode e
InternalMessageHeader header = new InternalMessageHeader(headers,
null, 0l, null, null, UUID.randomUUID().toString(),
null, null, (byte) 4, System.currentTimeMillis(),
- null, null);
+ 0L, null, null);
final InternalMessage message =
InternalMessage.createBytesMessage(_virtualHost.getMessageStore(), header, new byte[0]);
message.setInitialRoutingAddress(getName());
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Sun May 15 22:36:02 2016
@@ -98,6 +98,12 @@ public class HeadersBindingTest extends
return 0;
}
+ @Override
+ public long getNotValidBefore()
+ {
+ return 0;
+ }
+
public String getType()
{
return null;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Sun May 15 22:36:02 2016
@@ -235,6 +235,88 @@ abstract class AbstractQueueTestBase ext
_consumer.getQueueContext().getReleasedEntry());
}
+ public void testMessageHeldIfNotYetValidWhenConsumerAdded() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+ _queue = _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(new Long(24));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+ _queue.checkMessageStatus();
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+ assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
+ }
+
+ public void testMessageHoldingDependentOnQueueProperty() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.FALSE);
+
+ _queue = _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(new Long(24));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
+
+ }
+
+ public void testUnheldMessageOvertakesHeld() throws Exception
+ {
+ _queue.close();
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, _qname);
+ attributes.put(Queue.OWNER, _owner);
+ attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
+
+ _queue = _virtualHost.createChild(Queue.class, attributes);
+
+ ServerMessage messageA = createMessage(new Long(24));
+ AMQMessageHeader messageHeader = messageA.getMessageHeader();
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
+ _queue.enqueue(messageA, null, null);
+ ServerMessage messageB = createMessage(new Long(25));
+ _queue.enqueue(messageB, null, null);
+
+ _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
+ EnumSet.of(ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES));
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+
+ assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
+ assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
+
+ when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
+ _queue.checkMessageStatus();
+ Thread.sleep(QUEUE_RUNNER_WAIT_TIME);
+ assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
+ assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
+
+ }
+
/**
* Tests that a released queue entry is resent to the subscriber. Verifies also that the
* QueueContext._releasedEntry is reset to null after the entry has been reset.
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java Sun May 15 22:36:02 2016
@@ -144,6 +144,12 @@ public class MessageConverter_v0_10_to_I
}
@Override
+ public long getNotValidBefore()
+ {
+ return _delegate.getNotValidBefore();
+ }
+
+ @Override
public String getType()
{
return _delegate.getType();
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java Sun May 15 22:36:02 2016
@@ -34,6 +34,7 @@ class MessageTransferHeader implements A
private final DeliveryProperties _deliveryProps;
private final MessageProperties _messageProps;
+ private long _notValidBefore;
public MessageTransferHeader(DeliveryProperties deliveryProps, MessageProperties messageProps)
{
@@ -100,6 +101,14 @@ class MessageTransferHeader implements A
return _deliveryProps == null ? 0L : _deliveryProps.getTimestamp();
}
+
+ @Override
+ public long getNotValidBefore()
+ {
+ Object header = getHeader("x-qpid-not-valid-before");
+ return header instanceof Number ? ((Number)header).longValue() : 0L;
+ }
+
public String getType()
{
Object type = getHeader(JMS_TYPE);
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java Sun May 15 22:36:02 2016
@@ -193,6 +193,12 @@ public class MessageConverter_v0_8_to_In
}
@Override
+ public long getNotValidBefore()
+ {
+ return _delegate.getNotValidBefore();
+ }
+
+ @Override
public String getType()
{
return _delegate.getType();
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Sun May 15 22:36:02 2016
@@ -247,6 +247,13 @@ public class MessageMetaData implements
return getProperties().getReplyToAsString();
}
+ @Override
+ public long getNotValidBefore()
+ {
+ Object header = getHeader("x-qpid-not-valid-before");
+ return header instanceof Number ? ((Number)header).longValue() : 0L;
+ }
+
public Object getHeader(String name)
{
FieldTable ft = getProperties().getHeaders();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Sun May 15 22:36:02 2016
@@ -510,6 +510,23 @@ public class MessageMetaData_1_0 impleme
}
+
+ @Override
+ public long getNotValidBefore()
+ {
+ long notValidBefore;
+ Object annotation;
+ if(_messageAnnotations != null && (annotation = _messageAnnotations.get(Symbol.valueOf("x-qpid-not-valid-before"))) instanceof Number)
+ {
+ notValidBefore = ((Number)annotation).longValue();
+ }
+ else
+ {
+ notValidBefore = 0L;
+ }
+ return notValidBefore;
+ }
+
public String getType()
{
String subject = getSubject();
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java Sun May 15 22:36:02 2016
@@ -1301,6 +1301,7 @@ class ManagementNode implements MessageS
private String _encoding;
private byte _priority;
private long _timestamp;
+ private long _notValidBefore;
private String _type;
private String _replyTo;
@@ -1349,6 +1350,11 @@ class ManagementNode implements MessageS
_timestamp = timestamp;
}
+ public void setNotValidBefore(final long notValidBefore)
+ {
+ _notValidBefore = notValidBefore;
+ }
+
public void setType(final String type)
{
_type = type;
@@ -1404,6 +1410,12 @@ class ManagementNode implements MessageS
return _timestamp;
}
+ @Override
+ public long getNotValidBefore()
+ {
+ return _notValidBefore;
+ }
+
public String getType()
{
return _type;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQDestination.java Sun May 15 22:36:02 2016
@@ -99,6 +99,7 @@ public abstract class AMQDestination imp
public static final int UNKNOWN_TYPE = 3;
private boolean _sendEncrypted;
private String _encryptedRecipients;
+ private long _deliveryDelay;
protected void setExclusive(boolean exclusive)
{
@@ -130,6 +131,11 @@ public abstract class AMQDestination imp
return _encryptedRecipients;
}
+ public long getDeliveryDelay()
+ {
+ return _deliveryDelay;
+ }
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -314,6 +320,8 @@ public abstract class AMQDestination imp
_consumerArguments = binding.getConsumerOptions();
_sendEncrypted = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_SEND_ENCRYPTED));
_encryptedRecipients = binding.getOption(BindingURL.OPTION_ENCRYPTED_RECIPIENTS);
+ String deliveryDelayVal = binding.getOption(BindingURL.OPTION_DELIVERY_DELAY);
+ _deliveryDelay = deliveryDelayVal == null ? 0L : Long.parseLong(deliveryDelayVal);
}
protected AMQDestination(String exchangeName, String exchangeClass, String routingKey, String queueName)
@@ -957,7 +965,7 @@ public abstract class AMQDestination imp
_addressType = _addrHelper.getNodeType();
_node = _addrHelper.getNode();
_link = _addrHelper.getLink();
-
+ _deliveryDelay = _link.getDelay();
_sendEncrypted = _addrHelper.getSendEncrypted();
_encryptedRecipients = _addrHelper.getEncryptedRecipients();
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sun May 15 22:36:02 2016
@@ -100,6 +100,8 @@ public abstract class BasicMessageProduc
private String _userID; // ref user id used in the connection.
+ private long _deliveryDelay;
+
/**
* The default value for immediate flag used this producer is false. That is, a consumer does
@@ -150,6 +152,11 @@ public abstract class BasicMessageProduc
: mandatory;
_userID = connection.isPopulateUserId() ? connection.getUsername() : null;
+
+ if(destination != null && destination.getDeliveryDelay() != 0L)
+ {
+ _deliveryDelay = destination.getDeliveryDelay();
+ }
setPublishMode();
}
@@ -323,7 +330,8 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+ _deliveryDelay);
}
}
@@ -334,7 +342,8 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
+ _deliveryDelay);
}
}
@@ -344,7 +353,8 @@ public abstract class BasicMessageProduc
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
+ sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate,
+ _deliveryDelay);
}
}
@@ -354,7 +364,7 @@ public abstract class BasicMessageProduc
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
- sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+ sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate, _deliveryDelay);
}
}
@@ -365,13 +375,15 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
- _immediate);
+ _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -383,13 +395,15 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
- _immediate);
+ _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -401,7 +415,9 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -413,7 +429,9 @@ public abstract class BasicMessageProduc
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
- sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
+ AMQDestination amqDestination = (AMQDestination) destination;
+ sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
+ amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
@@ -498,10 +516,17 @@ public abstract class BasicMessageProduc
* @param mandatory
* @param immediate
*
+ * @param deliveryDelay
* @throws JMSException
*/
- protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ protected void sendImpl(AMQDestination destination,
+ Message origMessage,
+ int deliveryMode,
+ int priority,
+ long timeToLive,
+ boolean mandatory,
+ boolean immediate,
+ long deliveryDelay) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
@@ -521,7 +546,8 @@ public abstract class BasicMessageProduc
try
{
- sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
+ sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate,
+ deliveryDelay);
}
catch (TransportException e)
{
@@ -549,7 +575,7 @@ public abstract class BasicMessageProduc
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException;
+ boolean immediate, final long deliveryDelay) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws InvalidDestinationException
{
@@ -724,6 +750,17 @@ public abstract class BasicMessageProduc
_publishMode = publishMode;
}
+ public long getDeliveryDelay()
+ {
+ return _deliveryDelay;
+ }
+
+ @Override
+ public void setDeliveryDelay(final long deliveryDelay)
+ {
+ _deliveryDelay = deliveryDelay;
+ }
+
Logger getLogger()
{
return _logger;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sun May 15 22:36:02 2016
@@ -112,7 +112,7 @@ public class BasicMessageProducer_0_10 e
*/
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ boolean immediate, final long deliveryDelay) throws JMSException
{
message.prepareForSending();
@@ -137,7 +137,7 @@ public class BasicMessageProducer_0_10 e
}
long currentTime = 0;
- if (timeToLive > 0 || !isDisableTimestamps())
+ if (timeToLive > 0 || !isDisableTimestamps() || deliveryDelay != 0L)
{
currentTime = System.currentTimeMillis();
}
@@ -188,12 +188,13 @@ public class BasicMessageProducer_0_10 e
deliveryProp.setRoutingKey(routingKey);
}
- if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
- (destination.getSubject() != null ||
+ Map<String,Object> appProps = messageProps.getApplicationHeaders();
+
+ if (destination.getDestSyntax() == AMQDestination.DestSyntax.ADDR &&
+ (destination.getSubject() != null ||
(messageProps.getApplicationHeaders() != null && messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT) != null))
)
{
- Map<String,Object> appProps = messageProps.getApplicationHeaders();
if (appProps == null)
{
appProps = new HashMap<String,Object>();
@@ -213,6 +214,18 @@ public class BasicMessageProducer_0_10 e
}
}
+
+ if(deliveryDelay != 0L && (appProps == null || appProps.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null))
+ {
+ if (appProps == null)
+ {
+ appProps = new HashMap<String,Object>();
+ messageProps.setApplicationHeaders(appProps);
+ }
+
+ appProps.put(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+ }
+
ByteBuffer data = message.getData();
boolean encrypt = message.getBooleanProperty(MessageEncryptionHelper.ENCRYPT_HEADER) || destination.sendEncrypted();
if(encrypt)
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sun May 15 22:36:02 2016
@@ -109,8 +109,8 @@ public class BasicMessageProducer_0_8 ex
}
void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
- UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
- boolean immediate) throws JMSException
+ UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+ boolean immediate, final long deliveryDelay) throws JMSException
{
@@ -171,9 +171,11 @@ public class BasicMessageProducer_0_8 ex
.getHeaders()
.setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
+ long currentTime;
if (!isDisableTimestamps())
{
- final long currentTime = System.currentTimeMillis();
+
+ currentTime = System.currentTimeMillis();
contentHeaderProperties.setTimestamp(currentTime);
if (timeToLive > 0)
@@ -194,6 +196,20 @@ public class BasicMessageProducer_0_8 ex
contentHeaderProperties.setExpiration(0);
}
}
+ else
+ {
+ currentTime = 0L;
+ }
+
+ if(deliveryDelay != 0L && headers.get(QpidMessageProperties.QPID_NOT_VALID_BEFORE) == null)
+ {
+ if(currentTime == 0L)
+ {
+ currentTime = System.currentTimeMillis();
+ }
+ headers.setLong(QpidMessageProperties.QPID_NOT_VALID_BEFORE, deliveryDelay+currentTime);
+ }
+
contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
contentHeaderProperties.setPriority((byte) priority);
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Sun May 15 22:36:02 2016
@@ -135,7 +135,12 @@ public class AMQMessageDelegate_0_10 ext
dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
_deliveryProps.getRoutingKey(), subject, false, AMQDestination.UNKNOWN_TYPE);
}
-
+
+ if(messageProps != null && messageProps.getApplicationHeaders() != null)
+ {
+ messageProps.getApplicationHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+ }
+
setJMSDestination(dest);
}
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Sun May 15 22:36:02 2016
@@ -97,15 +97,20 @@ public class AMQMessageDelegate_0_8 exte
}
// Used when generating a received message object
- protected AMQMessageDelegate_0_8(long deliveryTag, BasicContentHeaderProperties contentHeader, String exchange,
- String routingKey, AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
- AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
- int addressType)
+ protected AMQMessageDelegate_0_8(long deliveryTag,
+ BasicContentHeaderProperties contentHeader,
+ String exchange,
+ String routingKey,
+ AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
+ AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
+ int addressType)
{
this(contentHeader, deliveryTag);
Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ contentHeader.getHeaders().remove(QpidMessageProperties.QPID_NOT_VALID_BEFORE);
+
AMQDestination dest = null;
if(AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/message/QpidMessageProperties.java Sun May 15 22:36:02 2016
@@ -32,7 +32,10 @@ public class QpidMessageProperties
public static final String QPID_SUBJECT = "qpid.subject";
public static final String QPID_SUBJECT_JMS_PROPERTY = "JMS_qpid_subject";
public static final String QPID_SUBJECT_JMS_PROPER = QPID_SUBJECT_JMS_PROPERTY.substring(4);
-
+
+ public static final String QPID_NOT_VALID_BEFORE = "x-qpid-not-valid-before";
+
+
// AMQP 0-10 related properties
public static final String AMQP_0_10_APP_ID = "x-amqp-0-10.app-id";
public static final String AMQP_0_10_ROUTING_KEY = "x-amqp-0-10.routing-key";
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Sun May 15 22:36:02 2016
@@ -245,7 +245,11 @@ public class AddressHelper
reliability + "' is not yet supported");
}
}
-
+ Long delay = _linkPropAccess.getLong("delay");
+ if(delay != null)
+ {
+ link.setDelay(delay);
+ }
if (((Map) _address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor((Map) ((Map) _address.getOptions().get(LINK)).get(CAPACITY));
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Sun May 15 22:36:02 2016
@@ -38,6 +38,7 @@ public class Link
private FilterType _filterType = FilterType.SUBJECT;
private boolean _isNoLocal;
private boolean _isDurable;
+ private long _delay;
private int _consumerCapacity = -1;
private int _producerCapacity = -1;
private Subscription subscription;
@@ -145,6 +146,16 @@ public class Link
_bindings = bindings;
}
+ public long getDelay()
+ {
+ return _delay;
+ }
+
+ public void setDelay(final long delay)
+ {
+ _delay = delay;
+ }
+
public SubscriptionQueue getSubscriptionQueue()
{
return _subscriptionQueue;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Sun May 15 22:36:02 2016
@@ -36,4 +36,6 @@ public interface MessageProducer extends
int priority, long timeToLive, boolean mandatory, boolean immediate)
throws JMSException;
+ void setDeliveryDelay(long delay);
+
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/Accessor.java Sun May 15 22:36:02 2016
@@ -120,7 +120,7 @@ public interface Accessor
}
else
{
- return Long.parseLong((String)source.get(name));
+ return Long.parseLong(source.get(name).toString());
}
}
else
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java?rev=1743982&r1=1743981&r2=1743982&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/url/BindingURL.java Sun May 15 22:36:02 2016
@@ -43,6 +43,7 @@ public interface BindingURL
String OPTION_EXCHANGE_INTERNAL = "exchangeinternal";
String OPTION_SEND_ENCRYPTED = "sendencrypted";
String OPTION_ENCRYPTED_RECIPIENTS = "encryptedrecipients";
+ String OPTION_DELIVERY_DELAY = "deliveryDelay";
/**
@@ -66,7 +67,8 @@ public interface BindingURL
OPTION_EXCHANGE_DURABLE,
OPTION_REJECT_BEHAVIOUR,
OPTION_SEND_ENCRYPTED,
- OPTION_ENCRYPTED_RECIPIENTS)));
+ OPTION_ENCRYPTED_RECIPIENTS,
+ OPTION_DELIVERY_DELAY)));
String getURL();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org