You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/07/03 14:02:08 UTC
svn commit: r790880 - in /activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/broker/jmx/
src/main/java/org/apache/activemq/broker/region/
src/main/java/org/apache/activemq/broker/region/policy/
src/main/java/org/apache/activemq/usage/ s...
Author: gtully
Date: Fri Jul 3 12:02:07 2009
New Revision: 790880
URL: http://svn.apache.org/viewvc?rev=790880&view=rev
Log:
First cut of a resolution for deterministic message expiry (https://issues.apache.org/activemq/browse/AMQ-1112): the default expiry check period is 30 seconds, and a destination policy entry allows the period to be specified or turned off (0).
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Fri Jul 3 12:02:07 2009
@@ -451,8 +451,6 @@
<exclude>**/AMQDeadlockTest3.*</exclude>
<exclude>**/AMQ1936Test.*</exclude>
- <!-- excluding it until the issue is fixed (AMQ-1112), so we can have successful builds -->
- <exclude>**/MessageExpirationReaperTest.*</exclude>
</excludes>
</configuration>
</plugin>
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Fri Jul 3 12:02:07 2009
@@ -89,6 +89,10 @@
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
+
+ public long getExpiredCount() {
+ return destination.getDestinationStatistics().getExpired().getCount();
+ }
public long getConsumerCount() {
return destination.getDestinationStatistics().getConsumers().getCount();
@@ -363,4 +367,5 @@
}
return answer;
}
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Fri Jul 3 12:02:07 2009
@@ -73,6 +73,14 @@
*/
long getInFlightCount();
+
+ /**
+ * Returns the number of messages that have expired
+ *
+ * @return The number of messages that have expired
+ */
+ long getExpiredCount();
+
/**
* Returns the number of consumers subscribed this destination.
*
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Jul 3 12:02:07 2009
@@ -44,6 +44,7 @@
*/
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
+ public static final long EXPIRE_MESSAGE_PERIOD = 30*1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@@ -69,6 +70,8 @@
protected final BrokerService brokerService;
protected final Broker regionBroker;
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
+ protected long expireMessagesPeriod = EXPIRE_MESSAGE_PERIOD;
+ private int maxExpirePageSize = MAX_BROWSE_PAGE_SIZE;
/**
* @param broker
@@ -213,7 +216,23 @@
public void setMaxBrowsePageSize(int maxPageSize) {
this.maxBrowsePageSize = maxPageSize;
}
+
+ public int getMaxExpirePageSize() {
+ return this.maxExpirePageSize;
+ }
+
+ public void setMaxExpirePageSize(int maxPageSize) {
+ this.maxExpirePageSize = maxPageSize;
+ }
+ public void setExpireMessagesPeriod(long expireMessagesPeriod) {
+ this.expireMessagesPeriod = expireMessagesPeriod;
+ }
+
+ public long getExpireMessagesPeriod() {
+ return expireMessagesPeriod;
+ }
+
public boolean isUseCache() {
return useCache;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java Fri Jul 3 12:02:07 2009
@@ -21,7 +21,6 @@
import org.apache.activemq.management.PollCountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.TimeStatisticImpl;
-import org.apache.tools.ant.taskdefs.condition.IsReference;
/**
* The J2EE Statistics for the a Destination.
@@ -38,6 +37,7 @@
protected PollCountStatisticImpl messagesCached;
protected CountStatisticImpl dispatched;
protected CountStatisticImpl inflight;
+ protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
public DestinationStatistics() {
@@ -46,6 +46,8 @@
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
+ expired = new CountStatisticImpl("expired", "The number of messages that have expired");
+
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
consumers.setDoReset(false);
producers = new CountStatisticImpl("producers", "The number of producers that that are publishing messages to the destination");
@@ -57,6 +59,7 @@
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
addStatistic("inflight", inflight);
+ addStatistic("expired", expired);
addStatistic("consumers", consumers);
addStatistic("producers", producers);
addStatistic("messages", messages);
@@ -76,6 +79,10 @@
return inflight;
}
+ public CountStatisticImpl getExpired() {
+ return expired;
+ }
+
public CountStatisticImpl getConsumers() {
return consumers;
}
@@ -111,6 +118,7 @@
dequeues.reset();
dispatched.reset();
inflight.reset();
+ expired.reset();
}
}
@@ -120,6 +128,7 @@
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
inflight.setEnabled(enabled);
+ expired.setEnabled(true);
consumers.setEnabled(enabled);
producers.setEnabled(enabled);
messages.setEnabled(enabled);
@@ -134,6 +143,7 @@
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
inflight.setParent(parent.inflight);
+ expired.setParent(parent.expired);
consumers.setParent(parent.consumers);
producers.setParent(parent.producers);
messagesCached.setParent(parent.messagesCached);
@@ -144,6 +154,7 @@
dispatched.setParent(null);
dequeues.setParent(null);
inflight.setParent(null);
+ expired.setParent(null);
consumers.setParent(null);
producers.setParent(null);
messagesCached.setParent(null);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 3 12:02:07 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -66,6 +67,7 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.DeterministicTaskRunner;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -112,7 +114,13 @@
wakeup();
}
};
+ private final Runnable expireMessagesTask = new Runnable() {
+ public void run() {
+ expireMessages();
+ }
+ };
private final Object iteratingMutex = new Object() {};
+ private static final Scheduler scheduler = Scheduler.getInstance();
private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
@@ -177,6 +185,11 @@
this.taskRunner = new DeterministicTaskRunner(this.executor,this);
}
+
+ if (getExpireMessagesPeriod() > 0) {
+ scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
+ }
+
super.initialize();
if (store != null) {
// Restore the persistent messages.
@@ -192,7 +205,7 @@
// Message could have expired while it was being
// loaded..
if (broker.isExpired(message)) {
- messageExpired(createConnectionContext(), message);
+ messageExpired(createConnectionContext(), null, message, false);
return true;
}
if (hasSpace()) {
@@ -416,11 +429,11 @@
public void run() {
try {
-
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
+ destinationStatistics.getExpired().increment();
} else {
doMessageSend(producerExchange, message);
}
@@ -498,6 +511,7 @@
throw new IOException(
"Connection closed, send aborted.");
}
+ LOG.debug(this + ", waiting for store space... msg: " + message);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message);
@@ -516,8 +530,7 @@
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
- //message not added to stats yet
- //destinationStatistics.getMessages().decrement();
+ destinationStatistics.getExpired().increment();
return;
}
sendMessage(context, message);
@@ -537,9 +550,34 @@
sendMessage(context, message);
}
}
+
+ private void expireMessages() {
+ LOG.info("expiring messages...");
- public void gc(){
- }
+ // just track the insertion count
+ List<Message> l = new AbstractList<Message>() {
+ int size = 0;
+
+ @Override
+ public void add(int index, Message element) {
+ size++;
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ @Override
+ public Message get(int index) {
+ return null;
+ }
+ };
+ doBrowse(true, l, getMaxBrowsePageSize());
+ }
+
+ public void gc(){
+ }
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
messageConsumed(context, node);
@@ -593,6 +631,10 @@
if (this.executor != null) {
this.executor.shutdownNow();
}
+
+ LOG.info(toString() + ", canceling expireMessagesTask");
+ scheduler.cancel(expireMessagesTask);
+
if (messages != null) {
messages.stop();
}
@@ -691,57 +733,74 @@
return result;
}
- public Message[] browse() {
- int count = 0;
+ public Message[] browse() {
List<Message> l = new ArrayList<Message>();
+ doBrowse(false, l, getMaxBrowsePageSize());
+ return l.toArray(new Message[l.size()]);
+ }
+
+ public void doBrowse(boolean forcePageIn, List<Message> l, int max) {
+ final ConnectionContext connectionContext = createConnectionContext();
try {
- pageInMessages(false);
- synchronized (this.pagedInPendingDispatch) {
- for (Iterator<QueueMessageReference> i = this.pagedInPendingDispatch
- .iterator(); i.hasNext()
- && count < getMaxBrowsePageSize();) {
- l.add(i.next().getMessage());
- count++;
+ pageInMessages(forcePageIn);
+ List<MessageReference> toExpire = new ArrayList<MessageReference>();
+ dispatchLock.lock();
+ try {
+ synchronized (pagedInPendingDispatch) {
+ addAll(pagedInPendingDispatch, l, max, toExpire);
+ for (MessageReference ref : toExpire) {
+ pagedInPendingDispatch.remove(ref);
+ messageExpired(connectionContext, ref, false);
+ }
}
- }
- if (count < getMaxBrowsePageSize()) {
+ toExpire.clear();
synchronized (pagedInMessages) {
- for (Iterator<QueueMessageReference> i = this.pagedInMessages
- .values().iterator(); i.hasNext()
- && count < getMaxBrowsePageSize();) {
- Message m = i.next().getMessage();
- if (l.contains(m) == false) {
- l.add(m);
- count++;
- }
- }
+ addAll(pagedInMessages.values(), l, max, toExpire);
}
- }
- if (count < getMaxBrowsePageSize()) {
- synchronized (messages) {
- try {
- messages.reset();
- while (messages.hasNext()
- && count < getMaxBrowsePageSize()) {
- MessageReference node = messages.next();
- messages.rollback(node.getMessageId());
- if (node != null) {
- Message m = node.getMessage();
- if (l.contains(m) == false) {
- l.add(m);
- count++;
+ for (MessageReference ref : toExpire) {
+ messageExpired(connectionContext, ref, false);
+ }
+
+ if (l.size() < getMaxBrowsePageSize()) {
+ synchronized (messages) {
+ try {
+ messages.reset();
+ while (messages.hasNext() && l.size() < max) {
+ MessageReference node = messages.next();
+ messages.rollback(node.getMessageId());
+ if (node != null) {
+ if (broker.isExpired(node)) {
+ messageExpired(connectionContext,
+ createMessageReference(node.getMessage()), false);
+ } else if (l.contains(node.getMessage()) == false) {
+ l.add(node.getMessage());
+ }
}
}
+ } finally {
+ messages.release();
}
- } finally {
- messages.release();
}
}
+ } finally {
+ dispatchLock.unlock();
}
} catch (Exception e) {
- LOG.error("Problem retrieving message in browse() ", e);
+ LOG.error("Problem retrieving message for browse", e);
+ }
+ }
+
+ private void addAll(Collection<QueueMessageReference> refs,
+ List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+ for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
+ && l.size() < getMaxBrowsePageSize();) {
+ QueueMessageReference ref = i.next();
+ if (broker.isExpired(ref)) {
+ toExpire.add(ref);
+ } else if (l.contains(ref.getMessage()) == false) {
+ l.add(ref.getMessage());
+ }
}
- return l.toArray(new Message[l.size()]);
}
public Message getMessage(String id) {
@@ -1190,22 +1249,26 @@
}
}
- public void messageExpired(ConnectionContext context,MessageReference reference) {
- messageExpired(context,null,reference);
+ public void messageExpired(ConnectionContext context,MessageReference reference, boolean dispatched) {
+ messageExpired(context,null,reference, dispatched);
}
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+ messageExpired(context, subs, reference, true);
+ }
+
+ public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference, boolean dispatched) {
broker.messageExpired(context, reference);
destinationStatistics.getDequeues().increment();
- destinationStatistics.getInflight().decrement();
+ destinationStatistics.getExpired().increment();
+ if (dispatched) {
+ destinationStatistics.getInflight().decrement();
+ }
try {
removeMessage(context,subs,(QueueMessageReference)reference);
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
- synchronized(pagedInMessages) {
- pagedInMessages.remove(reference.getMessageId());
- }
wakeup();
}
@@ -1286,7 +1349,7 @@
result.add(ref);
count++;
} else {
- messageExpired(createConnectionContext(), ref);
+ messageExpired(createConnectionContext(), ref, false);
}
}
} finally {
@@ -1312,7 +1375,7 @@
}
return resultList;
}
-
+
private void doDispatch(List<QueueMessageReference> list) throws Exception {
dispatchLock.lock();
try {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 3 12:02:07 2009
@@ -709,6 +709,10 @@
BrokerSupport.resend(context,message,
deadLetterDestination);
}
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Expired message with no DLQ strategy in place");
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 3 12:02:07 2009
@@ -278,6 +278,7 @@
// destination.. it may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message);
+ getDestinationStatistics().getExpired().increment();
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@@ -306,6 +307,7 @@
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
+ getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
} else {
doMessageSend(producerExchange, message);
@@ -361,6 +363,7 @@
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
+ getDestinationStatistics().getExpired().increment();
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
@@ -418,6 +421,7 @@
// operration.. by that time the message could have
// expired..
if (broker.isExpired(message)) {
+ getDestinationStatistics().getExpired().increment();
broker.messageExpired(context, message);
message.decrementReferenceCount();
return;
@@ -594,6 +598,7 @@
broker.messageExpired(context, reference);
destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
+ destinationStatistics.getExpired().increment();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jul 3 12:02:07 2009
@@ -161,6 +161,7 @@
matched.remove();
dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
+ node.getRegionDestination().getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node);
break;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jul 3 12:02:07 2009
@@ -73,15 +73,15 @@
private boolean advisoryWhenFull;
private boolean advisoryForDelivery;
private boolean advisoryForConsumed;
+ private long expireMessagesPeriod = BaseDestination.EXPIRE_MESSAGE_PERIOD;
+ private int maxExpirePageSize = BaseDestination.MAX_BROWSE_PAGE_SIZE;
public void configure(Broker broker,Queue queue) {
baseConfiguration(queue);
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
- queue.setDeadLetterStrategy(deadLetterStrategy);
- }
+ queue.setDeadLetterStrategy(getDeadLetterStrategy());
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
queue.getMemoryUsage().setLimit(memoryLimit);
@@ -104,9 +104,7 @@
if (dispatchPolicy != null) {
topic.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
- topic.setDeadLetterStrategy(deadLetterStrategy);
- }
+ topic.setDeadLetterStrategy(getDeadLetterStrategy());
if (subscriptionRecoveryPolicy != null) {
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
@@ -132,6 +130,8 @@
destination.setAdvisdoryForFastProducers(isAdvisdoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
+ destination.setExpireMessagesPeriod(getExpireMessagesPeriod());
+ destination.setMaxExpirePageSize(getMaxExpirePageSize());
}
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
@@ -543,4 +543,21 @@
this.advisdoryForFastProducers = advisdoryForFastProducers;
}
+ public void setMaxExpirePageSize(int maxExpirePageSize) {
+ this.maxExpirePageSize = maxExpirePageSize;
+ }
+
+ public int getMaxExpirePageSize() {
+ return maxExpirePageSize;
+ }
+
+ public void setExpireMessagesPeriod(long expireMessagesPeriod) {
+ this.expireMessagesPeriod = expireMessagesPeriod;
+ }
+
+ public long getExpireMessagesPeriod() {
+ return expireMessagesPeriod;
+ }
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Fri Jul 3 12:02:07 2009
@@ -242,8 +242,8 @@
LOG.debug("Memory usage change from: " + oldPercentUsage + "% of available memory, to: "
+ newPercentUsage + "% of available memory");
}
- if (newPercentUsage >= 80) {
- LOG.warn("Memory usage is now over 80%!");
+ if (newPercentUsage >= 100) {
+ LOG.warn("Memory usage is now at " + newPercentUsage + "%");
}
if (started.get()) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri Jul 3 12:02:07 2009
@@ -19,7 +19,6 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashMap;
import java.util.Map;
-import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -28,8 +27,6 @@
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MessageExpirationReaperTest.java Fri Jul 3 12:02:07 2009
@@ -1,6 +1,7 @@
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
@@ -16,6 +17,8 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Before;
@@ -56,8 +59,16 @@
broker = new BrokerService();
// broker.setPersistent(false);
// broker.setUseJmx(true);
+ broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName(brokerName);
broker.addConnector(brokerUrl);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setExpireMessagesPeriod(500);
+ policyMap.setDefaultEntry(defaultEntry);
+ broker.setDestinationPolicy(policyMap);
+
broker.start();
}
@@ -85,15 +96,13 @@
}
// Let the messages expire
- Thread.sleep(1000);
+ Thread.sleep(2000);
DestinationViewMBean view = createView(destination);
- /*################### CURRENT EXPECTED FAILURE ####################*/
- // The messages expire and should be reaped but they're not currently
- // reaped until there is an active consumer placed on the queue
- assertEquals("Incorrect count: " + view.getInFlightCount(), 0, view.getInFlightCount());
-
+ assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
+ assertEquals("Incorrect queue size count", 0, view.getQueueSize());
+ assertEquals("Incorrect expired size count", 3, view.getEnqueueCount());
// Send more messages with an expiration
for (int i = 0; i < count; i++) {
@@ -101,10 +110,13 @@
producer.send(message);
}
+ // Let the messages expire
+ Thread.sleep(2000);
+
// Simply browse the queue
Session browserSession = createSession();
QueueBrowser browser = browserSession.createBrowser((Queue) destination);
- browser.getEnumeration();
+ assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());
// The messages expire and should be reaped because of the presence of
// the queue browser
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java?rev=790880&r1=790879&r2=790880&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesTest.java Fri Jul 3 12:02:07 2009
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.usecases;
+import java.util.concurrent.atomic.AtomicLong;
+
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -33,17 +35,21 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class ExpiredMessagesTest extends CombinationTestSupport {
+ private static final Log LOG = LogFactory.getLog(ExpiredMessagesTest.class);
+
BrokerService broker;
Connection connection;
Session session;
MessageProducer producer;
MessageConsumer consumer;
- public ActiveMQDestination destination;
+ public ActiveMQDestination destination = new ActiveMQQueue("test");
public static Test suite() {
return suite(ExpiredMessagesTest.class);
@@ -77,6 +83,7 @@
producer.setTimeToLive(100);
consumer = session.createConsumer(destination);
connection.start();
+ final AtomicLong received = new AtomicLong();
Thread consumerThread = new Thread("Consumer Thread") {
public void run() {
@@ -84,7 +91,9 @@
try {
long end = System.currentTimeMillis();
while (end - start < 3000) {
- consumer.receive(1000);
+ if (consumer.receive(1000) != null) {
+ received.incrementAndGet();
+ }
Thread.sleep(100);
end = System.currentTimeMillis();
}
@@ -115,9 +124,13 @@
consumerThread.join();
producingThread.join();
+
DestinationViewMBean view = createView(destination);
+ LOG.info("Stats: received: " + received.get() + ", enqueues: " + view.getDequeueCount() + ", dequeues: " + view.getDequeueCount()
+ + ", dispatched: " + view.getDispatchCount() + ", inflight: " + view.getInFlightCount() + ", expiries: " + view.getExpiredCount());
- assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
+ assertEquals("got what did not expire", received.get(), view.getDequeueCount() - view.getExpiredCount());
+ //assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), view.getDispatchCount() - view.getDequeueCount(), view.getInFlightCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {