You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/05/09 17:33:18 UTC
svn commit: r536567 - in /incubator/qpid/trunk/qpid: ./
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/queue/
java/cluster/src/main/java/org/apache/qpid/server/queue/
java/systests/src/main/java/org/a...
Author: ritchiem
Date: Wed May 9 08:33:17 2007
New Revision: 536567
URL: http://svn.apache.org/viewvc?view=rev&rev=536567
Log:
Merged revisions 536506 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r536506 | ritchiem | 2007-05-09 13:32:27 +0100 (Wed, 09 May 2007) | 10 lines
QPID-25 TimeToLive Basic implementation.
Messages are not automatically purged rather they are checked as they are selected for delivery. If they have expired they are dequeued.
AMQChannel - Update to call setExpiration on the message so the time can be adjusted if client & broker clocks are out of sync.
AMQMessage - Caches the _expiration time for internal use, adjusted for broker time. This leaves message headers unchanged so receiving client can see actual value requested by producer.
ConcurrentSelectorDeliveryManager - Updated to check for expired messages when getNextMessage is called. Immediate messages are NOT expired.
Subscription - Added method to getChannel that this Subscription is attatched to so we can retrieve the StoreContext for dequeue-ing the message.
TimeToLiveTest - Test of Time to live. Sends 50 msgs. 1 non-timed 48 1 second and 1 non-timed ensure only 2 msgs come back after 2 seconds
........
Added:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
- copied unchanged from r536506, incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/TimeToLiveTest.java
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May 9 08:33:17 2007
@@ -228,6 +228,8 @@
_log.trace(debugIdentity() + "Content header received on channel " + _channelId);
}
_currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setExpiration();
+
routeCurrentMessage();
_currentMessage.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May 9 08:33:17 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -34,6 +35,7 @@
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.messageStore.StorableMessage;
import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
/** Combines the information that make up a deliverable message into a more manageable form. */
@@ -100,12 +102,42 @@
private final int hashcode = System.identityHashCode(this);
+ private long _expiration;
public String debugIdentity()
{
return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")";
}
+ public void setExpiration()
+ {
+ long expiration = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp = ((BasicContentHeaderProperties) _transientMessageData.getContentHeaderBody().properties).getTimestamp();
+
+ if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks", false))
+ {
+ _expiration = expiration;
+ }
+ else
+ {
+ // Update TTL to be in broker time.
+ if (expiration != 0L)
+ {
+ if (timestamp != 0L)
+ {
+ //todo perhaps use arrival time
+ long diff = (System.currentTimeMillis() - timestamp);
+
+ if (diff > 1000L || diff < 1000L)
+ {
+ _expiration = expiration + diff;
+ }
+ }
+ }
+ }
+
+ }
+
/**
* Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
* therefore is memory-efficient.
@@ -212,8 +244,6 @@
_immediate = info.isImmediate();
_transientMessageData.setMessagePublishInfo(info);
-// _taken = new AtomicBoolean(false);
-
}
/**
@@ -731,10 +761,35 @@
return _messageHandle.getArrivalTime();
}
-
/**
- * Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality).
+ * Checks to see if the message has expired. If it has the message is dequeued.
+ *
+ * @param storecontext
+ * @param queue
+ *
+ * @return true if the message has expire
+ *
+ * @throws AMQException
*/
+ public boolean expired(StoreContext storecontext, AMQQueue queue) throws AMQException
+ {
+ //note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
+
+ if (_expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ if (now > _expiration)
+ {
+ dequeue(storecontext, queue);
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** Called when this message is delivered to a consumer. (used to implement the 'immediate' flag functionality). */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Wed May 9 08:33:17 2007
@@ -434,13 +434,19 @@
return count;
}
- /** This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged. */
+ /**
+ * This can only be used to clear the _messages queue. Any subscriber resend queue will not be purged.
+ *
+ * @return the next message or null
+ *
+ * @throws org.apache.qpid.AMQException
+ */
private AMQMessage getNextMessage() throws AMQException
{
return getNextMessage(_messages, null);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub)
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription sub) throws AMQException
{
AMQMessage message = messages.peek();
@@ -449,9 +455,11 @@
&& (
((sub != null && !sub.isBrowser()) || message.isTaken(_queue))
|| sub == null)
- && message.taken(_queue, sub))
+ && (message.taken(_queue, sub) // Message not taken by another consumer ... unless it is expired
+ || (sub == null || message.expired(sub.getChannel().getStoreContext(), _queue))) // Message not expired
+ )
{
- //remove the already taken message
+ //remove the already taken message or expired
AMQMessage removed = messages.poll();
assert removed == message;
@@ -466,6 +474,7 @@
// try the next message
message = messages.peek();
}
+
return message;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Wed May 9 08:33:17 2007
@@ -23,6 +23,7 @@
import java.util.Queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
public interface Subscription
{
@@ -57,4 +58,6 @@
void addToResendQueue(AMQMessage msg);
Object getSendLock();
+
+ AMQChannel getChannel();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed May 9 08:33:17 2007
@@ -668,5 +668,9 @@
return _sendLock;
}
+ public AMQChannel getChannel()
+ {
+ return channel;
+ }
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Wed May 9 08:33:17 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import java.util.Queue;
@@ -165,6 +166,11 @@
public Object getSendLock()
{
return new Object();
+ }
+
+ public AMQChannel getChannel()
+ {
+ return null;
}
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MaxChannelsTest.java Wed May 9 08:33:17 2007
@@ -62,7 +62,7 @@
{
for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
{
- _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null));
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
}
}
catch (AMQException e)
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=536567&r1=536566&r2=536567
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Wed May 9 08:33:17 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.AMQChannel;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
@@ -82,6 +84,10 @@
return new Object();
}
+ public AMQChannel getChannel()
+ {
+ return null;
+ }
public void queueDeleted(AMQQueue queue)
{