You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/09 14:32:28 UTC

svn commit: r536506 - in /incubator/qpid/branches/M2/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/queue/ cluster/src/main/java/org/apache/qpid/server/queue/ systests/src/main/java/org/apache/qpid/server...

Author: ritchiem
Date: Wed May  9 05:32:27 2007
New Revision: 536506

URL: http://svn.apache.org/viewvc?view=rev&rev=536506
Log:
QPID-25 TimeToLive Basic implementation. 

Messages are not automatically purged; rather, they are checked as they are selected for delivery. If they have expired, they are dequeued.

AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
Subscription - Added a getChannel method that returns the channel this Subscription is attached to, so we can retrieve the StoreContext for dequeuing the message.

TimeToLiveTest - Test of time to live. Sends 50 msgs: 1 non-timed, 48 with a 1 second TTL, and 1 non-timed. Ensures only 2 msgs come back after 2 seconds.

Added:
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java   (with props)
Modified:
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
    incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May  9 05:32:27 2007
@@ -216,6 +216,8 @@
                 _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
             }
             _currentMessage.setContentHeaderBody(contentHeaderBody);
+            _currentMessage.setExpiration();
+
             routeCurrentMessage();
             _currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
 

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May  9 05:32:27 2007
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -33,6 +34,7 @@
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
 /** Combines the information that make up a deliverable message into a more manageable form. */
 
@@ -93,12 +95,42 @@
 
 
     private final int hashcode = System.identityHashCode(this);
+    private long _expiration;
 
     public String debugIdentity()
     {
         return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
     }
 
+    public void setExpiration()
+    {
+        long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+        long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+
+        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+        {
+            _expiration = expiration;
+        }
+        else
+        {
+            // Update TTL to be in broker time.
+            if (expiration != 0L)
+            {
+                if (timestamp != 0L)
+                {
+                    //todo perhaps use arrival time
+                    long diff = (System.currentTimeMillis() - timestamp);
+
+                    if (diff > 1000L || diff < 1000L)
+                    {
+                        _expiration = expiration + diff;
+                    }
+                }
+            }
+        }
+
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -205,8 +237,6 @@
         _immediate = info.isImmediate();
         _transientMessageData.setMessagePublishInfo(info);
 
-//        _taken = new AtomicBoolean(false);
-
     }
 
     /**
@@ -617,6 +647,33 @@
         return _messageHandle.getArrivalTime();
     }
 
+    /**
+     * Tests whether this message has passed its cached expiration time and, if it has, dequeues it.
+     * A cached expiration of zero is treated as "never expires"; otherwise the time is compared against
+     * System.currentTimeMillis().
+     *
+     * @param storecontext the store context handed to dequeue
+     * @param queue        the queue the message is dequeued from once it has expired
+     *
+     * @return true if the message has expired, false otherwise
+     *
+     * @throws AMQException if raised while dequeuing the expired message
+     */
+    public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+    {
+        //note: If the storecontext isn't needed then we can remove the getChannel() from Subscription.
+
+        // Short-circuit keeps the clock from being read when no expiration was recorded.
+        boolean stillLive = (_expiration == 0L) || (System.currentTimeMillis() <= _expiration);
+        if (stillLive)
+        {
+            return false;
+        }
+
+        // Past the deadline: remove the message from the queue before reporting it as expired.
+        dequeue(storecontext, queue);
+        return true;
+    }
 
     /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
     public void setDeliveredToConsumer()

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed May  9 05:32:27 2007
@@ -434,13 +434,19 @@
         return count;
     }
 
-    /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+    /**
+     * Fetches the next message from the main _messages queue, passing no subscription (null).
+     * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+     *
+     * @return the next message or null
+     * @throws org.apache.qpid.AMQException if raised by getNextMessage(Queue, Subscription)
+     */
     private AMQMessage getNextMessage() throws AMQException
     {
         return getNextMessage(_messages, null);
     }
 
-    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+    private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
     {
         AMQMessage message = messages.peek();
 
@@ -449,9 +455,11 @@
                && (
                 ((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
                 || sub == null)
-               && message.taken(_queue, sub))
+               && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+                   || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+                )
         {
-            //remove the already taken message
+            //remove the already taken message or expired
             AMQMessage removed = messages.poll();
 
             assert removed == message;
@@ -466,6 +474,7 @@
             // try the next message
             message = messages.peek();
         }
+
         return message;
     }
 

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed May  9 05:32:27 2007
@@ -23,6 +23,7 @@
 import java.util.Queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
 
 public interface Subscription
 {
@@ -57,4 +58,6 @@
     void addToResendQueue(AMQMessage msg);
 
     Object getSendLock();
+
+    AMQChannel getChannel();
 }

Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed May  9 05:32:27 2007
@@ -668,5 +668,9 @@
         return _sendLock;
     }
 
+    public AMQChannel getChannel()
+    {
+        return channel; // exposes the channel field so callers can reach its StoreContext when dequeuing expired messages
+    }
 
 }

