You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/09 14:32:28 UTC
svn commit: r536506 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/queue/
cluster/src/main/java/org/apache/qpid/server/queue/
systests/src/main/java/org/apache/qpid/server...
Author: ritchiem
Date: Wed May 9 05:32:27 2007
New Revision: 536506
URL: http://svn.apache.org/viewvc?view=rev&rev=536506
Log:
QPID-25 TimeToLive Basic implementation.
Messages are not automatically purged; rather, they are checked as they are selected for delivery. If they have expired, they are dequeued.
AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
Subscription - Added a getChannel() method that returns the channel this Subscription is attached to, so we can retrieve the StoreContext for dequeuing the message.
TimeToLiveTest - Test of time to live. Sends 50 msgs: 1 non-timed, 48 with a 1 second TTL, and 1 non-timed; ensures only the 2 non-timed msgs come back after 2 seconds.
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (with props)
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May 9 05:32:27 2007
@@ -216,6 +216,8 @@
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setExpiration();
+
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May 9 05:32:27 2007
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -33,6 +34,7 @@
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.registry.ApplicationRegistry;
/** Combines the information that make up a deliverable message into a more manageable form. */
@@ -93,12 +95,42 @@
private final int hashcode = System.identityHashCode(this);
+ private long _expiration;
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
+ public void setExpiration()
+ {
+ long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ {
+ _expiration = expiration;
+ }
+ else
+ {
+ // Update TTL to be in broker time.
+ if (expiration != 0L)
+ {
+ if (timestamp != 0L)
+ {
+ //todo perhaps use arrival time
+ long diff = (System.currentTimeMillis() - timestamp);
+
+ if (diff > 1000L || diff < 1000L)
+ {
+ _expiration = expiration + diff;
+ }
+ }
+ }
+ }
+
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -205,8 +237,6 @@
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
-// _taken = new AtomicBoolean(false);
-
}
/**
@@ -617,6 +647,33 @@
return _messageHandle.getArrivalTime();
}
+ /**
+  * Reports whether this message has passed its cached expiration time, dequeuing it if so.
+  *
+  * A cached expiration of zero is never treated as expired.
+  *
+  * @param storecontext the store context passed to dequeue
+  * @param queue        the queue passed to dequeue
+  *
+  * @return true if the message has expired (in which case dequeue has been called), false otherwise
+  *
+  * @throws AMQException declared by this method; NOTE(review): presumably raised by dequeue - confirm
+  */
+ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ {
+     //note: If the storecontext isn't needed then we can remove the getChannel() from Subscription.
+
+     boolean stillLive = (_expiration == 0L) || (System.currentTimeMillis() <= _expiration);
+     if (stillLive)
+     {
+         return false;
+     }
+
+     dequeue(storecontext, queue);
+     return true;
+ }
/** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed May 9 05:32:27 2007
@@ -434,13 +434,19 @@
return count;
}
- /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+ /**
+ * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ *
+ * @return the next message or null
+ *
+ * @throws org.apache.qpid.AMQException
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
{
AMQMessage message = messages.peek();
@@ -449,9 +455,11 @@
&& (
((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
|| sub == null)
- && message.taken(_queue, sub))
+ && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+ || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+ )
{
- //remove the already taken message
+ //remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
@@ -466,6 +474,7 @@
// try the next message
message = messages.peek();
}
+
return message;
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed May 9 05:32:27 2007
@@ -23,6 +23,7 @@
import java.util.Queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
public interface Subscription
{
@@ -57,4 +58,6 @@
void addToResendQueue(AMQMessage msg);
Object getSendLock();
+
+ AMQChannel getChannel();
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed May 9 05:32:27 2007
@@ -668,5 +668,9 @@
return _sendLock;
}
+ public AMQChannel getChannel() // Accessor for the channel held by this subscription.
+ {
+ return channel; // the 'channel' field, returned as-is
+ }
}
Modified: incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed May 9 05:32:27 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import java.util.Queue;
@@ -165,6 +166,11 @@
public Object getSendLock()
{
return new Object();
+ }
+
+ public AMQChannel getChannel() // Remote subscription variant: no channel is returned.
+ {
+ return null; // NOTE(review): getNextMessage calls sub.getChannel().getStoreContext() without a null check - confirm remote subscriptions never reach that path
}
}
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=536506&r1=536505&r2=536506
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed May 9 05:32:27 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.AMQChannel;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -82,6 +84,10 @@
return new Object();
}
+ public AMQChannel getChannel() // Test stub implementation of Subscription.getChannel().
+ {
+ return null; // this helper always returns null here
+ }
public void queueDeleted(AMQQueue queue)
{
Added: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java?view=auto&rev=536506
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java (added)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java Wed May 9 05:32:27 2007
@@ -0,0 +1,145 @@
+package org.apache.qpid.server.queue;
+
+import junit.framework.TestCase;
+import junit.framework.Assert;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
+import org.apache.log4j.Logger;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.ConnectionFactory;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.naming.spi.InitialContextFactory;
+import javax.naming.Context;
+import java.util.Hashtable;
+
+
+/**
+ * Checks basic time-to-live handling: messages sent with a TTL that has elapsed before the consumer starts are
+ * not delivered, while messages sent without a TTL are.
+ *
+ * The test publishes {@link #MSG_COUNT} messages (one without TTL, MSG_COUNT - 2 with a TTL of
+ * {@link #TIME_TO_LIVE} ms, one without TTL), waits for twice the TTL, and expects to receive only the two
+ * messages that had no TTL.
+ */
+public class TimeToLiveTest extends TestCase
+{
+    private static final Logger _logger = Logger.getLogger(TimeToLiveTest.class);
+
+
+    protected final String BROKER = "vm://:1";
+    protected final String VHOST = "/test";
+    protected final String QUEUE = "TimeToLiveQueue";
+
+    private final long TIME_TO_LIVE = 1000L;
+
+    Context _context;
+
+    private Connection _clientConnection, _producerConnection;
+
+    private MessageConsumer _consumer;
+    MessageProducer _producer;
+    Session _clientSession, _producerSession;
+    private static final int MSG_COUNT = 50;
+
+    /**
+     * Starts an in-VM broker when BROKER is a vm:// URL, then creates a consumer connection (left stopped so
+     * nothing is delivered until the test starts it) and a started producer connection on the same queue.
+     */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        if (BROKER.startsWith("vm://"))
+        {
+            TransportConnection.createVMBroker(1);
+        }
+        InitialContextFactory factory = new PropertiesFileInitialContextFactory();
+
+        Hashtable<String, String> env = new Hashtable<String, String>();
+
+        env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
+        env.put("queue.queue", QUEUE);
+
+        _context = factory.getInitialContext(env);
+
+        Queue queue = (Queue) _context.lookup("queue");
+
+        //Create Client 1
+        _clientConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _consumer = _clientSession.createConsumer(queue);
+
+        //Create Producer
+        _producerConnection = ((ConnectionFactory) _context.lookup("connection")).createConnection();
+
+        _producerConnection.start();
+
+        _producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _producer = _producerSession.createProducer(queue);
+    }
+
+    /**
+     * Closes both connections and kills the in-VM broker. Each step runs even if an earlier one throws, so a
+     * failed close cannot leave a broker running for the next test.
+     */
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            if (_clientConnection != null)
+            {
+                _clientConnection.close();
+            }
+        }
+        finally
+        {
+            try
+            {
+                if (_producerConnection != null)
+                {
+                    _producerConnection.close();
+                }
+            }
+            finally
+            {
+                try
+                {
+                    super.tearDown();
+                }
+                finally
+                {
+                    if (BROKER.startsWith("vm://"))
+                    {
+                        TransportConnection.killAllVMBrokers();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Sends MSG_COUNT messages, of which only the first and last have no TTL, waits for the TTL to elapse and
+     * verifies that exactly those two messages are delivered.
+     *
+     * @throws JMSException if sending or receiving fails
+     */
+    public void test() throws JMSException
+    {
+        int msg = 0;
+
+        // First message: no TTL set on the producer yet.
+        _producer.send(nextMessage(String.valueOf(msg++), true));
+
+        //Set TTL
+        _producer.setTimeToLive(TIME_TO_LIVE);
+
+        // MSG_COUNT - 2 messages that should all have expired by the time the consumer starts.
+        for (; msg < MSG_COUNT - 1; msg++)
+        {
+            _producer.send(nextMessage(String.valueOf(msg), false));
+        }
+
+        //Reset TTL
+        _producer.setTimeToLive(0L);
+        _producer.send(nextMessage(String.valueOf(msg), false));
+
+        try
+        {
+            // Sleep to ensure TTL reached
+            Thread.sleep(2 * TIME_TO_LIVE);
+        }
+        catch (InterruptedException e)
+        {
+            // A shortened sleep would make the assertions below meaningless, so fail explicitly
+            // and preserve the interrupt status for the test runner.
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for the TTL to elapse");
+        }
+
+        _clientConnection.start();
+
+        //Receive Message 0
+        Message received = _consumer.receive(100);
+        Assert.assertNotNull("First message not received", received);
+        Assert.assertTrue("First message doesn't have first set.", received.getBooleanProperty("first"));
+        Assert.assertEquals("First message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+
+        received = _consumer.receive(100);
+        Assert.assertNotNull("Final message not received", received);
+        Assert.assertFalse("Final message has first set.", received.getBooleanProperty("first"));
+        Assert.assertEquals("Final message has incorrect TTL.", 0L, received.getLongProperty("TTL"));
+
+        received = _consumer.receive(100);
+        Assert.assertNull("More messages received", received);
+    }
+
+    /**
+     * Builds a text message tagged with the "first" flag and with the producer's current time-to-live
+     * recorded in the "TTL" property, so the receiver can check which TTL a message was sent with.
+     *
+     * @param msg   label appended to the message text
+     * @param first value of the "first" boolean property
+     *
+     * @return the message, ready to send
+     *
+     * @throws JMSException if the message cannot be created or populated
+     */
+    private Message nextMessage(String msg, boolean first) throws JMSException
+    {
+        Message send = _producerSession.createTextMessage("Message " + msg);
+        send.setBooleanProperty("first", first);
+        send.setLongProperty("TTL", _producer.getTimeToLive());
+        return send;
+    }
+
+
+}
Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date