You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/05/14 15:41:15 UTC
svn commit: r774764 [2/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/stomp/ main/java/org/apache/act...
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/queue/SharedQueue.java Thu May 14 13:41:14 2009
@@ -69,11 +69,11 @@
private static final int NO_MATCH = 1;
private static final int DECLINED = 2;
- private final SortedLinkedList<QueueElement> queue = new SortedLinkedList<QueueElement>();
+ private final SortedLinkedList<QueueElement<V>> queue = new SortedLinkedList<QueueElement<V>>();
private Mapper<K, V> keyMapper;
private final ElementLoader loader;
- private Cursor sharedCursor;
+ private Cursor<V> sharedCursor;
private QueueStore<K, V> store;
private PersistencePolicy<V> persistencePolicy;
private long nextSequenceNumber = 0;
@@ -93,11 +93,9 @@
private final LinkedNodeList<SubscriptionContext> trailingConsumers = new LinkedNodeList<SubscriptionContext>();
// Limiter/Controller for the size of the queue:
- private final FlowController<V> sizeController;
+ private FlowController<V> inputController;
private final IFlowSizeLimiter<V> sizeLimiter;
-
- // Default cursor memory limit:
- private static final long DEFAULT_MEMORY_LIMIT = 10;
+ private final boolean RELEASE_ON_ACQUISITION = true;
private int totalQueueCount;
@@ -105,13 +103,13 @@
private boolean started = false;
private Mapper<Long, V> expirationMapper;
- private final Expirator expirator = new Expirator();
+ private Expirator expirator;
public SharedQueue(String name, IFlowSizeLimiter<V> limiter) {
this(name, limiter, null);
}
- SharedQueue(String name, IFlowSizeLimiter<V> limiter, Object mutex) {
+ SharedQueue(String name, IFlowSizeLimiter<V> sizeLimiter, Object mutex) {
super(name);
this.mutex = mutex == null ? new Object() : mutex;
@@ -119,12 +117,7 @@
queueDescriptor = new QueueStore.QueueDescriptor();
queueDescriptor.setQueueName(new AsciiBuffer(super.getResourceName()));
queueDescriptor.setQueueType(QueueDescriptor.SHARED);
- this.sizeLimiter = limiter;
-
- this.sizeController = new FlowController<V>(getFlowControllableHook(), flow, limiter, this.mutex);
- sizeController.useOverFlowQueue(false);
- super.onFlowOpened(sizeController);
-
+ this.sizeLimiter = sizeLimiter;
loader = new ElementLoader();
}
@@ -152,7 +145,11 @@
persistencePolicy = new PersistencePolicy.NON_PERSISTENT_POLICY<V>();
}
- sharedCursor = new Cursor(queueDescriptor.getQueueName().toString(), true, true);
+ inputController = new FlowController<V>(null, flow, sizeLimiter, mutex);
+ inputController.useOverFlowQueue(false);
+ super.onFlowOpened(inputController);
+
+ sharedCursor = openCursor(getResourceName(), true, true);
// Initialize counts:
nextSequenceNumber = sequenceMax + 1;
@@ -160,7 +157,7 @@
sizeLimiter.add(count, size);
totalQueueCount = count;
// Add a paged out placeholder:
- QueueElement qe = new QueueElement(null, sequenceMin);
+ QueueElement<V> qe = new QueueElement<V>(null, sequenceMin, this);
qe.loaded = false;
queue.add(qe);
}
@@ -177,6 +174,8 @@
};
}
+ expirator = new Expirator();
+
if (DEBUG)
System.out.println(this + "Initialized, first seq: " + sequenceMin + " next sequence: " + nextSequenceNumber);
}
@@ -199,6 +198,65 @@
}
}
+ private final Cursor<V> openCursor(String name, boolean pageInElements, boolean skipAcquired) {
+
+ FlowController<QueueElement<V>> controller = null;
+ if (pageInElements && persistencePolicy.isPagingEnabled() && sizeLimiter.getCapacity() > persistencePolicy.getPagingInMemorySize()) {
+ IFlowSizeLimiter<QueueElement<V>> limiter = new SizeLimiter<QueueElement<V>>(persistencePolicy.getPagingInMemorySize(), persistencePolicy.getPagingInMemorySize() / 2) {
+ public int getElementSize(QueueElement<V> qe) {
+ return qe.size;
+ };
+ };
+
+ controller = new FlowController<QueueElement<V>>(null, flow, limiter, mutex) {
+ @Override
+ public IFlowResource getFlowResource() {
+ return SharedQueue.this;
+ }
+ };
+ controller.useOverFlowQueue(false);
+ controller.setExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
+ }
+
+ return new Cursor<V>(queue, loader, name, skipAcquired, pageInElements, controller);
+ }
+
+ final int getElementSize(V elem) {
+ return sizeLimiter.getElementSize(elem);
+ }
+
+ final long getElementExpiration(V elem) {
+ return expirationMapper.map(elem);
+ }
+
+ final Expirator getExpirator() {
+ return expirator;
+ }
+
+ final QueueStore<K, V> getQueueStore() {
+ return store;
+ }
+
+ final ElementLoader getLoader() {
+ return loader;
+ }
+
+ final PersistencePolicy<V> getPersistencePolicy() {
+ return persistencePolicy;
+ }
+
+ final void acknowledge(QueueElement<V> qe) {
+ synchronized (mutex) {
+ V elem = qe.getElement();
+ if (qe.delete()) {
+ if (!qe.acquired || !RELEASE_ON_ACQUISITION) {
+ inputController.elementDispatched(elem);
+ }
+ totalQueueCount--;
+ }
+ }
+ }
+
/**
* Starts this queue.
*/
@@ -211,6 +269,7 @@
if (!started) {
started = true;
sharedCursor.activate();
+ loader.start();
expirator.start();
if (isDispatchReady()) {
notifyReady();
@@ -232,42 +291,102 @@
stop();
}
- public void flowElemAccepted(ISourceController<V> source, V elem) {
+ public void add(V elem, ISourceController<?> source) {
+ synchronized (mutex) {
+ inputController.add(elem, source);
+ accepted(source, elem);
+ }
+ }
+ public boolean offer(V elem, ISourceController<?> source) {
synchronized (mutex) {
- if (!initialized) {
- throw new IllegalStateException("Not able to use uninitialized queue: " + getResourceName());
+ if (inputController.offer(elem, source)) {
+ accepted(source, elem);
+ return true;
}
+ return false;
+ }
+ }
- // Create a new queue element with the next sequence number:
- QueueElement qe = new QueueElement(elem, nextSequenceNumber++);
+ public void flowElemAccepted(ISourceController<V> source, V elem) {
+ synchronized (mutex) {
+ // TODO should change flow controller to pass original source:
+ accepted(null, elem);
+ }
+ }
- // Save the element (note that it is important this be done after
- // we've set the sequence number above)
- if (persistencePolicy.isPersistent(elem)) {
- // For now base decision on whether to delay flush on
- // whether or not there are
- // consumers ready.
- boolean delayable = !sharedConsumers.isEmpty();
- qe.save(source, delayable);
- }
-
- // Add it to our queue:
- queue.add(qe);
- totalQueueCount++;
- // Check with the loader to see if it needs to be paged out:
- loader.elementAdded(qe, source);
- expirator.elementAdded(qe);
-
- // Request dispatch for the newly enqueued element.
- // TODO consider optimizing to do direct dispatch?
- // It might be better if the dispatcher itself provided
- // this for cases where the caller is on the same dispatcher
- if (isDispatchReady()) {
- notifyReady();
+ private final void accepted(ISourceController<?> source, V elem) {
+
+ if (!initialized) {
+ throw new IllegalStateException("Uninitialized queue: " + getResourceName());
+ }
+
+ // Create a new queue element with the next sequence number:
+ QueueElement<V> qe = new QueueElement<V>(elem, nextSequenceNumber++, this);
+
+ // Save the element (note that it is important this be done after
+ // we've set the sequence number above)
+ if (persistencePolicy.isPersistent(elem)) {
+ // For now base decision on whether to delay flush on
+ // whether or not there are
+ // consumers ready.
+ boolean delayable = !sharedConsumers.isEmpty();
+ qe.save(source, delayable);
+ }
+
+ // Add it to our queue:
+ queue.add(qe);
+ totalQueueCount++;
+ if (!persistencePolicy.isPagingEnabled()) {
+ qe.addHardRef();
+ }
+ // Check with the shared cursor to see if it is willing to
+ // absorb the element. If so that's good enough.
+ if (persistencePolicy.isPagingEnabled() && !sharedCursor.offer(qe, source)) {
+
+ // Otherwise check with any other open cursor to see if
+ // it can take the element:
+ Collection<Cursor<V>> active = loader.getActiveCursors(qe);
+
+ // If there are none, unload the element:
+ if (active == null) {
+ qe.unload(source);
+ return;
+ }
+
+ // See if a cursor is willing to hang on to the
+ // element:
+ boolean accepted = false;
+ for (Cursor<V> cursor : active) {
+ // Already checked the shared cursor above:
+ if (cursor == sharedCursor) {
+ continue;
+ }
+
+ if (cursor.offer(qe, source)) {
+ accepted = true;
+ break;
+ }
+ }
+
+ // If no cursor accepted it, then page out the element:
+ // keeping the element loaded.
+ if (!accepted) {
+ qe.unload(source);
}
}
+
+ expirator.elementAdded(qe);
+
+ // Request dispatch for the newly enqueued element.
+ // TODO consider optimizing to do direct dispatch?
+ // It might be better if the dispatcher itself provided
+ // this for cases where the caller is on the same dispatcher
+ if (isDispatchReady()) {
+ notifyReady();
+ // while(pollingDispatch());
+ }
}
public boolean pollingDispatch() {
@@ -303,7 +422,7 @@
// Process shared consumers:
if (!sharedConsumers.isEmpty()) {
- QueueElement next = sharedCursor.getNext();
+ QueueElement<V> next = sharedCursor.getNext();
if (next != null) {
// See if there are any interested consumers:
@@ -412,11 +531,11 @@
boolean isStarted;
// The consumer's cursor:
- final Cursor cursor;
+ final Cursor<V> cursor;
SubscriptionContext(Subscription<V> target) {
this.sub = target;
- this.cursor = new Cursor(target.toString(), !sub.isBrowser(), true);
+ this.cursor = openCursor(target.toString(), true, !sub.isBrowser());
cursor.setCursorReadyListener(new CursorReadyListener() {
public void onElementReady() {
if (!isLinked()) {
@@ -468,7 +587,7 @@
return;
}
- QueueElement next = cursor.getNext();
+ QueueElement<V> next = cursor.getNext();
// If the next element isn't yet available
// then unlink this subscription
if (next == null) {
@@ -546,7 +665,7 @@
}
}
- public final int offer(QueueElement qe) {
+ public final int offer(QueueElement<V> qe) {
// If we are already passed this element return NO_MATCH:
if (cursor.getCurrentSequeunce() > qe.sequence) {
@@ -561,7 +680,7 @@
// Check for expiration:
if (qe.isExpired()) {
- qe.acknowledge();
+ acknowledge(qe);
return ACCEPTED;
}
@@ -571,7 +690,10 @@
// See if the sink has room:
if (sub.offer(qe.elem, this, callback)) {
if (!sub.isBrowser()) {
- qe.setAcquired(this);
+ qe.setAcquired(true);
+ if (RELEASE_ON_ACQUISITION) {
+ inputController.elementDispatched(qe.getElement());
+ }
// If remove on dispatch acknowledge now:
if (callback == null) {
@@ -632,16 +754,19 @@
public void onElementReady();
}
- class Cursor implements Comparable<Cursor> {
+ static class Cursor<V> implements Comparable<Cursor<V>> {
private CursorReadyListener readyListener;
private final String name;
+ private final SharedQueue<?, V>.ElementLoader loader;
+ private final SortedLinkedList<QueueElement<V>> queue;
+
private boolean activated = false;;
// The next element for this cursor, always non null
// if activated, unless no element available:
- QueueElement current = null;
+ QueueElement<V> current = null;
// The current sequence number for this cursor,
// used when inactive or pointing to an element
// sequence number beyond the queue's limit.
@@ -649,8 +774,8 @@
// The cursor is holding references for all
// elements between first and last inclusive:
- QueueElement firstRef = null;
- QueueElement lastRef = null;
+ QueueElement<V> firstRef = null;
+ QueueElement<V> lastRef = null;
// This is set to the last block that for which
// we have requested a load:
long lastBlockRequest = -1;
@@ -659,38 +784,30 @@
// When the limiter is set the cursor is able to
// keep as many elements in memory as its limiter
// allows.
- private final IFlowSizeLimiter<QueueElement> memoryLimiter;
- private final IFlowController<QueueElement> memoryController;
+ private final IFlowController<QueueElement<V>> memoryController;
// Indicates whether this cursor skips acquired elements
private final boolean skipAcquired;
// Indicates whether this cursor will page in elements
- //
private final boolean pageInElements;
- public Cursor(String name, boolean skipAcquired, boolean pageInElements) {
+ private long limit = Long.MAX_VALUE;
+
+ public Cursor(SortedLinkedList<QueueElement<V>> queue, SharedQueue<?, V>.ElementLoader loader, String name, boolean skipAcquired, boolean pageInElements,
+ IFlowController<QueueElement<V>> memoryController) {
this.name = name;
+ this.queue = queue;
+ this.loader = loader;
+
this.skipAcquired = skipAcquired;
this.pageInElements = pageInElements;
// Set up a limiter if this cursor pages in elements, and memory
// limit is less than the queue size:
- if (pageInElements && persistencePolicy.isPagingEnabled() && DEFAULT_MEMORY_LIMIT < sizeLimiter.getCapacity()) {
- memoryLimiter = new SizeLimiter<QueueElement>(DEFAULT_MEMORY_LIMIT, DEFAULT_MEMORY_LIMIT) {
- public int getElementSize(QueueElement qe) {
- return qe.size;
- };
- };
-
- memoryController = new FlowController<QueueElement>(null, flow, memoryLimiter, mutex) {
- @Override
- public IFlowResource getFlowResource() {
- return SharedQueue.this;
- }
- };
+ if (pageInElements) {
+ this.memoryController = memoryController;
} else {
- memoryLimiter = null;
- memoryController = null;
+ this.memoryController = null;
}
}
@@ -702,8 +819,8 @@
* The element for which to check.
* @return
*/
- public final boolean offer(QueueElement qe, ISourceController<?> controller) {
- if (activated && memoryLimiter != null) {
+ public final boolean offer(QueueElement<V> qe, ISourceController<?> controller) {
+ if (activated && memoryController != null) {
getNext();
if (lastRef != null) {
// Return true if we absorbed it:
@@ -712,7 +829,11 @@
}
// If our last ref is close to this one reserve the element
else if (qe.getPrevious() == lastRef) {
- return addCursorRef(qe, controller);
+ if (addCursorRef(qe, controller)) {
+ return true;
+ } else {
+ return false;
+ }
}
}
return false;
@@ -724,7 +845,7 @@
public final void reset(long sequence) {
updateSequence(sequence);
- current = null;
+ updateCurrent(null);
}
public final void activate() {
@@ -746,7 +867,8 @@
// If we're passing into a new block release the old one:
if (firstRef.isLastInBlock()) {
- System.out.println(this + " releasing block:" + firstRef.restoreBlock);
+ if (DEBUG)
+ System.out.println(this + " releasing block:" + firstRef.restoreBlock);
loader.releaseBlock(this, firstRef.restoreBlock);
}
@@ -758,31 +880,47 @@
}
// Release the last requested block:
- if (persistencePolicy.isPageOutPlaceHolders()) {
+ if (loader.isPageOutPlaceHolders() && lastBlockRequest >= 0) {
loader.releaseBlock(this, lastBlockRequest);
}
lastBlockRequest = -1;
- // Let go of our current ref:
- current = null;
+ updateCurrent(null);
activated = false;
}
}
/**
+ * Updates the current ref. We keep a soft ref to the current to keep it
+ * in the queue so that we can get at the next without a costly lookup.
+ */
+ private final void updateCurrent(QueueElement<V> qe) {
+ if (qe == current) {
+ return;
+ }
+ if (current != null) {
+ current.releaseSoftRef();
+ }
+ current = qe;
+ if (current != null) {
+ current.addSoftRef();
+ }
+ }
+
+ /**
* Makes sure elements are paged in
*/
private final void updatePagingRefs() {
if (!activated)
return;
- if (pageInElements && memoryLimiter != null) {
+ if (pageInElements && memoryController != null) {
// Release memory references up to our sequence number
while (firstRef != null && firstRef.getSequence() < sequence) {
boolean lastInBlock = firstRef.isLastInBlock();
- QueueElement next = firstRef.getNext();
+ QueueElement<V> next = firstRef.getNext();
firstRef.releaseHardRef(memoryController);
// If we're passing into a new block release the old one:
@@ -802,14 +940,14 @@
}
// Now add refs for as many elements as we can hold:
- QueueElement next = null;
+ QueueElement<V> next = null;
if (lastRef == null) {
next = current;
} else {
next = lastRef.getNext();
}
- while (next != null && !memoryLimiter.getThrottled()) {
+ while (next != null && !memoryController.isSinkBlocked()) {
if (!addCursorRef(next, null)) {
break;
}
@@ -819,11 +957,11 @@
// Otherwise we still need to ensure the block has been loaded:
else if (current != null && !current.isLoaded()) {
if (lastBlockRequest != current.restoreBlock) {
- if (persistencePolicy.isPageOutPlaceHolders()) {
+ if (lastBlockRequest != -1) {
loader.releaseBlock(this, lastBlockRequest);
}
- loader.loadBlock(this, current.restoreBlock);
lastBlockRequest = current.restoreBlock;
+ loader.reserveBlock(this, lastBlockRequest);
}
}
}
@@ -839,13 +977,13 @@
* The controller adding the element.
* @return false if the element isn't in memory.
*/
- private final boolean addCursorRef(QueueElement qe, ISourceController<?> controller) {
+ private final boolean addCursorRef(QueueElement<V> qe, ISourceController<?> controller) {
// Make sure we have requested the block:
if (qe.restoreBlock != lastBlockRequest) {
lastBlockRequest = qe.restoreBlock;
if (DEBUG)
- System.out.println(this + " requesting block:" + lastBlockRequest);
- loader.loadBlock(this, lastBlockRequest);
+ System.out.println(this + " requesting block:" + lastBlockRequest + " for" + qe);
+ loader.reserveBlock(this, lastBlockRequest);
}
// If the next element isn't loaded then we can't yet
@@ -865,25 +1003,22 @@
private final void updateSequence(final long newSequence) {
this.sequence = newSequence;
- if (DEBUG && sequence > nextSequenceNumber) {
- new Exception(this + "cursor overflow").printStackTrace();
- }
}
/**
* Sets the cursor to the next sequence number after the provided
* element:
*/
- public final void skip(QueueElement elem) {
- QueueElement next = elem.isLinked() ? elem.getNext() : null;
+ public final void skip(QueueElement<V> elem) {
+ QueueElement<V> next = elem.isLinked() ? elem.getNext() : null;
if (next != null) {
updateSequence(next.sequence);
if (activated) {
- current = next;
+ updateCurrent(next);
}
} else {
- current = null;
+ updateCurrent(null);
updateSequence(sequence + 1);
}
updatePagingRefs();
@@ -893,21 +1028,22 @@
* @return the next available element or null if one is not currently
* available.
*/
- public final QueueElement getNext() {
+ public final QueueElement<V> getNext() {
try {
if (queue.isEmpty() || queue.getTail().sequence < sequence) {
- current = null;
+ updateCurrent(null);
return null;
}
if (queue.getTail().sequence == sequence) {
- current = queue.getTail();
+ updateCurrent(queue.getTail());
}
- // Get a pointer to the next element
- if (current == null || !current.isLinked()) {
- current = queue.upper(sequence, true);
+ // If we don't have a current, then look it up based
+ // on our sequence:
+ if (current == null) {
+ updateCurrent(queue.upper(sequence, true));
if (current == null) {
return null;
}
@@ -915,8 +1051,8 @@
// Skip removed elements (and acquired ones if requested)
while ((skipAcquired && current.isAcquired()) || current.isDeleted()) {
- QueueElement last = current;
- current = current.getNext();
+ QueueElement<V> last = current;
+ updateCurrent(current.getNext());
// If the next element is null, increment our sequence
// and return:
@@ -946,7 +1082,7 @@
} finally {
// Don't hold on to a current ref if we aren't activated:
if (!activated) {
- current = null;
+ updateCurrent(null);
}
updatePagingRefs();
}
@@ -968,7 +1104,7 @@
return sequence;
}
- public int compareTo(Cursor o) {
+ public int compareTo(Cursor<V> o) {
if (o.sequence > sequence) {
return -1;
} else if (sequence > o.sequence) {
@@ -997,7 +1133,9 @@
*/
public void onElementsLoaded() {
if (readyListener != null && isReady()) {
- System.out.println(this + " notifying ready");
+ if (DEBUG) {
+ System.out.println(this + " notifying ready");
+ }
readyListener.onElementReady();
}
}
@@ -1013,12 +1151,15 @@
* @return true if the cursor has passed the end of the queue.
*/
public boolean atEnd() {
- // TODO Auto-generated method stub
if (queue.isEmpty()) {
return true;
}
- QueueElement tail = queue.getTail();
+ if (sequence > limit) {
+ return true;
+ }
+
+ QueueElement<V> tail = queue.getTail();
// Can't be at the end if the tail isn't loaded:
if (!tail.isLoaded()) {
return false;
@@ -1034,12 +1175,20 @@
public String toString() {
return "Cursor: " + sequence + " [" + name + "]";
}
+
+ /**
+ * @param l
+ */
+ public void setLimit(long l) {
+ limit = l;
+ }
}
- class QueueElement extends SortedLinkedListNode<QueueElement> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
+ static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
final long sequence;
final long restoreBlock;
+ final SharedQueue<?, V> queue;
V elem;
int size = -1;
@@ -1063,14 +1212,14 @@
boolean saved = false;
boolean deleted = false;
- SubscriptionContext owner;
+ boolean acquired = false;
- public QueueElement(V elem, long sequence) {
+ public QueueElement(V elem, long sequence, SharedQueue<?, V> queue) {
this.elem = elem;
-
+ this.queue = queue;
if (elem != null) {
- size = sizeLimiter.getElementSize(elem);
- expiration = expirationMapper.map(elem);
+ size = queue.getElementSize(elem);
+ expiration = queue.getElementExpiration(elem);
}
this.sequence = sequence;
this.restoreBlock = sequence / RESTORE_BLOCK_SIZE;
@@ -1083,8 +1232,8 @@
return deleted;
}
- public QueueElement(RestoredElement<V> restored) throws Exception {
- this(restored.getElement(), restored.getSequenceNumber());
+ public QueueElement(RestoredElement<V> restored, SharedQueue<?, V> queue) throws Exception {
+ this(restored.getElement(), restored.getSequenceNumber(), queue);
this.size = restored.getElementSize();
this.expiration = restored.getExpiration();
saved = true;
@@ -1103,12 +1252,12 @@
// If this is the first request for this
// element request a load:
if (hardRefs == 1) {
- loader.pageIn(this);
+ queue.getLoader().pageIn(this);
}
}
}
- public final void releaseHardRef(IFlowController<QueueElement> controller) {
+ public final void releaseHardRef(IFlowController<QueueElement<V>> controller) {
hardRefs--;
if (hardRefs == 0) {
unload(controller);
@@ -1131,35 +1280,33 @@
assert softRefs >= 0;
}
- public final void setAcquired(SubscriptionContext owner) {
- this.owner = owner;
+ public final void setAcquired(boolean val) {
+ this.acquired = val;
}
public final void acknowledge() {
- synchronized (mutex) {
- delete();
- }
+ queue.acknowledge(this);
}
- public final void delete() {
+ public final boolean delete() {
if (!deleted) {
deleted = true;
- owner = null;
- totalQueueCount--;
if (isExpirable()) {
- expirator.elementRemoved(this);
+ queue.getExpirator().elementRemoved(this);
}
- sizeController.elementDispatched(elem);
+
if (saved) {
- store.deleteQueueElement(queueDescriptor, elem);
+ queue.getQueueStore().deleteQueueElement(queue.getDescriptor(), elem);
}
elem = null;
unload(null);
+ return true;
}
+ return false;
}
public final void unacquire(ISourceController<?> source) {
- owner = null;
+ acquired = false;
if (isExpired()) {
acknowledge();
} else {
@@ -1174,38 +1321,38 @@
*/
public final void unload(ISourceController<?> controller) {
- // Can't unlink if there is a cursor ref, the cursor
- // needs this element to decrement it's limiter space
- if (hardRefs > 0) {
+ // Don't page out of there is a hard ref to the element
+ // or if it is acquired (since we need the element
+ // during delete:
+ if (!deleted && (hardRefs > 0 || acquired)) {
return;
}
// If the element didn't require persistence on enqueue, then
// we'll need to save it now before paging it out.
- // Note that we don't page out the element if it has an owner
- // because we need the element when we issue the delete.
- if (owner == null && elem != null && !persistencePolicy.isPersistent(elem)) {
- save(controller, true);
- if (DEBUG)
- System.out.println("Paged out element: " + this);
- elem = null;
- }
+ if (elem != null) {
+ if (!deleted) {
+ if (!queue.getPersistencePolicy().isPersistent(elem)) {
+ save(controller, true);
+ if (DEBUG)
+ System.out.println("Paged out element: " + this);
+ }
- // If save is pending don't unload until the save has completed
- if (savePending) {
- return;
+ // If save is pending don't unload until the save has
+ // completed
+ if (savePending) {
+ return;
+ }
+ }
+
+ elem = null;
}
- QueueElement next = getNext();
- QueueElement prev = getPrevious();
+ QueueElement<V> next = getNext();
+ QueueElement<V> prev = getPrevious();
- // See if we can unload this element:
- // Don't unload the element if it is:
- // -Has an owner (we keep the element in memory so we don't
- // forget about the owner).
- // -If there are soft references to it
- // -Or it is in the load queue
- if (owner == null && softRefs == 0 && !loader.inLoadQueue(this)) {
+ // See if we can unload this element, don't unload if we have a soft
+ if (softRefs == 0) {
// If deleted unlink this element from the queue, and link
// together adjacent paged out entries:
if (deleted) {
@@ -1215,7 +1362,10 @@
if (next != null && prev != null && !next.isLoaded() && !prev.isLoaded()) {
next.unlink();
}
- } else {
+ }
+ // Otherwise as long as the element isn't acquired we can unload
+ // it. If it is acquired we keep the soft ref around
+ else if (!acquired && queue.getPersistencePolicy().isPageOutPlaceHolders()) {
loaded = false;
@@ -1230,6 +1380,8 @@
if (prev != null && !prev.isLoaded()) {
unlink();
}
+ } else {
+ return;
}
}
@@ -1246,9 +1398,9 @@
* @throws Exception
* If there was an error creating the loaded element:
*/
- public final QueueElement loadAfter(RestoredElement<V> re) throws Exception {
+ public final QueueElement<V> loadAfter(RestoredElement<V> re) throws Exception {
- QueueElement ret = null;
+ QueueElement<V> ret = null;
// See if this element represents the one being loaded:
if (sequence == re.getSequenceNumber()) {
@@ -1263,14 +1415,15 @@
if (re.getNextSequenceNumber() != -1) {
// Otherwise if our next pointer doesn't match the
// next restored number:
- QueueElement next = getNext();
+ QueueElement<V> next = getNext();
if (next == null || next.sequence != re.getNextSequenceNumber()) {
- next = new QueueElement(null, re.getNextSequenceNumber());
+ next = new QueueElement<V>(null, re.getNextSequenceNumber(), queue);
next.loaded = false;
this.linkAfter(next);
}
}
this.size = re.getElementSize();
+ this.expiration = re.getExpiration();
}
// If we're paged out set our elem to the restored one:
@@ -1281,9 +1434,9 @@
savePending = false;
} else {
- ret = new QueueElement(re);
+ ret = new QueueElement<V>(re, queue);
// Otherwise simply link this element into the list:
- queue.add(ret);
+ queue.queue.add(ret);
}
if (DEBUG)
@@ -1301,7 +1454,7 @@
public final boolean isLastInBlock() {
if (isTailNode()) {
- return nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock;
+ return queue.nextSequenceNumber / RESTORE_BLOCK_SIZE != restoreBlock;
} else {
return next.restoreBlock != restoreBlock;
}
@@ -1316,7 +1469,7 @@
}
public final boolean isAcquired() {
- return owner != null || deleted;
+ return acquired || deleted;
}
public final long getExpiration() {
@@ -1333,14 +1486,14 @@
public final void save(ISourceController<?> controller, boolean delayable) {
if (!saved) {
- store.persistQueueElement(this, controller, delayable);
+ queue.getQueueStore().persistQueueElement(this, controller, delayable);
saved = true;
// If paging is enabled we can't unload the element until it
// is saved, otherwise there is no guarantee that it will be
// in the store on a subsequent load requests because the
// save is done asynchronously.
- if (persistencePolicy.isPagingEnabled()) {
+ if (queue.getPersistencePolicy().isPagingEnabled()) {
savePending = true;
}
}
@@ -1375,7 +1528,8 @@
* ()
*/
public void notifySave() {
- synchronized (mutex) {
+ // TODO Refactor this:
+ synchronized (queue.mutex) {
// Unload if we haven't already:
if (isLinked()) {
savePending = false;
@@ -1402,18 +1556,18 @@
* getQueueDescriptor()
*/
public QueueDescriptor getQueueDescriptor() {
- return queueDescriptor;
+ return queue.getDescriptor();
}
public String toString() {
- return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
+ return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " aquired: " + acquired;
}
}
private class Expirator {
- private final Cursor cursor = new Cursor("Expirator", false, false);
+ private final Cursor<V> cursor = openCursor("Expirator-" + getResourceName(), false, false);
// Number of expirable elements in the queue:
private int count = 0;
@@ -1423,13 +1577,13 @@
private static final int MAX_CACHE_SIZE = 500;
private long uncachedMin = Long.MAX_VALUE;
- TreeMap<Long, HashSet<QueueElement>> expirationCache = new TreeMap<Long, HashSet<QueueElement>>();
+ TreeMap<Long, HashSet<QueueElement<V>>> expirationCache = new TreeMap<Long, HashSet<QueueElement<V>>>();
private int cacheSize = 0;
public final boolean needsDispatch() {
// If we have expiration candidates or are scanning the
// queue request dispatch:
- return hasExpirationCandidates() || cursor.isReady();
+ return hasExpirables() || cursor.isReady();
}
public void start() {
@@ -1453,7 +1607,9 @@
}
public void dispatch() {
-
+ if (!needsDispatch()) {
+ return;
+ }
long now = -1;
// If their are uncached elements in the queue that are ready for
// expiration
@@ -1465,7 +1621,7 @@
// Scan the queue looking for expirables:
if (cursor.isReady()) {
- QueueElement qe = cursor.getNext();
+ QueueElement<V> qe = cursor.getNext();
while (qe != null) {
if (!loaded) {
if (qe.sequence < recoverySequence) {
@@ -1485,7 +1641,8 @@
// Finished loading:
if (!loaded && cursor.getCurrentSequeunce() >= recoverySequence) {
- System.out.println(this + " Queue Load Complete");
+ if (DEBUG)
+ System.out.println(this + " Queue Load Complete");
loaded = true;
cursor.deactivate();
} else if (cursor.atEnd()) {
@@ -1493,15 +1650,15 @@
}
}
- if (now == -1) {
+ if (now == -1 && !expirationCache.isEmpty()) {
now = System.currentTimeMillis();
}
//
while (!expirationCache.isEmpty()) {
- Entry<Long, HashSet<QueueElement>> first = expirationCache.firstEntry();
+ Entry<Long, HashSet<QueueElement<V>>> first = expirationCache.firstEntry();
if (first.getKey() < now) {
- for (QueueElement qe : first.getValue()) {
+ for (QueueElement<V> qe : first.getValue()) {
qe.releaseSoftRef();
qe.acknowledge();
}
@@ -1509,7 +1666,7 @@
}
}
- public void elementAdded(QueueElement qe) {
+ public void elementAdded(QueueElement<V> qe) {
if (qe.isExpirable() && !qe.isDeleted()) {
count++;
if (qe.isExpired()) {
@@ -1520,10 +1677,10 @@
}
}
- private void addToCache(QueueElement qe) {
+ private void addToCache(QueueElement<V> qe) {
// See if we should cache it, evicting entries if possible
if (cacheSize >= MAX_CACHE_SIZE) {
- Entry<Long, HashSet<QueueElement>> last = expirationCache.lastEntry();
+ Entry<Long, HashSet<QueueElement<V>>> last = expirationCache.lastEntry();
if (last.getKey() <= qe.expiration) {
// Keep track of the minimum uncached value:
if (qe.expiration < uncachedMin) {
@@ -1533,7 +1690,7 @@
}
// Evict the entry:
- Iterator<QueueElement> i = last.getValue().iterator();
+ Iterator<QueueElement<V>> i = last.getValue().iterator();
removeFromCache(i.next());
if (last.getKey() <= uncachedMin) {
@@ -1542,19 +1699,19 @@
}
}
- HashSet<QueueElement> entry = new HashSet<QueueElement>();
+ HashSet<QueueElement<V>> entry = new HashSet<QueueElement<V>>();
entry.add(qe);
qe.addSoftRef();
cacheSize++;
- HashSet<QueueElement> old = expirationCache.put(qe.expiration, entry);
+ HashSet<QueueElement<V>> old = expirationCache.put(qe.expiration, entry);
if (old != null) {
old.add(qe);
expirationCache.put(qe.expiration, old);
}
}
- private final void removeFromCache(QueueElement qe) {
- HashSet<QueueElement> last = expirationCache.get(qe.expiration);
+ private final void removeFromCache(QueueElement<V> qe) {
+ HashSet<QueueElement<V>> last = expirationCache.get(qe.expiration);
if (last != null && last.remove(qe.getSequenceNumber())) {
cacheSize--;
qe.releaseSoftRef();
@@ -1564,7 +1721,7 @@
}
}
- public void elementRemoved(QueueElement qe) {
+ public void elementRemoved(QueueElement<V> qe) {
// While loading, ignore elements that we haven't been seen yet.
if (!loaded && qe.sequence < recoverySequence && qe.sequence > lastRecoverdSequence) {
return;
@@ -1577,11 +1734,7 @@
}
}
- public boolean hasExpirationCandidates() {
- return !loaded || hasExpirables();
- }
-
- public boolean hasExpirables() {
+ public final boolean hasExpirables() {
if (count == 0) {
return false;
} else {
@@ -1617,85 +1770,60 @@
private class ElementLoader implements RestoreListener<V> {
private LinkedList<QueueStore.RestoredElement<V>> fromDatabase = new LinkedList<QueueStore.RestoredElement<V>>();
- private final HashMap<Long, HashSet<Cursor>> requestedBlocks = new HashMap<Long, HashSet<Cursor>>();
- private final HashSet<Cursor> pagingCursors = new HashSet<Cursor>();
+ private final HashMap<Long, HashSet<Cursor<V>>> requestedBlocks = new HashMap<Long, HashSet<Cursor<V>>>();
+ private final HashSet<Cursor<V>> pagingCursors = new HashSet<Cursor<V>>();
- public boolean inLoadQueue(QueueElement queueElement) {
+ private boolean loadOnRequest = false;
+ private Cursor<V> recoveryCursor = null;
+
+ public final void start() {
+
+ // If paging is enabled and we don't keep placeholders in memory
+ // then we load on
+ // request.
+ if (persistencePolicy.isPagingEnabled() && persistencePolicy.isPageOutPlaceHolders()) {
+ loadOnRequest = true;
+ } else {
+ loadOnRequest = false;
+ if (getEnqueuedCount() > 0) {
+ recoveryCursor = openCursor("Loader", false, false);
+ recoveryCursor.setLimit(nextSequenceNumber - 1);
+ recoveryCursor.activate();
+ }
+ }
+ }
+
+ public boolean inLoadQueue(QueueElement<V> queueElement) {
return requestedBlocks.containsKey(queueElement.restoreBlock);
}
+ public final boolean isPageOutPlaceHolders() {
+ return persistencePolicy.isPageOutPlaceHolders();
+ }
+
/**
* @param queueElement
*/
- public void pageIn(QueueElement qe) {
+ public void pageIn(QueueElement<V> qe) {
store.restoreQueueElements(queueDescriptor, false, qe.sequence, qe.sequence, 1, this);
}
- /**
- * Must be called after an element is added to the queue to enforce
- * memory limits
- *
- * @param elem
- * The added element:
- * @param source
- * The source of the message
- */
- public final void elementAdded(QueueElement qe, ISourceController<V> source) {
-
- if (persistencePolicy.isPagingEnabled()) {
-
- // Check with the shared cursor to see if it is willing to
- // absorb the element. If so that's good enough.
- if (sharedCursor.offer(qe, source)) {
- return;
- }
-
- // Otherwise check with any other open cursor to see if
- // it can take the element:
- HashSet<Cursor> active = requestedBlocks.get(qe.sequence);
-
- // If there are none, unload the element:
- if (active == null) {
- qe.unload(source);
- return;
- }
-
- // See if a cursor is willing to hang on to the
- // element:
- boolean accepted = false;
- for (Cursor cursor : active) {
- // Already checked the shared cursor above:
- if (cursor == sharedCursor) {
- continue;
- }
-
- if (cursor.offer(qe, source)) {
- accepted = true;
- }
- }
-
- // If no cursor accepted it, then page out the element:
- // keeping the element loaded.
- if (!accepted) {
- qe.unload(source);
- }
- }
+ public final Collection<Cursor<V>> getActiveCursors(QueueElement<V> qe) {
+ return requestedBlocks.get(qe.getSequence());
}
- // Updates memory when an element is loaded from the database:
- private final void elementLoaded(QueueElement qe) {
- // TODO track the rate of loaded elements vs those that
- // are added to the queue. We'll want to throttle back
- // enqueueing sources to a rate less than the restore
- // rate so we can stay out of the store.
- }
+ public void reserveBlock(Cursor<V> cursor, long block) {
+ HashSet<Cursor<V>> cursors = requestedBlocks.get(block);
+ boolean load = recoveryCursor != null && cursor == recoveryCursor;
- public void loadBlock(Cursor cursor, long block) {
- HashSet<Cursor> cursors = requestedBlocks.get(block);
if (cursors == null) {
- cursors = new HashSet<Cursor>();
+ cursors = new HashSet<Cursor<V>>();
requestedBlocks.put(block, cursors);
+ load |= loadOnRequest;
+ }
+ cursors.add(cursor);
+ if (load) {
// Max sequence number is the end of this restoreBlock:
long firstSequence = block * RESTORE_BLOCK_SIZE;
long maxSequence = block * RESTORE_BLOCK_SIZE + RESTORE_BLOCK_SIZE - 1;
@@ -1706,21 +1834,17 @@
if (DEBUG)
System.out.println(cursor + " requesting restoreBlock:" + block + " from " + firstSequence + " to " + maxSequence + " max: " + maxCount + " queueMax: " + nextSequenceNumber);
- // If paging is enabled only pull in queue records, don't bring
+ // If paging is enabled only pull in queue records, don't
+ // bring
// in the payload.
// Each active cursor will have to pull in messages based on
// available memory.
store.restoreQueueElements(queueDescriptor, persistencePolicy.isPagingEnabled(), firstSequence, maxSequence, maxCount, this);
}
- cursors.add(cursor);
}
- public void releaseBlock(Cursor cursor, long block) {
- // Don't do anything if we don't page out placeholders
- if (!persistencePolicy.isPageOutPlaceHolders()) {
- return;
- }
- HashSet<Cursor> cursors = requestedBlocks.get(block);
+ public void releaseBlock(Cursor<V> cursor, long block) {
+ HashSet<Cursor<V>> cursors = requestedBlocks.get(block);
if (cursors == null) {
if (true || DEBUG)
System.out.println(this + " removeBlockInterest " + block + ", no cursors" + cursor);
@@ -1731,9 +1855,9 @@
// If this is the last cursor active in this block
// unload the block:
if (persistencePolicy.isPagingEnabled()) {
- QueueElement qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
+ QueueElement<V> qe = queue.upper(RESTORE_BLOCK_SIZE * block, true);
while (qe != null && qe.restoreBlock == block) {
- QueueElement next = qe.getNext();
+ QueueElement<V> next = qe.getNext();
qe.unload(cursor.memoryController);
qe = next;
}
@@ -1765,7 +1889,7 @@
for (RestoredElement<V> restored : restoredElems) {
try {
- QueueElement qe = queue.lower(restored.getSequenceNumber(), true);
+ QueueElement<V> qe = queue.lower(restored.getSequenceNumber(), true);
// If we don't have a paged out place holder for this
// element
@@ -1776,7 +1900,13 @@
}
qe = qe.loadAfter(restored);
- elementLoaded(qe);
+
+ // If paging isn't enabled then add a hard ref to the
+ // element,
+ // this will keep it around until it deleted
+ if (!persistencePolicy.isPagingEnabled()) {
+ qe.addHardRef();
+ }
// If we don't page out place holders we needn't track
// block
@@ -1795,11 +1925,25 @@
}
// Add restoring consumers back to trailing consumers:
- for (Cursor paging : pagingCursors)
+ for (Cursor<V> paging : pagingCursors)
paging.onElementsLoaded();
pagingCursors.clear();
}
+
+ // Advance the recovery cursor:
+ if (recoveryCursor != null) {
+ while (recoveryCursor.isReady()) {
+ QueueElement<V> qe = recoveryCursor.getNext();
+ if (recoveryCursor.atEnd()) {
+ recoveryCursor.deactivate();
+ recoveryCursor = null;
+ break;
+ }
+ recoveryCursor.skip(qe);
+
+ }
+ }
}
public final boolean hasRestoredMessages() {
@@ -1851,7 +1995,7 @@
@Override
protected ISinkController<V> getSinkController(V elem, ISourceController<?> source) {
- return sizeController;
+ return inputController;
}
public V poll() {
@@ -1859,6 +2003,6 @@
}
public IFlowController<V> getFlowControler() {
- return sizeController;
+ return inputController;
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Thu May 14 13:41:14 2009
@@ -40,7 +40,7 @@
public abstract class BrokerTestBase extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 3;
+ protected static final int PERFORMANCE_SAMPLES = 5;
protected static final int IO_WORK_AMOUNT = 0;
protected static final int FANIN_COUNT = 10;
@@ -118,6 +118,9 @@
}
public void test_1_1_0() throws Exception {
+ if (ptp) {
+ return;
+ }
producerCount = 1;
destCount = 1;
@@ -376,7 +379,7 @@
sendBroker = rcvBroker = createBroker("Broker", sendBrokerBindURI, sendBrokerConnectURI);
brokers.add(sendBroker);
}
-
+
startBrokers();
Destination[] dests = new Destination[destCount];
@@ -476,7 +479,7 @@
store = StoreFactory.createStore("memory");
}
- store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName()));
+ store.setStoreDirectory(new File("sub/test-data/broker-test/" + broker.getName()));
store.setDeleteAllMessages(PURGE_STORE);
return store;
}
@@ -497,15 +500,14 @@
}
}
- private void startBrokers() throws Exception
- {
+ private void startBrokers() throws Exception {
for (MessageBroker broker : brokers) {
broker.start();
}
}
-
+
private void startClients() throws Exception {
-
+
for (RemoteConsumer connection : consumers) {
connection.start();
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/SharedQueuePerfTest.java Thu May 14 13:41:14 2009
@@ -31,6 +31,7 @@
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.dispatch.IDispatcher;
@@ -68,7 +69,7 @@
BrokerQueueStore queueStore;
private static final boolean USE_KAHA_DB = true;
private static final boolean PERSISTENT = true;
- private static final boolean PURGE_STORE = false;
+ private static final boolean PURGE_STORE = true;
protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -111,7 +112,7 @@
store = StoreFactory.createStore("memory");
}
- store.setStoreDirectory(new File("test-data/shared-message-queue-test/"));
+ store.setStoreDirectory(new File("test-data/shared-queue-perf-test/"));
store.setDeleteAllMessages(PURGE_STORE);
return store;
}
@@ -121,6 +122,7 @@
producers.clear();
queues.clear();
stopServices();
+ consumerStartDelay = 0;
}
public void testSharedQueue_1_1_1() throws Exception {
@@ -141,6 +143,7 @@
try {
createQueues(1);
createProducers(1);
+ consumerStartDelay = 10;
createConsumers(1);
doTest();
@@ -202,56 +205,48 @@
}
private void doTest() throws Exception {
-
- try
- {
+
+ try {
// Start queues:
for (IQueue<Long, MessageDelivery> queue : queues) {
queue.start();
}
-
- Runnable startConsumers = new Runnable()
- {
- public void run()
- {
+
+ Runnable startConsumers = new Runnable() {
+ public void run() {
// Start consumers:
for (Consumer consumer : consumers) {
consumer.start();
}
}
};
-
- if(consumerStartDelay > 0)
- {
+
+ if (consumerStartDelay > 0) {
dispatcher.schedule(startConsumers, consumerStartDelay, TimeUnit.SECONDS);
- }
- else
- {
+ } else {
startConsumers.run();
}
-
+
// Start producers:
for (Producer producer : producers) {
producer.start();
}
reportRates();
- }
- finally
- {
+ } finally {
// Stop producers:
for (Producer producer : producers) {
producer.stop();
}
-
+
// Stop consumers:
for (Consumer consumer : consumers) {
consumer.stop();
}
-
+
// Stop queues:
for (IQueue<Long, MessageDelivery> queue : queues) {
queue.stop();
- }
+ }
}
}
@@ -303,7 +298,7 @@
protected final IFlowRelay<OpenWireMessageDelivery> outboundQueue;
protected OpenWireMessageDelivery next;
private int priority;
- private final byte[] payload;
+ private final String payload;
private int sequenceNumber;
private final ActiveMQDestination destination;
private final IQueue<Long, MessageDelivery> targetQueue;
@@ -316,14 +311,21 @@
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
dispatchContext = dispatcher.register(this, name);
- payload = new byte[1024];
+ // create a 1024 byte payload (2 bytes per char):
+ payload = new String(new byte[512]);
producerId = new ProducerId(name);
wireFormat = new OpenWireFormat();
wireFormat.setCacheEnabled(false);
wireFormat.setSizePrefixDisabled(false);
wireFormat.setVersion(OpenWireFormat.DEFAULT_VERSION);
- SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000, 500);
+ SizeLimiter<OpenWireMessageDelivery> limiter = new SizeLimiter<OpenWireMessageDelivery>(1000 * 1024, 500 * 1024) {
+ @Override
+ public int getElementSize(OpenWireMessageDelivery elem) {
+ return elem.getFlowLimiterSize();
+ }
+ };
+
Flow flow = new Flow(name, true);
outboundQueue = new SingleFlowRelay<OpenWireMessageDelivery>(flow, name, limiter);
outboundQueue.setFlowExecutor(dispatcher.createPriorityExecutor(dispatcher.getDispatchPriorities() - 1));
@@ -384,14 +386,14 @@
}
private void createNextMessage() throws JMSException {
- ActiveMQBytesMessage message = new ActiveMQBytesMessage();
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setJMSPriority(priority);
message.setProducerId(producerId);
message.setMessageId(new MessageId(name, ++sequenceNumber));
message.setDestination(destination);
message.setPersistent(PERSISTENT);
if (payload != null) {
- message.writeBytes(payload);
+ message.setText(payload);
}
next = new OpenWireMessageDelivery(message);
}
@@ -414,12 +416,14 @@
private final ExclusiveQueue<MessageDelivery> queue;
private final IQueue<Long, MessageDelivery> sourceQueue;
private final QueueStore.QueueDescriptor queueDescriptor;
+ private int limit = 20000;
+ private int count = 0;
public Consumer(String name, IQueue<Long, MessageDelivery> sourceQueue) {
this.sourceQueue = sourceQueue;
this.name = name;
Flow flow = new Flow(name + "-outbound", false);
- limiter = new SizeLimiter<MessageDelivery>(1024, 512) {
+ limiter = new SizeLimiter<MessageDelivery>(1024 * 1024, 512 * 1024) {
public int getElementSize(MessageDelivery m) {
return m.getFlowLimiterSize();
}
@@ -439,6 +443,10 @@
public void drain(MessageDelivery elem, ISourceController<MessageDelivery> controller) {
elem.acknowledge(queueDescriptor);
rate.increment();
+ /*
+ if (count++ == limit) {
+ queue.stop();
+ }*/
}
});
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireRemoteProducer.java Thu May 14 13:41:14 2009
@@ -101,9 +101,17 @@
try {
msg.setStringProperty(property, property);
} catch (JMSException e) {
- new RuntimeException(e);
+ throw new RuntimeException(e);
}
}
+
+ //Call the before marshal method to sync the content so we get an
+ //accurate size:
+ try {
+ msg.beforeMarshall(null);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
next = new OpenWireMessageDelivery(msg);
}
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=774764&r1=774763&r2=774764&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Thu May 14 13:41:14 2009
@@ -13,6 +13,7 @@
import org.apache.activemq.queue.QueueStore;
import org.apache.activemq.queue.SharedPriorityQueue;
import org.apache.activemq.queue.SharedQueue;
+import org.apache.activemq.queue.SharedQueueOld;
import org.apache.activemq.queue.Subscription;
import org.apache.activemq.util.Mapper;
@@ -27,6 +28,7 @@
private Mapper<Long, Message> keyExtractor;
private final MockStoreAdapater store = new MockStoreAdapater();
private static final PersistencePolicy<Message> NO_PERSISTENCE = new PersistencePolicy.NON_PERSISTENT_POLICY<Message>();
+ private static final boolean USE_OLD_QUEUE = false;
private IQueue<Long, Message> createQueue() {
@@ -61,15 +63,27 @@
queue.initialize(0, 0, 0, 0);
return queue;
} else {
- SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
- SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName().toString(), limiter);
- queue.setKeyMapper(keyExtractor);
- queue.setAutoRelease(true);
- queue.setDispatcher(broker.getDispatcher());
- queue.setStore(store);
- queue.setPersistencePolicy(NO_PERSISTENCE);
- queue.initialize(0, 0, 0, 0);
- return queue;
+ if (USE_OLD_QUEUE) {
+ SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
+ SharedQueueOld<Long, Message> queue = new SharedQueueOld<Long, Message>(destination.getName().toString(), limiter);
+ queue.setKeyMapper(keyExtractor);
+ queue.setAutoRelease(true);
+ queue.setDispatcher(broker.getDispatcher());
+ queue.setStore(store);
+ queue.setPersistencePolicy(NO_PERSISTENCE);
+ queue.initialize(0, 0, 0, 0);
+ return queue;
+ } else {
+ SizeLimiter<Message> limiter = new SizeLimiter<Message>(100, 1);
+ SharedQueue<Long, Message> queue = new SharedQueue<Long, Message>(destination.getName().toString(), limiter);
+ queue.setKeyMapper(keyExtractor);
+ queue.setAutoRelease(true);
+ queue.setDispatcher(broker.getDispatcher());
+ queue.setStore(store);
+ queue.setPersistencePolicy(NO_PERSISTENCE);
+ queue.initialize(0, 0, 0, 0);
+ return queue;
+ }
}
}
@@ -83,7 +97,7 @@
public final void addConsumer(final DeliveryTarget dt) {
Subscription<Message> sub = new Subscription<Message>() {
-
+
public boolean isBrowser() {
return false;
}