You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/28 22:43:21 UTC
svn commit: r1614166 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/test/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/...
Author: rgodfrey
Date: Mon Jul 28 20:43:20 2014
New Revision: 1614166
URL: http://svn.apache.org/r1614166
Log:
QPID-5934 : [Java Broker] Allow TTL to be overridden on a per-Queue basis
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Mon Jul 28 20:43:20 2014
@@ -46,6 +46,8 @@ public interface Queue<X extends Queue<X
String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
+ String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
+ String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
@ManagedAttribute
Exchange getAlternateExchange();
@@ -135,16 +137,17 @@ public interface Queue<X extends Queue<X
@ManagedAttribute( defaultValue = "DEFAULT" )
MessageDurability getMessageDurability();
+ @ManagedAttribute
+ long getMinimumMessageTtl();
+ @ManagedAttribute
+ long getMaximumMessageTtl();
//children
Collection<? extends Binding> getBindings();
- // TODO - Undo this commented out line when we stop supporting 1.6 for compilation
- // In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that
- // the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue
- // Collection<? extends Consumer> getConsumers();
+ Collection<? extends Consumer> getConsumers();
//operations
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Jul 28 20:43:20 2014
@@ -230,6 +230,11 @@ public abstract class AbstractQueue<X ex
private String _messageGroupDefaultGroup;
@ManagedAttributeField
private int _maximumDistinctGroups;
+ @ManagedAttributeField
+ private long _minimumMessageTtl;
+ @ManagedAttributeField
+ private long _maximumMessageTtl;
+
private State _state = State.UNINITIALIZED;
private final AtomicBoolean _recovering = new AtomicBoolean(true);
@@ -547,6 +552,18 @@ public abstract class AbstractQueue<X ex
}
@Override
+ public long getMinimumMessageTtl()
+ {
+ return _minimumMessageTtl;
+ }
+
+ @Override
+ public long getMaximumMessageTtl()
+ {
+ return _maximumMessageTtl;
+ }
+
+ @Override
public Collection<String> getAvailableAttributes()
{
return new ArrayList<String>(_arguments.keySet());
@@ -967,6 +984,7 @@ public abstract class AbstractQueue<X ex
{
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
+ updateExpiration(entry);
try
{
@@ -1011,6 +1029,40 @@ public abstract class AbstractQueue<X ex
}
+ private void updateExpiration(final QueueEntry entry)
+ {
+ long expiration = entry.getMessage().getExpiration();
+ long arrivalTime = entry.getMessage().getArrivalTime();
+ if(_minimumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ if(expiration != 0l)
+ {
+ long calculatedExpiration = arrivalTime+_minimumMessageTtl;
+ if(calculatedExpiration > expiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ expiration = calculatedExpiration;
+ }
+ }
+ }
+ if(_maximumMessageTtl != 0l)
+ {
+ if(arrivalTime == 0)
+ {
+ arrivalTime = System.currentTimeMillis();
+ }
+ long calculatedExpiration = arrivalTime+_maximumMessageTtl;
+ if(expiration == 0l || expiration > calculatedExpiration)
+ {
+ entry.setExpiration(calculatedExpiration);
+ }
+ }
+ }
+
/**
* iterate over consumers and if any is at the end of the queue and can deliver this message,
* then deliver the message
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Jul 28 20:43:20 2014
@@ -36,4 +36,5 @@ public interface QueueEntry extends Mess
QueueEntry getNextValidEntry();
+ void setExpiration(long calculatedExpiration);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 28 20:43:20 2014
@@ -128,6 +128,11 @@ public abstract class QueueEntryImpl imp
}
}
+ public void setExpiration(long expiration)
+ {
+ _expiration = expiration;
+ }
+
public InstanceProperties getInstanceProperties()
{
return new EntryInstanceProperties();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Jul 28 20:43:20 2014
@@ -21,32 +21,41 @@
package org.apache.qpid.server.queue;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Matchers.contains;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -859,6 +868,101 @@ abstract class AbstractQueueTestBase ext
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
+
+ public void testMaximumMessageTtl() throws Exception
+ {
+
+ // Test scenarios where only the maximum TTL has been set
+
+ Map<String,Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl");
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l);
+
+ AMQQueue queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has been incorrectly overriden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long tooLateExpiration = System.currentTimeMillis() + 20000l;
+
+ assertTrue("TTL has not been overriden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ long acceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+ // Test the scenarios where only the minimum TTL has been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has been overriden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+ long unacceptableExpiration = System.currentTimeMillis() + 5000l;
+
+ assertTrue("TTL has not been overriden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+ acceptableExpiration = System.currentTimeMillis() + 20000l;
+
+ assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+
+ // Test the scenarios where both the minimum and maximum TTL have been set
+
+ attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME,"testTtlOverrideBothTTl");
+ attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+ attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l);
+
+ queue = _virtualHost.createQueue(attributes);
+
+ assertEquals("TTL has not been overriden", 70000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+ assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+ assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+
+
+ }
+
+ private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
+ {
+ final List<QueueEntry> entries = new ArrayList<>();
+
+ ServerMessage message = createMessage(1l);
+ when(message.getArrivalTime()).thenReturn(arrivalTime);
+ when(message.getExpiration()).thenReturn(expiration);
+ queue.enqueue(message,null);
+ queue.visit(new QueueEntryVisitor()
+ {
+ @Override
+ public boolean visit(final QueueEntry entry)
+ {
+ entries.add(entry);
+ return true;
+ }
+ });
+ assertEquals("Expected only one entry in the queue", 1, entries.size());
+
+ Long entryExpiration =
+ (Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);
+
+ queue.clearQueue();
+ entries.clear();
+ return entryExpiration;
+ }
+
/**
* A helper method to put given number of messages into queue
* <p>
Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html Mon Jul 28 20:43:20 2014
@@ -44,6 +44,22 @@
</td>
</tr>
<tr>
+ <td valign="top"><strong>Max Ttl: </strong></td>
+ <td><input type="text" required="false" name="maximumMessageTtl" id="formAddQueue.maximumMessageTtl" placeholder="Ttl in ms."
+ dojoType="dijit.form.ValidationTextBox"
+ trim="true"
+ regexp="[0-9]+"
+ invalidMessage= "Invalid value" /></td>
+ </tr>
+ <tr>
+ <td valign="top"><strong>Min Ttl: </strong></td>
+ <td><input type="text" required="false" name="minimumMessageTtl" id="formAddQueue.minimumMessageTtl" placeholder="Ttl in ms."
+ dojoType="dijit.form.ValidationTextBox"
+ trim="true"
+ regexp="[0-9]+"
+ invalidMessage= "Invalid value" /></td>
+ </tr>
+ <tr>
<td valign="top"><strong>Queue Type: </strong></td>
<td>
<input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />
Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Mon Jul 28 20:43:20 2014
@@ -278,6 +278,8 @@ define(["dojo/_base/xhr",
"state",
"durable",
"messageDurability",
+ "maximumMessageTtl",
+ "minimumMessageTtl",
"exclusive",
"owner",
"lifetimePolicy",
@@ -353,6 +355,9 @@ define(["dojo/_base/xhr",
this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
+ this.minimumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "minimumMessageTtl" ]));
+ this.maximumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "maximumMessageTtl" ]));
+
this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html Mon Jul 28 20:43:20 2014
@@ -37,6 +37,14 @@
<div class="messageDurability" style="float:left;"></div>
</div>
<div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Max. Ttl(ms):</div>
+ <div class="maximumMessageTtl" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
+ <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Min. Ttl(ms):</div>
+ <div class="minimumMessageTtl" style="float:left;"></div>
+ </div>
+ <div style="clear:both">
<div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
<div class="exclusive" style="float:left;"></div>
</div>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org