You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/09 17:33:18 UTC

svn commit: r536567 - in /incubator/qpid/trunk/qpid: ./ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/cluster/src/main/java/org/apache/qpid/server/queue/ java/systests/src/main/java/org/a...

Author: ritchiem
Date: Wed May  9 08:33:17 2007
New Revision: 536567

URL: http://svn.apache.org/viewvc?view=rev&rev=536567
Log:
Merged revisions 536506 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines
  
  QPID-25 TimeToLive Basic implementation. 
  
  Messages are not automatically purged; rather, they are checked as they are selected for delivery. If they have expired they are dequeued.
  
  AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
  AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
  ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
  Subscription - Added a getChannel method returning the channel that this Subscription is attached to, so we can retrieve the StoreContext for dequeuing the message.
  
  TimeToLiveTest - Test of Time to live. Sends 50 msgs: 1 non-timed, 48 with a 1 second TTL and 1 non-timed, then ensures only 2 msgs come back after 2 seconds.
........

Added:
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
      - copied unchanged from r536506, incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May  9 08:33:17 2007
@@ -228,6 +228,8 @@
                 _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
             }
             _currentMessage.setContentHeaderBody(contentHeaderBody);
+            _currentMessage.setExpiration();
+
             routeCurrentMessage();
             _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May  9 08:33:17 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -34,6 +35,7 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.messageStore.StorableMessage;
 import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 /** Combines the information that make up a deliverable message into a more manageable form. */
 
@@ -100,12 +102,42 @@
 
 
     private final int hashcode = System.identityHashCode(this);
+    private long _expiration;
 
     public String debugIdentity()
     {
         return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
     }
 
+    /** Caches the expiration in broker time; the message headers themselves are left unchanged. */
+    public void setExpiration()
+    {
+        BasicContentHeaderProperties properties =
+                (BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties;
+        long expiration = properties.getExpiration();
+        long timestamp = properties.getTimestamp();
+
+        // Default to the producer's value so a requested TTL is never silently dropped (0 means no expiry).
+        _expiration = expiration;
+
+        boolean syncedClocks =
+                ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false);
+
+        // With synced clocks, no TTL, or no producer timestamp there is nothing to adjust.
+        if (syncedClocks || expiration == 0L || timestamp == 0L)
+        {
+            return;
+        }
+
+        //todo perhaps use arrival time
+        long diff = System.currentTimeMillis() - timestamp;
+        // Shift into broker time only when the skew exceeds one second in either direction.
+        if (diff > 1000L || diff < -1000L)
+        {
+            _expiration = expiration + diff;
+        }
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -212,8 +244,6 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-//        _taken = new AtomicBoolean(false);
-
     }
 
     /**
@@ -731,10 +761,35 @@
         return _messageHandle.getArrivalTime();
     }
 
-
     /**
-     * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+     * Checks to see if the message has expired. If it has the message is dequeued.
+     *
+     * @param storecontext
+     * @param queue
+     *
+     * @return true if the message has expired
+     *
+     * @throws AMQException
      */
+    public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+    {
+        // A cached expiration of zero is treated as "never expires".
+        if (_expiration == 0L)
+        {
+            return false;
+        }
+
+        //note: If the storecontext isn't needed then we can remove the getChannel() from Subscription.
+        final boolean pastExpiry = System.currentTimeMillis() > _expiration;
+        if (pastExpiry)
+        {
+            dequeue(storecontext, queue);
+        }
+
+        return pastExpiry;
+    }
+
+    /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
     public void setDeliveredToConsumer()
     {
         _deliveredToConsumer = true;

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed May  9 08:33:17 2007
@@ -434,13 +434,19 @@
         return count;
     }
 
-    /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+    /**
+     * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+     *
+     * @return the next message from _messages, or null when there is none left
+     *
+     * @throws org.apache.qpid.AMQException propagated from getNextMessage(Queue, Subscription)
+     */
     private AMQMessage getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null);
     }
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
     {
         AMQMessage message = messages.peek();
 
@@ -449,9 +455,11 @@
                && (
                 ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
                 || sub == null)
-               && message.taken(_queue, sub))
+               && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+                   || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+                )
         {
-            //remove the already taken message
+            //remove the already taken message or expired
             AMQMessage removed = messages.poll();
 
             assert removed == message;
@@ -466,6 +474,7 @@
             // try the next message
             message = messages.peek();
         }
+
         return message;
     }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed May  9 08:33:17 2007
@@ -23,6 +23,7 @@
 import java.util.Queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
 
 public interface Subscription
 {
@@ -57,4 +58,6 @@
     void addToResendQueue(AMQMessage msg);
 
     Object getSendLock();
+
+    AMQChannel getChannel();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed May  9 08:33:17 2007
@@ -668,5 +668,9 @@
         return _sendLock;
     }
 
+    public AMQChannel getChannel()
+    {
+        return channel; // lets the delivery manager obtain the StoreContext when dequeuing expired messages
+    }
 
 }

Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed May  9 08:33:17 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.server.cluster.MemberHandle;
 import org.apache.qpid.server.cluster.GroupManager;
 import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 
 import java.util.Queue;
@@ -165,6 +166,11 @@
     public Object getSendLock()
     {
         return new Object();
+    }
+
+    public AMQChannel getChannel()
+    {
+        return null; // NOTE(review): callers that dereference getChannel() would NPE here - confirm none can reach a remote subscription
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Wed May  9 08:33:17 2007
@@ -62,7 +62,7 @@
         {
             for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
             {
-                _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+                _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
             }
         }
         catch (AMQException e)

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed May  9 08:33:17 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.AMQChannel;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
@@ -82,6 +84,10 @@
         return new Object();
     }
 
+    public AMQChannel getChannel()
+    {
+        return null; // NOTE(review): code calling getChannel().getStoreContext() would NPE with this helper - confirm tests avoid that path
+    }
 
     public void queueDeleted(AMQQueue queue)
     {