You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/07/28 22:43:21 UTC

svn commit: r1614166 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/apache/qpid/server/queue/ broker-plugins/management-http/src/...

Author: rgodfrey
Date: Mon Jul 28 20:43:20 2014
New Revision: 1614166

URL: http://svn.apache.org/r1614166
Log:
QPID-5934 : [Java Broker] Allow TTL to be overridden on a per-Queue basis

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Mon Jul 28 20:43:20 2014
@@ -46,6 +46,8 @@ public interface Queue<X extends Queue<X
     String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
     String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
     String QUEUE_FLOW_STOPPED = "queueFlowStopped";
+    String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
+    String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
 
     @ManagedAttribute
     Exchange getAlternateExchange();
@@ -135,16 +137,17 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute( defaultValue = "DEFAULT" )
     MessageDurability getMessageDurability();
 
+    @ManagedAttribute
+    long getMinimumMessageTtl();
 
+    @ManagedAttribute
+    long getMaximumMessageTtl();
 
     //children
     Collection<? extends Binding> getBindings();
 
-    // TODO - Undo this commented out line when we stop supporting 1.6 for compilation
-    //        In 1.6 this causes the build to break at AbstractQueue because the 1.6 compiler can't work out that
-    //        the definition in terms of the Consumer implementation meets both this, and the contract for AMQQueue
 
-    // Collection<? extends Consumer> getConsumers();
+    Collection<? extends Consumer> getConsumers();
 
     //operations
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Jul 28 20:43:20 2014
@@ -230,6 +230,11 @@ public abstract class AbstractQueue<X ex
     private String _messageGroupDefaultGroup;
     @ManagedAttributeField
     private int _maximumDistinctGroups;
+    @ManagedAttributeField
+    private long _minimumMessageTtl;
+    @ManagedAttributeField
+    private long _maximumMessageTtl;
+
 
     private State _state = State.UNINITIALIZED;
     private final AtomicBoolean _recovering = new AtomicBoolean(true);
