You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/11/25 20:35:10 UTC
svn commit: r884234 [2/2] - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
jmx/ region/ region/policy/
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=884234&r1=884233&r2=884234&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Nov 25 19:35:09 2009
@@ -75,7 +75,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.
@@ -85,10 +84,10 @@
public class Queue extends BaseDestination implements Task, UsageListener {
protected static final Log LOG = LogFactory.getLog(Queue.class);
protected final TaskRunnerFactory taskFactory;
- protected TaskRunner taskRunner;
+ protected TaskRunner taskRunner;
protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
- private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+ private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a subscription
private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
private MessageGroupMap messageGroupOwners;
@@ -98,16 +97,16 @@
private ExecutorService executor;
protected final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Object dispatchMutex = new Object();
- private boolean useConsumerPriority=true;
- private boolean strictOrderDispatch=false;
+ private boolean useConsumerPriority = true;
+ private boolean strictOrderDispatch = false;
private QueueDispatchSelector dispatchSelector;
- private boolean optimizedDispatch=false;
+ private boolean optimizedDispatch = false;
private boolean firstConsumer = false;
private int timeBeforeDispatchStarts = 0;
private int consumersBeforeDispatchStarts = 0;
private CountDownLatch consumersBeforeStartsLatch;
private AtomicLong pendingWakeups = new AtomicLong();
-
+
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
asyncWakeup();
@@ -115,25 +114,24 @@
};
private final Runnable expireMessagesTask = new Runnable() {
public void run() {
- expireMessages();
+ expireMessages();
}
};
private final Object iteratingMutex = new Object() {};
private static final Scheduler scheduler = Scheduler.getInstance();
-
- private static final Comparator<Subscription>orderedCompare = new Comparator<Subscription>() {
+
+ private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
public int compare(Subscription s1, Subscription s2) {
//We want the list sorted in descending order
return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
- }
+ }
};
-
- public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory) throws Exception {
+
+ public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
- this.taskFactory=taskFactory;
- this.dispatchSelector=new QueueDispatchSelector(destination);
+ this.taskFactory = taskFactory;
+ this.dispatchSelector = new QueueDispatchSelector(destination);
}
public List<Subscription> getConsumers() {
@@ -145,13 +143,13 @@
// make the queue easily visible in the debugger from its task runner threads
final class QueueThread extends Thread {
final Queue queue;
- public QueueThread(Runnable runnable, String name,
- Queue queue) {
+
+ public QueueThread(Runnable runnable, String name, Queue queue) {
super(runnable, name);
this.queue = queue;
}
}
-
+
public void initialize() throws Exception {
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
@@ -168,10 +166,9 @@
this.systemUsage = brokerService.getSystemUsage();
memoryUsage.setParent(systemUsage.getMemoryUsage());
}
-
- this.taskRunner =
- taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
-
+
+ this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
+
super.initialize();
if (store != null) {
// Restore the persistent messages.
@@ -217,12 +214,12 @@
public boolean hasSpace() {
return true;
}
-
+
public boolean isDuplicate(MessageId id) {
return false;
}
});
- }else {
+ } else {
int messageCount = store.getMessageCount();
destinationStatistics.getMessages().setCount(messageCount);
}
@@ -230,22 +227,20 @@
}
/*
- * Holder for subscription and pagedInMessages as a browser
- * needs access to existing messages in the queue that have
- * already been dispatched
+ * Holder for subscription and pagedInMessages as a browser needs access to
+ * existing messages in the queue that have already been dispatched
*/
class BrowserDispatch {
ArrayList<QueueMessageReference> messages;
QueueBrowserSubscription browser;
-
- public BrowserDispatch(QueueBrowserSubscription browserSubscription,
- Collection<QueueMessageReference> values) {
-
- messages = new ArrayList<QueueMessageReference>(values);
+
+ public BrowserDispatch(QueueBrowserSubscription browserSubscription, Collection<QueueMessageReference> values) {
+
+ messages = new ArrayList<QueueMessageReference>(values);
browser = browserSubscription;
browser.incrementQueueRef();
}
-
+
void done() {
try {
browser.decrementQueueRef();
@@ -258,57 +253,57 @@
return browser;
}
}
-
+
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
- synchronized(dispatchMutex) {
-
+ synchronized (dispatchMutex) {
+
sub.add(context, this);
destinationStatistics.getConsumers().increment();
// needs to be synchronized - so no contention with dispatching
synchronized (consumers) {
-
- // set a flag if this is a first consumer
- if (consumers.size() == 0) {
- firstConsumer = true;
- if (consumersBeforeDispatchStarts != 0) {
- consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
- }
- } else {
- if (consumersBeforeStartsLatch != null) {
- consumersBeforeStartsLatch.countDown();
- }
- }
-
+
+ // set a flag if this is a first consumer
+ if (consumers.size() == 0) {
+ firstConsumer = true;
+ if (consumersBeforeDispatchStarts != 0) {
+ consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
+ }
+ } else {
+ if (consumersBeforeStartsLatch != null) {
+ consumersBeforeStartsLatch.countDown();
+ }
+ }
+
addToConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
- if(exclusiveConsumer==null) {
- exclusiveConsumer=sub;
- }else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()){
- exclusiveConsumer=sub;
+ if (exclusiveConsumer == null) {
+ exclusiveConsumer = sub;
+ } else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
+ exclusiveConsumer = sub;
}
dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
}
}
-
- if (sub instanceof QueueBrowserSubscription ) {
+
+ if (sub instanceof QueueBrowserSubscription) {
QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
-
+
// do again in iterate to ensure new messages are dispatched
pageInMessages(false);
-
- synchronized (pagedInMessages) {
- if (!pagedInMessages.isEmpty()) {
- BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
- browserDispatches.addLast(browserDispatch);
- }
- }
+
+ synchronized (pagedInMessages) {
+ if (!pagedInMessages.isEmpty()) {
+ BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
+ browserDispatches.addLast(browserDispatch);
+ }
+ }
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
@@ -321,30 +316,23 @@
}
}
- public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
- throws Exception {
+ public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
- synchronized(dispatchMutex) {
+ synchronized (dispatchMutex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId
- + ", dequeues: " + getDestinationStatistics().getDequeues().getCount()
- + ", dispatched: " + getDestinationStatistics().getDispatched().getCount()
- + ", inflight: " + getDestinationStatistics().getInflight().getCount());
+ LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
+ + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
}
synchronized (consumers) {
removeFromConsumerList(sub);
if (sub.getConsumerInfo().isExclusive()) {
- Subscription exclusiveConsumer = dispatchSelector
- .getExclusiveConsumer();
+ Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
if (exclusiveConsumer == sub) {
exclusiveConsumer = null;
for (Subscription s : consumers) {
- if (s.getConsumerInfo().isExclusive()
- && (exclusiveConsumer == null
- || s.getConsumerInfo().getPriority() > exclusiveConsumer
- .getConsumerInfo().getPriority())) {
+ if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
exclusiveConsumer = s;
}
@@ -354,12 +342,12 @@
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
getMessageGroupOwners().removeConsumer(consumerId);
-
+
// redeliver inflight messages
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
for (MessageReference ref : sub.remove(context, this)) {
- QueueMessageReference qmr = (QueueMessageReference)ref;
- if( qmr.getLockOwner()==sub ) {
+ QueueMessageReference qmr = (QueueMessageReference) ref;
+ if (qmr.getLockOwner() == sub) {
qmr.unlock();
// only increment redelivery if it was delivered or we have no delivery information
if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
@@ -368,7 +356,7 @@
}
list.add(qmr);
}
-
+
if (!list.isEmpty()) {
doDispatch(list);
}
@@ -400,32 +388,33 @@
}
return;
}
- if(memoryUsage.isFull()) {
+ if (memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl()) {
- if(warnOnProducerFlowControl) {
+ if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("Usage Manager memory limit reached on " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
- " See http://activemq.apache.org/producer-flow-control.html for more info");
+ LOG.info("Usage Manager Memory Limit reached on " + getActiveMQDestination().getQualifiedName()
+ + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
-
+
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
- " See http://activemq.apache.org/producer-flow-control.html for more info");
+ throw new javax.jms.ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
-
+
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
// copy the exchange state since the context will be modified while we are waiting
// for space.
- final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
+ final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
-
+
try {
// While waiting for space to free up... the
// message may have expired.
@@ -436,7 +425,7 @@
} else {
doMessageSend(producerExchangeCopy, message);
}
-
+
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@@ -445,7 +434,7 @@
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
-
+
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
@@ -455,7 +444,7 @@
}
}
});
-
+
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
@@ -465,18 +454,14 @@
context.setDontSendReponse(true);
return;
}
-
+
} else {
-
- // Producer flow control cannot be used, so we have do the flow
- // control at the broker
- // by blocking this thread until there is space available.
- while (!memoryUsage.waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
- }
+
+ if (memoryUsage.isFull()) {
+ waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
-
+
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
@@ -501,19 +486,15 @@
synchronized (sendLock) {
if (store != null && message.isPersistent()) {
if (systemUsage.getStoreUsage().isFull()) {
- final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
- " See http://activemq.apache.org/producer-flow-control.html for more info";
- LOG.info(logMessage);
+
+ String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info";
+
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
- }
- while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException(
- "Connection closed, send aborted.");
- }
- LOG.debug(this + ", waiting for store space... msg: " + message);
+
+ waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
}
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
store.addMessage(context, message);
@@ -552,12 +533,12 @@
sendMessage(context, message);
}
}
-
+
private void expireMessages() {
if (LOG.isDebugEnabled()) {
LOG.debug("Expiring messages ..");
}
-
+
// just track the insertion count
List<Message> browsedMessages = new AbstractList<Message>() {
int size = 0;
@@ -581,9 +562,9 @@
asyncWakeup();
}
- public void gc(){
+ public void gc() {
}
-
+
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
messageConsumed(context, node);
if (store != null && node.isPersistent()) {
@@ -616,8 +597,8 @@
synchronized (messages) {
size = messages.size();
}
- return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size
- + ", in flight groups=" + messageGroupOwners;
+ return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
+ + messageGroupOwners;
}
public void start() throws Exception {
@@ -632,25 +613,25 @@
doPageIn(false);
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
}
if (this.executor != null) {
this.executor.shutdownNow();
}
-
+
scheduler.cancel(expireMessagesTask);
-
+
if (messages != null) {
messages.stop();
}
-
+
systemUsage.getMemoryUsage().removeUsageListener(this);
if (memoryUsage != null) {
memoryUsage.stop();
}
- if (store!=null) {
+ if (store != null) {
store.stop();
}
}
@@ -661,7 +642,6 @@
return destination;
}
-
public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) {
messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
@@ -692,7 +672,7 @@
public void setMessages(PendingMessageCursor messages) {
this.messages = messages;
}
-
+
public boolean isUseConsumerPriority() {
return useConsumerPriority;
}
@@ -708,7 +688,6 @@
public void setStrictOrderDispatch(boolean strictOrderDispatch) {
this.strictOrderDispatch = strictOrderDispatch;
}
-
public boolean isOptimizedDispatch() {
return optimizedDispatch;
@@ -717,21 +696,22 @@
public void setOptimizedDispatch(boolean optimizedDispatch) {
this.optimizedDispatch = optimizedDispatch;
}
- public int getTimeBeforeDispatchStarts() {
- return timeBeforeDispatchStarts;
- }
-
- public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
- this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
- }
-
- public int getConsumersBeforeDispatchStarts() {
- return consumersBeforeDispatchStarts;
- }
-
- public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
- this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
- }
+
+ public int getTimeBeforeDispatchStarts() {
+ return timeBeforeDispatchStarts;
+ }
+
+ public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+ this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+ }
+
+ public int getConsumersBeforeDispatchStarts() {
+ return consumersBeforeDispatchStarts;
+ }
+
+ public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
+ this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
+ }
// Implementation methods
// -------------------------------------------------------------------------
@@ -740,19 +720,18 @@
return result;
}
- public Message[] browse() {
+ public Message[] browse() {
List<Message> l = new ArrayList<Message>();
doBrowse(l, getMaxBrowsePageSize());
return l.toArray(new Message[l.size()]);
}
-
-
+
public void doBrowse(List<Message> l, int max) {
final ConnectionContext connectionContext = createConnectionContext();
try {
pageInMessages(false);
List<MessageReference> toExpire = new ArrayList<MessageReference>();
- synchronized(dispatchMutex) {
+ synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
addAll(pagedInPendingDispatch, l, max, toExpire);
for (MessageReference ref : toExpire) {
@@ -775,17 +754,16 @@
}
}
}
-
+
if (l.size() < getMaxBrowsePageSize()) {
synchronized (messages) {
try {
messages.reset();
while (messages.hasNext() && l.size() < max) {
- MessageReference node = messages.next();
+ MessageReference node = messages.next();
if (node.isExpired()) {
if (broker.isExpired(node)) {
- messageExpired(connectionContext,
- createMessageReference(node.getMessage()));
+ messageExpired(connectionContext, createMessageReference(node.getMessage()));
}
messages.remove();
} else {
@@ -800,16 +778,14 @@
}
}
}
- }
+ }
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
- }
+ }
}
- private void addAll(Collection<QueueMessageReference> refs,
- List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
- for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
- && l.size() < getMaxBrowsePageSize();) {
+ private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
+ for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
QueueMessageReference ref = i.next();
if (ref.isExpired()) {
toExpire.add(ref);
@@ -843,8 +819,7 @@
break;
}
} catch (IOException e) {
- LOG.error("got an exception retrieving message "
- + id);
+ LOG.error("got an exception retrieving message " + id);
}
}
} finally {
@@ -857,10 +832,10 @@
return null;
}
- public void purge() throws Exception {
+ public void purge() throws Exception {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
- do {
+ do {
doPageIn(true);
synchronized (pagedInMessages) {
list = new ArrayList<MessageReference>(pagedInMessages.values());
@@ -869,11 +844,11 @@
for (MessageReference ref : list) {
try {
QueueMessageReference r = (QueueMessageReference) ref;
- removeMessage(c,(IndirectMessageReference) r);
+ removeMessage(c, (IndirectMessageReference) r);
} catch (IOException e) {
}
}
-
+
} while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
gc();
this.destinationStatistics.getMessages().setCount(0);
@@ -921,15 +896,14 @@
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
- List <MessageReference>list = new ArrayList<MessageReference>(set);
+ List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
removeMessage(context, r);
set.remove(r);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
+ if (++movedCounter >= maximumMessages && maximumMessages > 0) {
return movedCounter;
}
}
@@ -975,24 +949,23 @@
int count = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
- int oldMaxSize=getMaxPageSize();
+ int oldMaxSize = getMaxPageSize();
setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
doPageIn(true);
setMaxPageSize(oldMaxSize);
synchronized (pagedInMessages) {
set.addAll(pagedInMessages.values());
}
- List <MessageReference>list = new ArrayList<MessageReference>(set);
+ List<MessageReference> list = new ArrayList<MessageReference>(set);
for (MessageReference ref : list) {
IndirectMessageReference r = (IndirectMessageReference) ref;
if (filter.evaluate(context, r)) {
-
- r.incrementReferenceCount();
+
+ r.incrementReferenceCount();
try {
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
+ if (++movedCounter >= maximumMessages && maximumMessages > 0) {
return movedCounter;
}
} finally {
@@ -1004,15 +977,16 @@
} while (count < this.destinationStatistics.getMessages().getCount());
return movedCounter;
}
-
+
/**
* Move a message
+ *
* @param context connection context
* @param m message
* @param dest ActiveMQDestination
* @throws Exception
*/
- public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception {
+ public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
QueueMessageReference r = createMessageReference(m);
BrokerSupport.resend(context, m, dest);
removeMessage(context, r);
@@ -1035,7 +1009,7 @@
* @return the number of messages removed
*/
public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
- return moveMatchingMessagesTo(context, selector, dest,Integer.MAX_VALUE);
+ return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
}
/**
@@ -1050,9 +1024,7 @@
* Moves the messages matching the given filter up to the maximum number of
* matched messages
*/
- public int moveMatchingMessagesTo(ConnectionContext context,
- MessageReferenceFilter filter, ActiveMQDestination dest,
- int maximumMessages) throws Exception {
+ public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int movedCounter = 0;
Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
do {
@@ -1067,20 +1039,18 @@
// We should only move messages that can be locked.
moveMessageTo(context, ref.getMessage(), dest);
set.remove(r);
- if (++movedCounter >= maximumMessages
- && maximumMessages > 0) {
+ if (++movedCounter >= maximumMessages && maximumMessages > 0) {
return movedCounter;
}
}
}
- } while (set.size() < this.destinationStatistics.getMessages().getCount()
- && set.size() < maximumMessages);
+ } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
return movedCounter;
}
-
+
BrowserDispatch getNextBrowserDispatch() {
synchronized (pagedInMessages) {
- if( browserDispatches.isEmpty() ) {
+ if (browserDispatches.isEmpty()) {
return null;
}
return browserDispatches.removeFirst();
@@ -1093,93 +1063,93 @@
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
- boolean pageInMoreMessages = false;
- synchronized(iteratingMutex) {
-
+ boolean pageInMoreMessages = false;
+ synchronized (iteratingMutex) {
+
// do early to allow dispatch of these waiting messages
- synchronized(messagesWaitingForSpace) {
+ synchronized (messagesWaitingForSpace) {
while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
}
-
+
BrowserDispatch rd;
- while ((rd = getNextBrowserDispatch()) != null) {
- pageInMoreMessages = true;
-
- try {
- MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
- msgContext.setDestination(destination);
-
- QueueBrowserSubscription browser = rd.getBrowser();
- for (QueueMessageReference node : rd.messages) {
- if (!node.isAcked()) {
- msgContext.setMessageReference(node);
- if (browser.matches(node, msgContext)) {
- browser.add(node);
- }
- }
- }
-
+ while ((rd = getNextBrowserDispatch()) != null) {
+ pageInMoreMessages = true;
+
+ try {
+ MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
+ msgContext.setDestination(destination);
+
+ QueueBrowserSubscription browser = rd.getBrowser();
+ for (QueueMessageReference node : rd.messages) {
+ if (!node.isAcked()) {
+ msgContext.setMessageReference(node);
+ if (browser.matches(node, msgContext)) {
+ browser.add(node);
+ }
+ }
+ }
+
rd.done();
- } catch (Exception e) {
- LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
- }
- }
-
- if (firstConsumer) {
- firstConsumer = false;
- try {
- if (consumersBeforeDispatchStarts > 0) {
- int timeout = 1000; // wait one second by default if consumer count isn't reached
- if (timeBeforeDispatchStarts > 0) {
- timeout = timeBeforeDispatchStarts;
- }
- if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
- }
- }
- }
- if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
- iteratingMutex.wait(timeBeforeDispatchStarts);
- if (LOG.isDebugEnabled()) {
- LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
- }
- }
- } catch (Exception e) {
- LOG.error(e);
- }
- }
-
- synchronized (messages) {
- pageInMoreMessages |= !messages.isEmpty();
- }
-
- // Kinda ugly.. but I think dispatchLock is the only mutex protecting the
- // pagedInPendingDispatch variable.
- synchronized(dispatchMutex) {
- pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
- }
-
- // Perhaps we should page always into the pagedInPendingDispatch list if
- // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
- // then we do a dispatch.
- if (pageInMoreMessages) {
- try {
- pageInMessages(false);
-
- } catch (Throwable e) {
- LOG.error("Failed to page in more queue messages ", e);
+ } catch (Exception e) {
+ LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
+ }
+ }
+
+ if (firstConsumer) {
+ firstConsumer = false;
+ try {
+ if (consumersBeforeDispatchStarts > 0) {
+ int timeout = 1000; // wait one second by default if consumer count isn't reached
+ if (timeBeforeDispatchStarts > 0) {
+ timeout = timeBeforeDispatchStarts;
+ }
+ if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
+ }
+ }
+ }
+ if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
+ iteratingMutex.wait(timeBeforeDispatchStarts);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
+ }
+ }
+ } catch (Exception e) {
+ LOG.error(e);
+ }
+ }
+
+ synchronized (messages) {
+ pageInMoreMessages |= !messages.isEmpty();
+ }
+
+ // Kinda ugly.. but I think dispatchLock is the only mutex protecting the
+ // pagedInPendingDispatch variable.
+ synchronized (dispatchMutex) {
+ pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
+ }
+
+ // Perhaps we should page always into the pagedInPendingDispatch list if
+ // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
+ // then we do a dispatch.
+ if (pageInMoreMessages) {
+ try {
+ pageInMessages(false);
+
+ } catch (Throwable e) {
+ LOG.error("Failed to page in more queue messages ", e);
}
- }
- return pendingWakeups.decrementAndGet() > 0;
+ }
+ return pendingWakeups.decrementAndGet() > 0;
}
}
@@ -1188,8 +1158,9 @@
public boolean evaluate(ConnectionContext context, MessageReference r) {
return messageId.equals(r.getMessageId().toString());
}
+
public String toString() {
- return "MessageIdFilter: "+messageId;
+ return "MessageIdFilter: " + messageId;
}
};
}
@@ -1213,22 +1184,22 @@
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
removeMessage(c, null, r);
- synchronized(dispatchMutex) {
+ synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
pagedInPendingDispatch.remove(r);
}
}
}
-
- protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
+
+ protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
removeMessage(c, subs, r, ack);
}
-
- protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
+
+ protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
reference.setAcked(true);
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
@@ -1240,13 +1211,13 @@
acknowledge(context, sub, ack, reference);
} finally {
context.getTransaction().addSynchronization(new Synchronization() {
-
+
public void afterCommit() throws Exception {
getDestinationStatistics().getDequeues().increment();
dropMessage(reference);
wakeup();
}
-
+
public void afterRollback() throws Exception {
reference.setAcked(false);
}
@@ -1255,38 +1226,38 @@
}
if (ack.isPoisonAck()) {
// message gone to DLQ, is ok to allow redelivery
- synchronized(messages) {
+ synchronized (messages) {
messages.rollback(reference.getMessageId());
}
}
}
-
+
private void dropMessage(QueueMessageReference reference) {
reference.drop();
destinationStatistics.getMessages().decrement();
- synchronized(pagedInMessages) {
+ synchronized (pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
}
-
- public void messageExpired(ConnectionContext context,MessageReference reference) {
- messageExpired(context,null,reference);
+
+ public void messageExpired(ConnectionContext context, MessageReference reference) {
+ messageExpired(context, null, reference);
}
-
- public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+
+ public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
if (LOG.isDebugEnabled()) {
LOG.debug("message expired: " + reference);
}
broker.messageExpired(context, reference);
destinationStatistics.getExpired().increment();
try {
- removeMessage(context,subs,(QueueMessageReference)reference);
+ removeMessage(context, subs, (QueueMessageReference) reference);
} catch (IOException e) {
- LOG.error("Failed to remove expired Message from the store ",e);
+ LOG.error("Failed to remove expired Message from the store ", e);
}
}
-
+
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
answer.setBroker(this.broker);
@@ -1296,17 +1267,18 @@
final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
if (!msg.isPersistent() && messages.getSystemUsage() != null) {
- if (systemUsage.getTempUsage().isFull()) {
- final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
- " See http://activemq.apache.org/producer-flow-control.html for more info";
- LOG.info(logMessage);
+ if (systemUsage.getTempUsage().isFull()) {
+ final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException(logMessage);
}
+
+ waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
}
- messages.getSystemUsage().getTempUsage().waitForSpace();
+
}
- synchronized(messages) {
+ synchronized (messages) {
messages.addMessageLast(msg);
}
destinationStatistics.getEnqueues().increment();
@@ -1319,7 +1291,7 @@
}
wakeup();
}
-
+
public void wakeup() {
if (optimizedDispatch || isSlave()) {
iterate();
@@ -1332,12 +1304,12 @@
private void asyncWakeup() {
try {
pendingWakeups.incrementAndGet();
- this.taskRunner.wakeup();
+ this.taskRunner.wakeup();
} catch (InterruptedException e) {
LOG.warn("Async task tunner failed to wakeup ", e);
}
}
-
+
private boolean isSlave() {
return broker.getBrokerService().isSlave();
}
@@ -1345,14 +1317,13 @@
private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null;
- synchronized(dispatchMutex) {
+ synchronized (dispatchMutex) {
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: "
- + destinationStatistics.getInflight().getCount()
- + ", pagedInMessages.size " + pagedInMessages.size());
+ LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
+ + pagedInMessages.size());
}
-
+
if (isLazyDispatch() && !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1376,7 +1347,7 @@
} else {
result.add(ref);
count++;
- }
+ }
}
} finally {
messages.release();
@@ -1385,7 +1356,7 @@
// Only add new messages, not already pagedIn to avoid multiple dispatch attempts
synchronized (pagedInMessages) {
resultList = new ArrayList<QueueMessageReference>(result.size());
- for(QueueMessageReference ref : result) {
+ for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref);
resultList.add(ref);
@@ -1402,8 +1373,8 @@
private void doDispatch(List<QueueMessageReference> list) throws Exception {
boolean doWakeUp = false;
- synchronized(dispatchMutex) {
-
+ synchronized (dispatchMutex) {
+
synchronized (pagedInPendingDispatch) {
if (!pagedInPendingDispatch.isEmpty()) {
// Try to first dispatch anything that had not been
@@ -1425,20 +1396,20 @@
}
}
}
- }
+ }
if (doWakeUp) {
// avoid lock order contention
asyncWakeup();
}
}
-
+
/**
* @return list of messages that could get dispatched to consumers if they
* were not full.
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<Subscription> consumers;
-
+
synchronized (this.consumers) {
if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification
@@ -1449,15 +1420,15 @@
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
-
+
for (MessageReference node : list) {
Subscription target = null;
- int interestCount=0;
+ int interestCount = 0;
for (Subscription s : consumers) {
- if (s instanceof QueueBrowserSubscription) {
- interestCount++;
- continue;
- }
+ if (s instanceof QueueBrowserSubscription) {
+ interestCount++;
+ continue;
+ }
if (dispatchSelector.canSelect(s, node)) {
if (!fullConsumers.contains(s)) {
if (!s.isFull()) {
@@ -1472,23 +1443,22 @@
}
interestCount++;
} else {
- // makes sure it gets dispatched again
- if (!node.isDropped() && !((QueueMessageReference)node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
- interestCount++;
- }
+ // makes sure it gets dispatched again
+ if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
+ interestCount++;
+ }
}
}
-
- if ((target == null && interestCount>0) || consumers.size() == 0) {
+
+ if ((target == null && interestCount > 0) || consumers.size() == 0) {
// This means all subs were full or that there are no consumers...
- rc.add((QueueMessageReference)node);
+ rc.add((QueueMessageReference) node);
}
// If it got dispatched, rotate the consumer list to get round robin distribution.
- if (target != null && !strictOrderDispatch && consumers.size() > 1 &&
- !dispatchSelector.isExclusiveConsumer(target)) {
+ if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
synchronized (this.consumers) {
- if( removeFromConsumerList(target) ) {
+ if (removeFromConsumerList(target)) {
addToConsumerList(target);
consumers = new ArrayList<Subscription>(this.consumers);
}
@@ -1496,15 +1466,13 @@
}
}
-
return rc;
}
-
protected void pageInMessages(boolean force) throws Exception {
- doDispatch(doPageIn(force));
+ doDispatch(doPageIn(force));
}
-
+
private void addToConsumerList(Subscription sub) {
if (useConsumerPriority) {
consumers.add(sub);
@@ -1513,42 +1481,43 @@
consumers.add(sub);
}
}
-
+
private boolean removeFromConsumerList(Subscription sub) {
return consumers.remove(sub);
}
-
+
private int getConsumerMessageCountBeforeFull() throws Exception {
int total = 0;
boolean zeroPrefetch = false;
synchronized (consumers) {
for (Subscription s : consumers) {
- zeroPrefetch |= s.getPrefetchSize() == 0;
- int countBeforeFull = s.countBeforeFull();
+ zeroPrefetch |= s.getPrefetchSize() == 0;
+ int countBeforeFull = s.countBeforeFull();
total += countBeforeFull;
}
}
- if (total==0 && zeroPrefetch){
- total=1;
+ if (total == 0 && zeroPrefetch) {
+ total = 1;
}
return total;
}
- /*
- * In slave mode, dispatch is ignored till we get this notification as the dispatch
- * process is non deterministic between master and slave.
- * On a notification, the actual dispatch to the subscription (as chosen by the master)
- * is completed.
- * (non-Javadoc)
- * @see org.apache.activemq.broker.region.BaseDestination#processDispatchNotification(org.apache.activemq.command.MessageDispatchNotification)
+ /*
+ * In slave mode, dispatch is ignored till we get this notification as the
+ * dispatch process is non deterministic between master and slave. On a
+ * notification, the actual dispatch to the subscription (as chosen by the
+ * master) is completed. (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
+ * (org.apache.activemq.command.MessageDispatchNotification)
*/
- public void processDispatchNotification(
- MessageDispatchNotification messageDispatchNotification) throws Exception {
+ public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
// do dispatch
Subscription sub = getMatchingSubscription(messageDispatchNotification);
if (sub != null) {
MessageReference message = getMatchingMessage(messageDispatchNotification);
- sub.add(message);
+ sub.add(message);
sub.processMessageDispatchNotification(messageDispatchNotification);
}
}
@@ -1556,25 +1525,25 @@
private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
-
- synchronized(dispatchMutex) {
+
+ synchronized (dispatchMutex) {
synchronized (pagedInPendingDispatch) {
- for(QueueMessageReference ref : pagedInPendingDispatch) {
- if (messageId.equals(ref.getMessageId())) {
- message = ref;
- pagedInPendingDispatch.remove(ref);
- break;
- }
- }
+ for (QueueMessageReference ref : pagedInPendingDispatch) {
+ if (messageId.equals(ref.getMessageId())) {
+ message = ref;
+ pagedInPendingDispatch.remove(ref);
+ break;
+ }
+ }
}
-
+
if (message == null) {
synchronized (pagedInMessages) {
message = pagedInMessages.get(messageId);
}
}
-
- if (message == null) {
+
+ if (message == null) {
synchronized (messages) {
try {
messages.setMaxBatchSize(getMaxPageSize());
@@ -1593,28 +1562,25 @@
}
}
}
-
+
if (message == null) {
Message msg = loadMessage(messageId);
if (msg != null) {
message = this.createMessageReference(msg);
}
- }
-
- }
+ }
+
+ }
if (message == null) {
- throw new JMSException(
- "Slave broker out of sync with master - Message: "
- + messageDispatchNotification.getMessageId()
- + " on " + messageDispatchNotification.getDestination()
- + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: "
- + messageDispatchNotification.getConsumerId());
+ throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
+ + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
}
return message;
}
/**
* Find a consumer that matches the id in the message dispatch notification
+ *
* @param messageDispatchNotification
* @return sub or null if the subscription has been removed before dispatch
* @throws JMSException
@@ -1637,4 +1603,20 @@
asyncWakeup();
}
}
+
+ /** Blocks until usage has space; aborts with IOException if the connection is stopping. Logs the warning every blockedProducerWarningInterval ms; an interval <= 0 disables it. */
+ private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
+ long start = System.currentTimeMillis();
+ long nextWarn = start + blockedProducerWarningInterval;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+ long now = System.currentTimeMillis();
+ if (blockedProducerWarningInterval > 0 && now >= nextWarn) {
+ LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + blockedProducerWarningInterval;
+ }
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=884234&r1=884233&r2=884234&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Nov 25 19:35:09 2009
@@ -41,6 +41,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,11 +60,11 @@
*
* @version $Revision: 1.21 $
*/
-public class Topic extends BaseDestination implements Task{
+public class Topic extends BaseDestination implements Task {
protected static final Log LOG = LogFactory.getLog(Topic.class);
private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
- protected final Valve dispatchValve = new Valve(true);
+ protected final Valve dispatchValve = new Valve(true);
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@@ -71,24 +72,22 @@
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
- try {
- Topic.this.taskRunner.wakeup();
- } catch (InterruptedException e) {
- }
+ try {
+ Topic.this.taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ }
};
};
-
- public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory) throws Exception {
+ public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
super(brokerService, store, destination, parentStats);
- this.topicStore=store;
+ this.topicStore = store;
//set default subscription recovery policy
- subscriptionRecoveryPolicy= new NoSubscriptionRecoveryPolicy();
+ subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
-
- public void initialize() throws Exception{
+
+ public void initialize() throws Exception {
super.initialize();
if (store != null) {
int messageCount = store.getMessageCount();
@@ -140,7 +139,7 @@
}
} else {
sub.add(context, this);
- DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
+ DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
}
}
@@ -171,7 +170,7 @@
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
try {
-
+
if (topicStore == null) {
return;
}
@@ -195,21 +194,20 @@
}
}
// Do we need to create the subscription?
- if(info==null){
- info=new SubscriptionInfo();
+ if (info == null) {
+ info = new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
- info.setDestination(getActiveMQDestination());
+ info.setDestination(getActiveMQDestination());
// This destination is an actual destination id.
- info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
+ info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
synchronized (consumers) {
consumers.add(subscription);
- topicStore.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
+ topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
}
}
-
final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
@@ -223,7 +221,7 @@
subscription.add(message);
}
} catch (IOException e) {
- LOG.error("Failed to recover this message " + message);
+ LOG.error("Failed to recover this message " + message);
}
return true;
}
@@ -235,7 +233,7 @@
public boolean hasSpace() {
return true;
}
-
+
public boolean isDuplicate(MessageId id) {
return false;
}
@@ -277,23 +275,24 @@
return;
}
- if(memoryUsage.isFull()) {
+ if (memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
-
+
if (isProducerFlowControl() && context.isProducerFlowControl()) {
-
- if(warnOnProducerFlowControl) {
+
+ if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG.info("Usage Manager memory limit reached for " +getActiveMQDestination().getQualifiedName() + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it." +
- " See http://activemq.apache.org/producer-flow-control.html for more info");
+ LOG.info("Usage Manager memory limit reached for " + getActiveMQDestination().getQualifiedName()
+ + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
-
+
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding " +getActiveMQDestination().getQualifiedName() + "." +
- " See http://activemq.apache.org/producer-flow-control.html for more info");
+ throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
-
+
// We can avoid blocking due to low usage if the producer is sending
// a sync message or
// if it is using a producer window
@@ -301,9 +300,9 @@
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
public void run() {
-
+
try {
-
+
// While waiting for space to free up... the
// message may have expired.
if (message.isExpired()) {
@@ -312,7 +311,7 @@
} else {
doMessageSend(producerExchange, message);
}
-
+
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@@ -321,7 +320,7 @@
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
-
+
} catch (Exception e) {
if (!sendProducerAck && !context.isInRecoveryMode()) {
ExceptionResponse response = new ExceptionResponse(e);
@@ -329,10 +328,10 @@
context.getConnection().dispatchAsync(response);
}
}
-
+
}
});
-
+
// If the user manager is not full, then the task will not
// get called..
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
@@ -342,24 +341,32 @@
context.setDontSendReponse(true);
return;
}
-
+
} else {
-
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
- int count = 0;
- while (!memoryUsage.waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
- }
- if (count > 2 && context.isInTransaction()) {
- count =0;
- int size = context.getTransaction().size();
- LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+
+ if (memoryUsage.isFull()) {
+ if (context.isInTransaction()) {
+
+ int count = 0;
+ while (!memoryUsage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+ if (count > 2 && context.isInTransaction()) {
+ count = 0;
+ int size = context.getTransaction().size();
+ LOG.warn("Waiting for space to send transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
+ }
+ }
+ } else {
+ waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
+ + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
}
-
+
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
@@ -382,35 +389,28 @@
}
/**
- * do send the message - this needs to be synchronized to ensure messages are stored AND dispatched in
- * the right order
+ * do send the message - this needs to be synchronized to ensure messages
+ * are stored AND dispatched in the right order
+ *
* @param producerExchange
* @param message
* @throws IOException
* @throws Exception
*/
- synchronized void doMessageSend(
- final ProducerBrokerExchange producerExchange, final Message message)
- throws IOException, Exception {
- final ConnectionContext context = producerExchange
- .getConnectionContext();
+ synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
+ final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
- if (topicStore != null && message.isPersistent()
- && !canOptimizeOutPersistence()) {
+ if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull()) {
- final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "." +
- " See http://activemq.apache.org/producer-flow-control.html for more info";
- LOG.info(logMessage);
+ final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ + " See http://activemq.apache.org/producer-flow-control.html for more info";
if (systemUsage.isSendFailIfNoSpace()) {
- throw new javax.jms.ResourceAllocationException(logMessage);
- }
- }
- while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
- if (context.getStopping().get()) {
- throw new IOException("Connection closed, send aborted.");
+ throw new javax.jms.ResourceAllocationException(logMessage);
}
+
+ waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
}
topicStore.addMessage(context, message);
}
@@ -457,14 +457,13 @@
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
- DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
+ DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
}
messageConsumed(context, node);
}
-
public void gc() {
}
@@ -488,7 +487,7 @@
if (memoryUsage != null) {
memoryUsage.stop();
}
- if(this.topicStore != null) {
+ if (this.topicStore != null) {
this.topicStore.stop();
}
}
@@ -510,7 +509,7 @@
public boolean hasSpace() {
return true;
}
-
+
public boolean isDuplicate(MessageId id) {
return false;
}
@@ -527,9 +526,9 @@
}
return result.toArray(new Message[result.size()]);
}
-
+
public boolean iterate() {
- synchronized(messagesWaitingForSpace) {
+ synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
@@ -538,12 +537,9 @@
return false;
}
-
// Properties
// -------------------------------------------------------------------------
-
-
public DispatchPolicy getDispatchPolicy() {
return dispatchPolicy;
}
@@ -560,17 +556,16 @@
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
}
-
// Implementation methods
// -------------------------------------------------------------------------
-
+
public final void wakeup() {
}
-
+
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
- dispatchValve.increment();
+ dispatchValve.increment();
MessageEvaluationContext msgContext = null;
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
@@ -587,17 +582,17 @@
msgContext.setMessageReference(message);
if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
onMessageWithNoConsumers(context, message);
- }
-
+ }
+
} finally {
dispatchValve.decrement();
- if(msgContext != null) {
+ if (msgContext != null) {
msgContext.clear();
}
}
}
-
- public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
+
+ public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
@@ -609,9 +604,24 @@
try {
acknowledge(context, subs, ack, reference);
} catch (IOException e) {
- LOG.error("Failed to remove expired Message from the store ",e);
+ LOG.error("Failed to remove expired Message from the store ", e);
}
}
+ /** Blocks until usage has space; aborts with IOException if the connection is stopping. Logs the warning every blockedProducerWarningInterval ms; an interval <= 0 disables it. */
+ private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
+ long start = System.currentTimeMillis();
+ long nextWarn = start + blockedProducerWarningInterval;
+ while (!usage.waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+ long now = System.currentTimeMillis();
+ if (blockedProducerWarningInterval > 0 && now >= nextWarn) {
+ LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+ nextWarn = now + blockedProducerWarningInterval;
+ }
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=884234&r1=884233&r2=884234&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Wed Nov 25 19:35:09 2009
@@ -60,6 +60,7 @@
private int maxQueueAuditDepth=2048;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
+ private long blockedProducerWarningInterval = Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL;
private boolean optimizedDispatch=false;
private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private int maxBrowsePageSize=BaseDestination.MAX_BROWSE_PAGE_SIZE;
@@ -125,6 +126,7 @@
public void baseConfiguration(BaseDestination destination) {
destination.setProducerFlowControl(isProducerFlowControl());
+ destination.setBlockedProducerWarningInterval(getBlockedProducerWarningInterval());
destination.setEnableAudit(isEnableAudit());
destination.setMaxAuditDepth(getMaxQueueAuditDepth());
destination.setMaxProducersToAudit(getMaxProducersToAudit());
@@ -373,6 +375,27 @@
}
/**
+ * Sets the interval at which warnings about producers being blocked by
+ * resource usage will be triggered. Values of 0 or less will disable
+ * warnings.
+ *
+ * @param blockedProducerWarningInterval the interval at which warnings about
+ * blocked producers will be triggered.
+ */
+ public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
+ this.blockedProducerWarningInterval = blockedProducerWarningInterval;
+ }
+
+ /**
+ *
+ * @return the interval at which warnings about blocked producers will be
+ * triggered.
+ */
+ public long getBlockedProducerWarningInterval() {
+ return blockedProducerWarningInterval;
+ }
+
+ /**
* @return the maxProducersToAudit
*/
public int getMaxProducersToAudit() {