You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/14 15:41:15 UTC
svn commit: r774764 [1/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/act...
Author: chirino
Date: Thu May 14 13:41:14 2009
New Revision: 774764
URL: http://svn.apache.org/viewvc?rev=774764&view=rev
Log:
Applying https://issues.apache.org/activemq/browse/AMQ-2251 patch.. Thanks Colin!
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu May 14 13:41:14 2009
@@ -34,10 +34,10 @@
protected String name;
private int priorityLevels;
- protected int outputWindowSize = 1000;
- protected int outputResumeThreshold = 900;
- protected int inputWindowSize = 1000;
- protected int inputResumeThreshold = 500;
+ protected int outputWindowSize = 1024 * 1024;
+ protected int outputResumeThreshold = 900 * 1024;
+ protected int inputWindowSize = 1024 * 1024;
+ protected int inputResumeThreshold = 512 * 1024;
protected boolean useAsyncWriteThread = true;
private IDispatcher dispatcher;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerMessageDelivery.java Thu May 14 13:41:14 2009
@@ -17,6 +17,8 @@
package org.apache.activemq.broker;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Set;
import java.util.Map.Entry;
@@ -41,6 +43,7 @@
// List of persistent targets for which the message should be saved
// when dispatch is complete:
HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
+ SaveableQueueElement<MessageDelivery> singleTarget;
long storeTracking = -1;
BrokerDatabase store;
@@ -74,7 +77,7 @@
return fromStore;
}
- public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable){
+ public final void persist(SaveableQueueElement<MessageDelivery> elem, ISourceController<?> controller, boolean delayable) {
synchronized (this) {
// Can flush of this message to the store be delayed?
if (enableFlushDelay && !delayable) {
@@ -84,16 +87,13 @@
// list of queues for which to save the message when dispatch is
// finished:
if (dispatching) {
- if (persistentTargets == null) {
- persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
- }
- persistentTargets.put(elem.getQueueDescriptor(), elem);
+ addPersistentTarget(elem);
return;
}
// Otherwise, if it is still in the saver queue, we can add this
// queue to the queue list:
else if (pendingSave != null) {
- persistentTargets.put(elem.getQueueDescriptor(), elem);
+ addPersistentTarget(elem);
if (!delayable) {
pendingSave.requestFlush();
}
@@ -112,15 +112,14 @@
// then we don't need to issue a delete:
if (dispatching || pendingSave != null) {
- // Remove the queue:
- persistentTargets.remove(queue);
deleted = true;
+ removePersistentTarget(queue);
// We get a save context when we place the message in the
// database queue. If it has been added to the queue,
// and we've removed the last queue, see if we can cancel
// the save:
- if (pendingSave != null && persistentTargets.isEmpty()) {
+ if (pendingSave != null && !hasPersistentTargets()) {
if (pendingSave.cancel()) {
pendingSave = null;
if (isPersistent()) {
@@ -153,8 +152,15 @@
return storeTracking;
}
- public Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> getPersistentQueues() {
- return persistentTargets.entrySet();
+ public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
+ if (singleTarget != null) {
+ ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
+ list.add(singleTarget);
+ return list;
+ } else if (persistentTargets != null) {
+ return persistentTargets.values();
+ }
+ return null;
}
public void beginStore() {
@@ -163,6 +169,40 @@
}
}
+ private final boolean hasPersistentTargets() {
+ return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
+ }
+
+ private final void removePersistentTarget(QueueDescriptor queue) {
+ if (persistentTargets != null) {
+ persistentTargets.remove(queue);
+ return;
+ }
+
+ if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
+ singleTarget = null;
+ }
+ }
+
+ private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
+ if (persistentTargets != null) {
+ persistentTargets.put(elem.getQueueDescriptor(), elem);
+ return;
+ }
+
+ if (singleTarget == null) {
+ singleTarget = elem;
+ return;
+ }
+
+ if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
+ persistentTargets = new HashMap<QueueStore.QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
+ persistentTargets.put(elem.getQueueDescriptor(), elem);
+ persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
+ singleTarget = null;
+ }
+ }
+
public void finishDispatch(ISourceController<?> controller) throws IOException {
boolean firePersistListener = false;
synchronized (this) {
@@ -170,7 +210,7 @@
// Note that this could be the case even if the message isn't
// persistent if a target requested that the message be spooled
// for some other reason such as queue memory overflow.
- if (persistentTargets != null && !persistentTargets.isEmpty()) {
+ if (hasPersistentTargets()) {
pendingSave = store.persistReceivedMessage(this, controller);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/BrokerQueueStore.java Thu May 14 13:41:14 2009
@@ -58,6 +58,12 @@
// subscriberQueues = new HashMap<String, IFlowQueue<MessageDelivery>>();
private Mapper<Integer, MessageDelivery> partitionMapper;
+
+ private static final int DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD = 100 * 1024 * 1;;
+ private static final int DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD = 1;
+ // By default we don't page out elements to disk.
+ private static final int DEFAULT_SHARED_QUEUE_SIZE = DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+ //private static final int DEFAULT_SHARED_QUEUE_SIZE = 1024 * 1024 * 10;
private static final Mapper<Long, MessageDelivery> EXPIRATION_MAPPER = new Mapper<Long, MessageDelivery>() {
public Long map(MessageDelivery element) {
@@ -65,17 +71,41 @@
}
};
+ private static final Mapper<Integer, MessageDelivery> SIZE_MAPPER = new Mapper<Integer, MessageDelivery>() {
+ public Integer map(MessageDelivery element) {
+ return element.getFlowLimiterSize();
+ }
+ };
+
private static final PersistencePolicy<MessageDelivery> SHARED_QUEUE_PERSISTENCE_POLICY = new PersistencePolicy<MessageDelivery>() {
public boolean isPersistent(MessageDelivery elem) {
return elem.isPersistent();
}
public boolean isPageOutPlaceHolders() {
- return false;
+ return true;
}
public boolean isPagingEnabled() {
- return false;
+ return DEFAULT_SHARED_QUEUE_SIZE > DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+ }
+
+ public int getPagingInMemorySize() {
+ return DEFAULT_SHARED_QUEUE_PAGING_THRESHOLD;
+ }
+
+ public int getDisconnectedThrottleRate() {
+ // By default don't throttle consumers when disconnected.
+ return 0;
+ }
+
+ public boolean isThrottleSourcesToMemoryLimit() {
+ // Keep the queue in memory.
+ return true;
+ }
+
+ public int getRecoveryBias() {
+ return 8;
}
};
@@ -194,8 +224,9 @@
break;
}
case QueueDescriptor.SHARED_PRIORITY: {
- PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(100, 1, MessageBroker.MAX_PRIORITY);
+ PrioritySizeLimiter<MessageDelivery> limiter = new PrioritySizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD, MessageBroker.MAX_PRIORITY);
limiter.setPriorityMapper(PRIORITY_MAPPER);
+ limiter.setSizeMapper(SIZE_MAPPER);
SharedPriorityQueue<Long, MessageDelivery> queue = new SharedPriorityQueue<Long, MessageDelivery>(name, limiter);
ret = queue;
queue.setKeyMapper(KEY_MAPPER);
@@ -203,7 +234,13 @@
break;
}
case QueueDescriptor.SHARED: {
- SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(100, 1);
+ SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(DEFAULT_SHARED_QUEUE_SIZE, DEFAULT_SHARED_QUEUE_RESUME_THRESHOLD){
+ @Override
+ public int getElementSize(MessageDelivery elem) {
+ return elem.getFlowLimiterSize();
+ }
+ };
+
if (!USE_OLD_QUEUE) {
SharedQueue<Long, MessageDelivery> sQueue = new SharedQueue<Long, MessageDelivery>(name, limiter);
sQueue.setKeyMapper(KEY_MAPPER);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu May 14 13:41:14 2009
@@ -59,8 +59,7 @@
}
public int getMemorySize() {
- //return size;
- return 1;
+ return size;
}
public int getPriority() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu May 14 13:41:14 2009
@@ -442,7 +442,7 @@
Flow flow = new Flow("broker-" + name + "-outbound", false);
limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
public int getElementSize(MessageDelivery m) {
- return 1;
+ return m.getFlowLimiterSize();
}
};
queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Thu May 14 13:41:14 2009
@@ -310,7 +310,7 @@
Flow flow = new Flow("broker-" + subscriptionId + "-outbound", false);
limiter = new WindowLimiter<MessageDelivery>(true, flow, 1000, 500) {
public int getElementSize(MessageDelivery m) {
- return 1;
+ return m.getFlowLimiterSize();
}
};
queue = new SingleFlowRelay<MessageDelivery>(flow, flow.getFlowName(), limiter);
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu May 14 13:41:14 2009
@@ -22,8 +22,6 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.Set;
-import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -64,6 +62,7 @@
private final SizeLimiter<OperationBase> storeLimiter;
private final FlowController<OperationBase> storeController;
+ private final int FLUSH_QUEUE_SIZE = 10000 * 1024;
private final IDispatcher dispatcher;
private Thread flushThread;
@@ -82,7 +81,7 @@
// num scheduled for delay
private long delayedFlushPointer = 0; // The last delayable sequence num
// requested.
- private final long FLUSH_DELAY_MS = 5;
+ private final long FLUSH_DELAY_MS = 10;
private final Runnable flushDelayCallback;
public interface DatabaseListener {
@@ -99,7 +98,7 @@
this.store = store;
this.dispatcher = dispatcher;
this.opQueue = new LinkedNodeList<OperationBase>();
- storeLimiter = new SizeLimiter<OperationBase>(10000, 5000) {
+ storeLimiter = new SizeLimiter<OperationBase>(FLUSH_QUEUE_SIZE, 0) {
@Override
public int getElementSize(OperationBase op) {
@@ -357,16 +356,17 @@
// If we procecessed some ops, flush and post process:
if (!processedQueue.isEmpty()) {
-
- //System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
- //Sync the store:
+
+ if (DEBUG)
+ System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
+ // Sync the store:
store.flush();
-
// Post process operations
long release = 0;
for (Operation processed : processedQueue) {
processed.onCommit();
+ // System.out.println("Processed" + processed);
release += processed.getLimiterSize();
}
@@ -619,6 +619,8 @@
final protected AtomicBoolean cancelled = new AtomicBoolean(false);
final protected AtomicBoolean executed = new AtomicBoolean(false);
+ public static final int BASE_MEM_SIZE = 20;
+
public boolean cancel() {
if (executePending.compareAndSet(true, false)) {
cancelled.set(true);
@@ -680,7 +682,7 @@
abstract protected void doExcecute(Session session);
public int getLimiterSize() {
- return 0;
+ return BASE_MEM_SIZE;
}
public boolean isDelayable() {
@@ -722,12 +724,6 @@
}
@Override
- public int getLimiterSize() {
- // Might consider bumping this up to avoid too much accumulation?
- return 0;
- }
-
- @Override
protected void doExcecute(Session session) {
try {
session.queueAdd(qd);
@@ -755,12 +751,6 @@
}
@Override
- public int getLimiterSize() {
- // Might consider bumping this up to avoid too much accumulation?
- return 0;
- }
-
- @Override
protected void doExcecute(Session session) {
session.queueRemove(qd);
}
@@ -787,7 +777,7 @@
@Override
public int getLimiterSize() {
// Might consider bumping this up to avoid too much accumulation?
- return 1;
+ return BASE_MEM_SIZE + 8;
}
@Override
@@ -832,6 +822,11 @@
}
@Override
+ public int getLimiterSize() {
+ return BASE_MEM_SIZE + 44;
+ }
+
+ @Override
protected void doExcecute(Session session) {
Iterator<QueueRecord> records = null;
@@ -942,7 +937,7 @@
@Override
public int getLimiterSize() {
- return delivery.getFlowLimiterSize();
+ return delivery.getFlowLimiterSize() + BASE_MEM_SIZE + 40;
}
@Override
@@ -950,9 +945,9 @@
if (singleElement == null) {
brokerDelivery.beginStore();
- Set<Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>>> targets = brokerDelivery.getPersistentQueues();
+ Collection<SaveableQueueElement<MessageDelivery>> targets = brokerDelivery.getPersistentQueues();
- if (!targets.isEmpty()) {
+ if (targets != null && !targets.isEmpty()) {
if (record == null) {
record = brokerDelivery.createMessageRecord();
if (record == null) {
@@ -962,24 +957,24 @@
record.setKey(brokerDelivery.getStoreTracking());
session.messageAdd(record);
- for (Entry<QueueDescriptor, SaveableQueueElement<MessageDelivery>> target : targets) {
+ for (SaveableQueueElement<MessageDelivery> target : targets) {
try {
QueueRecord queueRecord = new QueueRecord();
queueRecord.setAttachment(null);
queueRecord.setMessageKey(record.getKey());
queueRecord.setSize(brokerDelivery.getFlowLimiterSize());
- queueRecord.setQueueKey(target.getValue().getSequenceNumber());
- session.queueAddMessage(target.getKey(), queueRecord);
+ queueRecord.setQueueKey(target.getSequenceNumber());
+ session.queueAddMessage(target.getQueueDescriptor(), queueRecord);
} catch (KeyNotFoundException e) {
e.printStackTrace();
}
- if (target.getValue().requestSaveNotify()) {
+ if (target.requestSaveNotify()) {
if (notifyTargets == null) {
notifyTargets = new LinkedList<SaveableQueueElement<MessageDelivery>>();
}
- notifyTargets.add(target.getValue());
+ notifyTargets.add(target);
}
}
} else {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/AbstractLimiter.java Thu May 14 13:41:14 2009
@@ -26,7 +26,7 @@
public AbstractLimiter() {
}
- public final void addUnThrottleListener(UnThrottleListener l) {
+ public void addUnThrottleListener(UnThrottleListener l) {
throttleListeners.add(l);
if (!resuming && !getThrottled()) {
@@ -34,7 +34,7 @@
}
}
- public final void notifyUnThrottleListeners() {
+ protected final void notifyUnThrottleListeners() {
resuming = true;
while (!getThrottled() && !throttleListeners.isEmpty()) {
UnThrottleListener l = throttleListeners.remove();
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Thu May 14 13:41:14 2009
@@ -221,11 +221,6 @@
public void add(E elem, ISourceController<?> sourceController) {
boolean ok = false;
synchronized (mutex) {
- // If we don't have an fc sink, then just increment the limiter.
- if (controllable == null) {
- limiter.add(elem);
- return;
- }
if (okToAdd(elem)) {
ok = true;
if (limiter.add(elem)) {
@@ -241,7 +236,7 @@
}
}
}
- if (ok) {
+ if (ok && controllable != null) {
controllable.flowElemAccepted(this, elem);
}
}
@@ -259,12 +254,6 @@
public boolean offer(E elem, ISourceController<?> sourceController) {
boolean ok = false;
synchronized (mutex) {
- // If we don't have an fc sink, then just increment the limiter.
- if (controllable == null) {
- limiter.add(elem);
- return true;
- }
-
if (okToAdd(elem)) {
if (limiter.add(elem)) {
blockSource(sourceController);
@@ -274,7 +263,7 @@
blockSource(sourceController);
}
}
- if (ok) {
+ if (ok && controllable != null) {
controllable.flowElemAccepted(this, elem);
}
return ok;
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/AbstractFlowQueue.java Thu May 14 13:41:14 2009
@@ -40,7 +40,6 @@
protected DispatchContext dispatchContext;
protected final Collection<IPollableFlowSource.FlowReadyListener<E>> readyListeners = new ArrayList<IPollableFlowSource.FlowReadyListener<E>>();
private boolean notifyReady = false;
- protected boolean dispatching = false;
protected int dispatchPriority = 0;
protected FlowQueueListener listener = new FlowQueueListener()
{
@@ -62,11 +61,11 @@
this.listener = listener;
}
- public final void add(E elem, ISourceController<?> source) {
+ public void add(E elem, ISourceController<?> source) {
getSinkController(elem, source).add(elem, source);
}
- public final boolean offer(E elem, ISourceController<?> source) {
+ public boolean offer(E elem, ISourceController<?> source) {
return getSinkController(elem, source).offer(elem, source);
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Thu May 14 13:41:14 2009
@@ -27,6 +27,7 @@
public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
private final LinkedList<E> queue = new LinkedList<E>();
private final FlowController<E> controller;
+ private boolean started = true;
/**
* Creates a flow queue that can handle multiple flows.
@@ -42,7 +43,6 @@
super.onFlowOpened(controller);
}
-
protected final ISinkController<E> getSinkController(E elem, ISourceController<?> source) {
return controller;
}
@@ -52,7 +52,17 @@
*/
public synchronized void flowElemAccepted(ISourceController<E> controller, E elem) {
queue.add(elem);
- notifyReady();
+ if (started) {
+ notifyReady();
+ }
+ }
+
+ public synchronized void start() {
+ started = true;
+ }
+
+ public synchronized void stop() {
+ started = false;
}
public FlowController<E> getFlowController(Flow flow) {
@@ -60,7 +70,7 @@
}
public final boolean isDispatchReady() {
- return !queue.isEmpty();
+ return !queue.isEmpty() && started;
}
public final boolean pollingDispatch() {
@@ -76,6 +86,10 @@
public final E poll() {
synchronized (this) {
+ if (!started) {
+ return null;
+ }
+
E elem = queue.poll();
// FIXME the release should really be done after dispatch.
// doing it here saves us from having to resynchronize
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/PersistencePolicy.java Thu May 14 13:41:14 2009
@@ -26,20 +26,22 @@
* @return True if the element must be persisted on enqueue.
*/
public boolean isPersistent(E elem);
-
+
/**
- * Indicated whether or not paging is enabled for this queue.
- * When not enabled elements are kept in memory until they are removed. When
- * enabled
- * @return Whether or not paging is enabled for the queue.
+ * Indicates whether or not paging is enabled for this queue. When not
+ * enabled, elements are kept in memory until they are removed. When enabled,
+ * elements are paged to persistent storage when
+ * {@link #getPagingInMemorySize()} is exceeded.
+ *
+ * @return Whether or not paging is enabled for the queue.
*/
public boolean isPagingEnabled();
-
+
/**
- * When paging is enabled this specifies whether the queue can keep
- * a place holder for paged out elements in memory. Keeping place holders
- * in memory improves performance, but for very large queues keeping
- * place holders can cause a significant overhead.
+ * When paging is enabled this specifies whether the queue can keep a place
+ * holder for paged out elements in memory. Keeping place holders in memory
+ * improves performance, but for very large queues keeping place holders can
+ * cause a significant overhead.
*
* This method must return false if {@link #isPagingEnabled()} is false.
*
@@ -47,29 +49,107 @@
*/
public boolean isPageOutPlaceHolders();
- public static final class NON_PERSISTENT_POLICY<E> implements PersistencePolicy<E>{
+ /**
+ * Returns the memory threshold after which elements are paged out to disk to
+ * conserve memory.
+ *
+ * @return The In Memory Paging Threshold.
+ */
+ public int getPagingInMemorySize();
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.PersistencePolicy#isPersistOnEnqueue(java.lang.Object)
+ /**
+ * Indicates whether sources should be throttled to the memory limit
+ * while the queue is being actively consumed.
+ *
+ * @return true if sources should be throttled to the memory limit while
+ * the queue is being consumed.
+ */
+ public boolean isThrottleSourcesToMemoryLimit();
+
+ /**
+ * When the queue is being restored this indicates the preference to
+ * allow for recovering elements from the store versus newly added
+ * elements. For example if set to 3 then the queue will try to catch up
+ * by 3 elements for each newly added one.
+ */
+ public int getRecoveryBias();
+
+ /**
+ * The rate at which to throttle sources when the queue is stopped or
+ * is not actively being consumed.
+ *
+ * @return the rate at which to throttle source when the queue is not
+ * being consumed. A value less than or equal to 0 means no limit.
+ */
+ public int getDisconnectedThrottleRate();
+
+
+ public static final class NON_PERSISTENT_POLICY<E> implements PersistencePolicy<E> {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.PersistencePolicy#isPersistent(java
+ * .lang.Object)
*/
public boolean isPersistent(E elem) {
return false;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.queue.PersistencePolicy#isKeepElementRefences()
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.PersistencePolicy#isPageOutPlaceHolders()
*/
public boolean isPageOutPlaceHolders() {
return false;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.activemq.queue.PersistencePolicy#isPagingEnabled()
*/
public boolean isPagingEnabled() {
return false;
}
-
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.activemq.queue.PersistencePolicy#getPagingInMemorySize
+ * ()
+ */
+ public int getPagingInMemorySize() {
+ // TODO Auto-generated method stub
+ return Integer.MAX_VALUE;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.PersistencePolicy#getDisconnectedThrottleRate()
+ */
+ public int getDisconnectedThrottleRate() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.PersistencePolicy#isThrottleSourcesToMemoryLimit()
+ */
+ public boolean isThrottleSourcesToMemoryLimit() {
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.PersistencePolicy#getRecoveryBias()
+ */
+ public int getRecoveryBias() {
+ return Integer.MAX_VALUE;
+ }
+
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedPriorityQueue.java Thu May 14 13:41:14 2009
@@ -167,11 +167,13 @@
queue.getDescriptor().setParent(queueDescriptor.getQueueName());
queue.getDescriptor().setPartitionId(prio);
partitions.set(prio, queue);
- onFlowOpened(queue.getFlowControler());
if (initialize) {
store.addQueue(queue.getDescriptor());
queue.initialize(0, 0, 0, 0);
+ onFlowOpened(queue.getFlowControler());
}
+
+
if (started) {
queue.start();
}
@@ -191,14 +193,12 @@
public void add(V value, ISourceController<?> source) {
int prio = priorityMapper.map(value);
- IQueue<K, V> partition = getPartition(prio, true);
- partition.add(value, source);
+ getPartition(prio, true).add(value, source);
}
public boolean offer(V value, ISourceController<?> source) {
int prio = priorityMapper.map(value);
- IQueue<K, V> partition = getPartition(prio, true);
- return partition.offer(value, source);
+ return getPartition(prio, true).offer(value, source);
}
public void setKeyMapper(Mapper<K, V> keyMapper) {