Modified: incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed May  9 05:32:27 2007
@@ -23,6 +23,7 @@
 import org.apache.qpid.server.cluster.MemberHandle;
 import org.apache.qpid.server.cluster.GroupManager;
 import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 
 import java.util.Queue;
@@ -165,6 +166,11 @@
     public Object getSendLock()
     {
         return new Object();
+    }
+
+    public AMQChannel getChannel()
+    {
+        return null; // NOTE(review): always null here, yet ConcurrentSelectorDeliveryManager.getNextMessage calls sub.getChannel().getStoreContext() - confirm that path can never see this subscription type
     }
 
 }

Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed May  9 05:32:27 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.AMQChannel;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
@@ -82,6 +84,10 @@
         return new Object();
     }
 
+    public AMQChannel getChannel()
+    {
+        return null; // NOTE(review): always null here, yet ConcurrentSelectorDeliveryManager.getNextMessage calls sub.getChannel().getStoreContext() - confirm tests using this helper never reach that path
+    }
 
     public void queueDeleted(AMQQueue queue)
     {

Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?view=auto&rev=536506
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Wed May  9 05:32:27 2007
@@ -0,0 +1,162 @@
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import java.util.Hashtable;
+
+
+/**
+ * Verifies basic time-to-live handling: messages whose TTL elapses before the consumer is started must not be
+ * delivered, while messages sent without a TTL are still received.
+ */
+public class TimeToLiveTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
+
+    protected final String BROKER = "vm://:1";
+    protected final String VHOST = "/test";
+    protected final String QUEUE = "TimeToLiveQueue";
+
+    /** TTL in milliseconds given to the messages that are expected to expire. */
+    private final long TIME_TO_LIVE = 1000L;
+
+    Context _context;
+
+    private Connection _clientConnection, _producerConnection;
+
+    private MessageConsumer _consumer;
+    MessageProducer _producer;
+    Session _clientSession, _producerSession;
+    private static final int MSG_COUNT = 50;
+
+    /** Starts the in-VM broker if required, then creates an unstarted consumer and a started producer. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        if (BROKER.startsWith("vm://"))
+        {
+            TransportConnection.createVMBroker(1);
+        }
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+        env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client 1; its connection is only started in test(), after the TTL has elapsed
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _consumer = _clientSession.createConsumer(queue);
+
+        //Create Producer
+        _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+        _producerConnection.start();
+        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _producer = _producerSession.createProducer(queue);
+    }
+
+    /** Closes both connections; the in-VM broker is shut down even if closing a connection fails. */
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_clientConnection != null)
+            {
+                _clientConnection.close();
+            }
+            if (_producerConnection != null)
+            {
+                _producerConnection.close();
+            }
+        }
+        finally
+        {
+            super.tearDown();
+            if (BROKER.startsWith("vm://"))
+            {
+                TransportConnection.killAllVMBrokers();
+            }
+        }
+    }
+
+    /**
+     * Sends one untimed message, MSG_COUNT - 2 messages with a TTL and one more untimed message, waits for the
+     * TTL to elapse and then checks that only the two untimed messages are delivered.
+     */
+    public void test() throws JMSException
+    {
+        //First message is sent before any TTL is set
+        int msg = 0;
+        _producer.send(nextMessage(String.valueOf(msg), true));
+
+        //Set TTL
+        _producer.setTimeToLive(TIME_TO_LIVE);
+        for (; msg < MSG_COUNT - 2; msg++)
+        {
+            _producer.send(nextMessage(String.valueOf(msg), false));
+        }
+
+        //Reset TTL
+        _producer.setTimeToLive(0L);
+        _producer.send(nextMessage(String.valueOf(msg), false));
+
+        try
+        {
+            // Sleep to ensure TTL reached
+            Thread.sleep(2000);
+        }
+        catch (InterruptedException e)
+        {
+            // Restore the interrupt flag and fail: without the full sleep the TTL may not have elapsed.
+            Thread.currentThread().interrupt();
+            Assert.fail("Interrupted while waiting for the TTL to elapse");
+        }
+
+        _clientConnection.start();
+
+        //Receive Message 0
+        Message received = _consumer.receive(100);
+        Assert.assertNotNull("First message not received", received);
+        Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first"));
+        Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+        //Receive the final, untimed message
+        received = _consumer.receive(100);
+        Assert.assertNotNull("Final message not received", received);
+        Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first"));
+        Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+        //Nothing else should arrive: every timed message has expired
+        received = _consumer.receive(100);
+        Assert.assertNull("More messages received", received);
+    }
+
+    /** Builds a text message tagged with the "first" flag and the producer's current TTL. */
+    private Message nextMessage(String msg, boolean first) throws JMSException
+    {
+        Message send = _producerSession.createTextMessage("Message " + msg);
+        send.setBooleanProperty("first", first);
+        send.setLongProperty("TTL", _producer.getTimeToLive());
+        return send;
+    }
+}

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date