@@ -547,6 +552,18 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
+    public long getMinimumMessageTtl()
+    {
+        return _minimumMessageTtl;
+    }
+
+    @Override
+    public long getMaximumMessageTtl()
+    {
+        return _maximumMessageTtl;
+    }
+
+    @Override
     public Collection<String> getAvailableAttributes()
     {
         return new ArrayList<String>(_arguments.keySet());
@@ -967,6 +984,7 @@ public abstract class AbstractQueue<X ex
     {
         final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
         final QueueEntry entry = getEntries().add(message);
+        updateExpiration(entry);
 
         try
         {
@@ -1011,6 +1029,40 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    private void updateExpiration(final QueueEntry entry)
+    {
+        long expiration = entry.getMessage().getExpiration();
+        long arrivalTime = entry.getMessage().getArrivalTime();
+        if(_minimumMessageTtl != 0l)
+        {
+            if(arrivalTime == 0)
+            {
+                arrivalTime = System.currentTimeMillis();
+            }
+            if(expiration != 0l)
+            {
+                long calculatedExpiration = arrivalTime+_minimumMessageTtl;
+                if(calculatedExpiration > expiration)
+                {
+                    entry.setExpiration(calculatedExpiration);
+                    expiration = calculatedExpiration;
+                }
+            }
+        }
+        if(_maximumMessageTtl != 0l)
+        {
+            if(arrivalTime == 0)
+            {
+                arrivalTime = System.currentTimeMillis();
+            }
+            long calculatedExpiration = arrivalTime+_maximumMessageTtl;
+            if(expiration == 0l || expiration > calculatedExpiration)
+            {
+                entry.setExpiration(calculatedExpiration);
+            }
+        }
+    }
+
     /**
      * iterate over consumers and if any is at the end of the queue and can deliver this message,
      * then deliver the message

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Mon Jul 28 20:43:20 2014
@@ -36,4 +36,5 @@ public interface QueueEntry extends Mess
     QueueEntry getNextValidEntry();
 
 
+    void setExpiration(long calculatedExpiration);
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jul 28 20:43:20 2014
@@ -128,6 +128,11 @@ public abstract class QueueEntryImpl imp
         }
     }
 
+    public void setExpiration(long expiration)
+    {
+        _expiration = expiration;
+    }
+
     public InstanceProperties getInstanceProperties()
     {
         return new EntryInstanceProperties();

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Mon Jul 28 20:43:20 2014
@@ -21,32 +21,41 @@
 
 package org.apache.qpid.server.queue;
 
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Matchers.contains;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import org.apache.log4j.Logger;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.BindingImpl;
 import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.consumer.MockConsumer;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.QueueNotificationListener;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
-import org.apache.qpid.server.consumer.MockConsumer;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -859,6 +868,101 @@ abstract class AbstractQueueTestBase ext
         verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
     }
 
+
+    public void testMaximumMessageTtl() throws Exception
+    {
+
+        // Test scenarios where only the maximum TTL has been set
+
+        Map<String,Object> attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl");
+        attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l);
+
+        AMQQueue queue = _virtualHost.createQueue(attributes);
+
+        assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+        assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+        assertEquals("TTL has been incorrectly overriden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+        long tooLateExpiration = System.currentTimeMillis() + 20000l;
+
+        assertTrue("TTL has not been overriden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+        long acceptableExpiration = System.currentTimeMillis() + 5000l;
+
+        assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+        // Test the scenarios where only the minimum TTL has been set
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl");
+        attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+
+        queue = _virtualHost.createQueue(attributes);
+
+        assertEquals("TTL has been overriden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l));
+
+        assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+        assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+        long unacceptableExpiration = System.currentTimeMillis() + 5000l;
+
+        assertTrue("TTL has not been overriden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
+
+        acceptableExpiration = System.currentTimeMillis() + 20000l;
+
+        assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
+
+
+        // Test the scenarios where both the minimum and maximum TTL have been set
+
+        attributes = new HashMap<>(_arguments);
+        attributes.put(Queue.NAME,"testTtlOverrideBothTTl");
+        attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
+        attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l);
+
+        queue = _virtualHost.createQueue(attributes);
+
+        assertEquals("TTL has not been overriden", 70000l, getExpirationOnQueue(queue, 50000l, 0l));
+
+        assertEquals("TTL has been overriden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
+
+        assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
+
+
+
+    }
+
+    private long getExpirationOnQueue(final AMQQueue queue, long arrivalTime, long expiration)
+    {
+        final List<QueueEntry> entries = new ArrayList<>();
+
+        ServerMessage message = createMessage(1l);
+        when(message.getArrivalTime()).thenReturn(arrivalTime);
+        when(message.getExpiration()).thenReturn(expiration);
+        queue.enqueue(message,null);
+        queue.visit(new QueueEntryVisitor()
+        {
+            @Override
+            public boolean visit(final QueueEntry entry)
+            {
+                entries.add(entry);
+                return true;
+            }
+        });
+        assertEquals("Expected only one entry in the queue", 1, entries.size());
+
+        Long entryExpiration =
+                (Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);
+
+        queue.clearQueue();
+        entries.clear();
+        return entryExpiration;
+    }
+
     /**
      * A helper method to put given number of messages into queue
      * <p>

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/addQueue.html Mon Jul 28 20:43:20 2014
@@ -44,6 +44,22 @@
                     </td>
                 </tr>
                 <tr>
+                    <td valign="top"><strong>Max Ttl:  </strong></td>
+                    <td><input type="text" required="false" name="maximumMessageTtl" id="formAddQueue.maximumMessageTtl" placeholder="Ttl in ms."
+                               dojoType="dijit.form.ValidationTextBox"
+                               trim="true"
+                               regexp="[0-9]+"
+                               invalidMessage= "Invalid value" /></td>
+                </tr>
+                <tr>
+                    <td valign="top"><strong>Min Ttl:  </strong></td>
+                    <td><input type="text" required="false" name="minimumMessageTtl" id="formAddQueue.minimumMessageTtl" placeholder="Ttl in ms."
+                               dojoType="dijit.form.ValidationTextBox"
+                               trim="true"
+                               regexp="[0-9]+"
+                               invalidMessage= "Invalid value" /></td>
+                </tr>
+                <tr>
                     <td valign="top"><strong>Queue Type: </strong></td>
                     <td>
                     <input type="radio" id="formAddQueueTypeStandard" name="type" value="standard" checked="checked" dojoType="dijit.form.RadioButton" />

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js Mon Jul 28 20:43:20 2014
@@ -278,6 +278,8 @@ define(["dojo/_base/xhr",
                            "state",
                            "durable",
                            "messageDurability",
+                           "maximumMessageTtl",
+                           "minimumMessageTtl",
                            "exclusive",
                            "owner",
                            "lifetimePolicy",
@@ -353,6 +355,9 @@ define(["dojo/_base/xhr",
                this.owner.innerHTML = this.queueData[ "owner" ] ? entities.encode(String(this.queueData[ "owner" ])) : "" ;
                this.lifetimePolicy.innerHTML = entities.encode(String(this.queueData[ "lifetimePolicy" ]));
                this.messageDurability.innerHTML = entities.encode(String(this.queueData[ "messageDurability" ]));
+               this.minimumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "minimumMessageTtl" ]));
+               this.maximumMessageTtl.innerHTML = entities.encode(String(this.queueData[ "maximumMessageTtl" ]));
+
                this.alternateExchange.innerHTML = this.queueData[ "alternateExchange" ] ? entities.encode(String(this.queueData[ "alternateExchange" ])) : "" ;
 
                this.queueDepthMessages.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));

Modified: qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html?rev=1614166&r1=1614165&r2=1614166&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html (original)
+++ qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/resources/showQueue.html Mon Jul 28 20:43:20 2014
@@ -37,6 +37,14 @@
             <div class="messageDurability" style="float:left;"></div>
         </div>
         <div style="clear:both">
+            <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Max. Ttl(ms):</div>
+            <div class="maximumMessageTtl" style="float:left;"></div>
+        </div>
+        <div style="clear:both">
+            <div class="formLabel-labelCell" style="float:left; width: 150px;">Enforced Min. Ttl(ms):</div>
+            <div class="minimumMessageTtl" style="float:left;"></div>
+        </div>
+        <div style="clear:both">
             <div class="formLabel-labelCell" style="float:left; width: 150px;">Exclusive:</div>
             <div class="exclusive" style="float:left;"></div>
         </div>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org