You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/28 21:37:07 UTC
svn commit: r789143 [2/2] - in /activemq/sandbox/activemq-flow:
activemq-broker/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-openwire/src/main/java/org/a...
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Sun Jun 28 19:37:06 2009
@@ -31,7 +31,12 @@
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
import org.apache.activemq.broker.store.Store.SubscriptionRecord;
+import org.apache.activemq.broker.store.kahadb.Data.MapAdd;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryRemove;
+import org.apache.activemq.broker.store.kahadb.Data.MapRemove;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
@@ -41,6 +46,10 @@
import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove;
import org.apache.activemq.broker.store.kahadb.Data.Trace;
import org.apache.activemq.broker.store.kahadb.Data.Type;
+import org.apache.activemq.broker.store.kahadb.Data.MapAdd.MapAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut.MapEntryPutBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryRemove.MapEntryRemoveBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapRemove.MapRemoveBean;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
import org.apache.activemq.broker.store.kahadb.Data.QueueAdd.QueueAddBean;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
@@ -649,9 +658,25 @@
case TRANSACTION_COMMIT:
case TRANSACTION_ROLLBACK:
case MAP_ADD:
+ rootEntity.mapAdd(((MapAdd) command).getMapName(), tx);
+ break;
case MAP_REMOVE:
- case MAP_ENTRY_PUT:
- case MAP_ENTRY_REMOVE:
+ rootEntity.mapRemove(((MapRemove) command).getMapName(), tx);
+ break;
+ case MAP_ENTRY_PUT: {
+ MapEntryPut p = (MapEntryPut) command;
+ rootEntity.mapAddEntry(p.getMapName(), p.getKey(), p.getValue(), tx);
+ break;
+ }
+ case MAP_ENTRY_REMOVE: {
+ MapEntryRemove p = (MapEntryRemove) command;
+ try {
+ rootEntity.mapRemoveEntry(p.getMapName(), p.getKey(), tx);
+ } catch (KeyNotFoundException e) {
+ //yay, removed.
+ }
+ break;
+ }
case STREAM_OPEN:
case STREAM_WRITE:
case STREAM_CLOSE:
@@ -982,7 +1007,7 @@
update.setName(record.getName());
update.setDestination(record.getDestination());
update.setDurable(record.getIsDurable());
-
+
if (record.getAttachment() != null) {
update.setAttachment(record.getAttachment());
}
@@ -1019,32 +1044,54 @@
// /////////////////////////////////////////////////////////////
// Map related methods.
// /////////////////////////////////////////////////////////////
- public boolean mapAdd(AsciiBuffer map) {
- return false;
+ public void mapAdd(AsciiBuffer map) {
+ MapAddBean update = new MapAddBean();
+ update.setMapName(map);
+ addUpdate(update);
}
- public boolean mapRemove(AsciiBuffer map) {
- return false;
+ public void mapRemove(AsciiBuffer map) {
+ MapRemoveBean update = new MapRemoveBean();
+ update.setMapName(map);
+ addUpdate(update);
}
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
- return null;
+ storeAtomic();
+ return rootEntity.mapList(first, max, tx);
}
- public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
- return null;
+ public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+ MapEntryPutBean update = new MapEntryPutBean();
+ update.setMapName(map);
+ update.setKey(key);
+ update.setValue(value);
+ addUpdate(update);
}
public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
- return null;
+ storeAtomic();
+ try {
+ return rootEntity.mapGetEntry(map, key, tx);
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
+ }
}
- public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
- return null;
+ public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+ MapEntryRemoveBean update = new MapEntryRemoveBean();
+ update.setMapName(map);
+ update.setKey(key);
+ addUpdate(update);
}
public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
- return null;
+ storeAtomic();
+ try {
+ return rootEntity.mapListKeys(map, first, max, tx);
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
+ }
}
// /////////////////////////////////////////////////////////////
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Sun Jun 28 19:37:06 2009
@@ -49,7 +49,6 @@
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller;
-
public class RootEntity {
//TODO remove this one performance testing is complete.
@@ -69,6 +68,7 @@
rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
+ rc.mapIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
if (is.readBoolean()) {
rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
} else {
@@ -87,6 +87,7 @@
os.writeLong(object.destinationIndex.getPageId());
os.writeLong(object.messageRefsIndex.getPageId());
os.writeLong(object.subscriptionIndex.getPageId());
+ os.writeLong(object.mapIndex.getPageId());
if (object.lastUpdate != null) {
os.writeBoolean(true);
Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
@@ -119,6 +120,10 @@
// Subscriptions
private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
+    // Maps:
+    // Persistent index of map name -> page id of that map's own BTree index.
+    private BTreeIndex<AsciiBuffer, Long> mapIndex;
+    // In-memory cache of the per-map indexes, keyed by map name. It must be
+    // instantiated here: load() and all of the map methods call get/put/remove
+    // on it directly and nothing else assigns it, so leaving it null results in
+    // a NullPointerException on first use.
+    private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache = new TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>>();
+
// /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
// /////////////////////////////////////////////////////////////////
@@ -137,6 +142,8 @@
destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
+ mapIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
+
page.set(this);
tx.store(page, MARSHALLER, true);
}
@@ -196,6 +203,23 @@
ioe.initCause(e);
throw ioe;
}
+
+ //Load Maps:
+ mapIndex.setPageFile(tx.getPageFile());
+ mapIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ mapIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+ mapIndex.load(tx);
+
+ //Load all of the maps and cache them:
+ for (Iterator<Entry<AsciiBuffer, Long>> iterator = mapIndex.iterator(tx); iterator.hasNext();) {
+ Entry<AsciiBuffer, Long> entry = iterator.next();
+ BTreeIndex<AsciiBuffer, Buffer> map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), entry.getValue());
+ map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+ map.load(tx);
+ mapCache.put(entry.getKey(), map);
+ }
+
}
/**
@@ -349,15 +373,17 @@
// /////////////////////////////////////////////////////////////////
/**
- * Returns a list of all of the stored subscriptions.
- * @param tx The transaction under which this is to be executed.
- * @return a list of all of the stored subscriptions.
- * @throws IOException
+ * Returns a list of all of the stored subscriptions.
+ *
+ * @param tx
+ * The transaction under which this is to be executed.
+ * @return a list of all of the stored subscriptions.
+ * @throws IOException
*/
public Iterator<SubscriptionRecord> listSubsriptions(Transaction tx) throws IOException {
-
+
final LinkedList<SubscriptionRecord> rc = new LinkedList<SubscriptionRecord>();
-
+
subscriptionIndex.visit(tx, new BTreeVisitor<AsciiBuffer, Buffer>() {
public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
return true;
@@ -372,8 +398,12 @@
}
}
}
+
+ public boolean isSatiated() {
+ return false;
+ }
});
-
+
return rc.iterator();
}
@@ -406,7 +436,9 @@
/**
* Converts a Subscription buffer to a SubscriptionRecord.
- * @param b The buffer
+ *
+ * @param b
+ * The buffer
* @return The record.
* @throws InvalidProtocolBufferException
*/
@@ -414,7 +446,7 @@
if (b == null) {
return null;
}
-
+
SubscriptionRecord rc = null;
if (b != null) {
SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
@@ -508,6 +540,101 @@
return result;
}
+ // /////////////////////////////////////////////////////////////////
+ // Map Methods.
+ // /////////////////////////////////////////////////////////////////
+    /**
+     * Creates the map with the given name unless it is already cached.
+     * <p>
+     * A new page is allocated for the map's BTree index, the index is loaded,
+     * and the name to page id association is recorded in {@code mapIndex}
+     * before the index is placed in {@code mapCache}.
+     *
+     * @param key
+     *            The name of the map to create.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @throws IOException
+     */
+    public final void mapAdd(AsciiBuffer key, Transaction tx) throws IOException {
+        if (mapCache.get(key) != null) {
+            // Already known; nothing to create.
+            return;
+        }
+
+        final long rootPageId = tx.allocate().getPageId();
+        final BTreeIndex<AsciiBuffer, Buffer> created = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), rootPageId);
+        created.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        created.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+        created.load(tx);
+        mapIndex.put(tx, key, rootPageId);
+        mapCache.put(key, created);
+    }
+
+    /**
+     * Drops the map with the given name. Does nothing when no such map is
+     * cached.
+     * <p>
+     * The map is taken out of {@code mapCache}, its index is cleared and
+     * unloaded, and its name is removed from {@code mapIndex}.
+     *
+     * @param key
+     *            The name of the map to remove.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @throws IOException
+     */
+    public final void mapRemove(AsciiBuffer key, Transaction tx) throws IOException {
+        final BTreeIndex<AsciiBuffer, Buffer> removed = mapCache.remove(key);
+        if (removed == null) {
+            return;
+        }
+        removed.clear(tx);
+        removed.unload(tx);
+        mapIndex.remove(tx, key);
+    }
+
+    /**
+     * Stores {@code value} under {@code key} in the named map, creating the
+     * map first (via {@link #mapAdd(AsciiBuffer, Transaction)}) when it is not
+     * cached yet.
+     *
+     * @param name
+     *            The name of the map.
+     * @param key
+     *            The entry key.
+     * @param value
+     *            The entry value.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @throws IOException
+     */
+    public final void mapAddEntry(AsciiBuffer name, AsciiBuffer key, Buffer value, Transaction tx) throws IOException {
+        if (mapCache.get(name) == null) {
+            mapAdd(name, tx);
+        }
+        final BTreeIndex<AsciiBuffer, Buffer> target = mapCache.get(name);
+        target.put(tx, key, value);
+    }
+
+    /**
+     * Removes the entry stored under {@code key} from the named map.
+     *
+     * @param name
+     *            The name of the map.
+     * @param key
+     *            The entry key.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     *             if no map with the given name is cached.
+     */
+    public final void mapRemoveEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
+        final BTreeIndex<AsciiBuffer, Buffer> target = mapCache.get(name);
+        if (target != null) {
+            target.remove(tx, key);
+            return;
+        }
+        throw new KeyNotFoundException(name.toString());
+    }
+
+    /**
+     * Looks up the value stored under {@code key} in the named map.
+     *
+     * @param name
+     *            The name of the map.
+     * @param key
+     *            The entry key.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @return whatever the map's index returns for the key.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     *             if no map with the given name is cached.
+     */
+    public final Buffer mapGetEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
+        final BTreeIndex<AsciiBuffer, Buffer> source = mapCache.get(name);
+        if (source != null) {
+            return source.get(tx, key);
+        }
+        throw new KeyNotFoundException(name.toString());
+    }
+
+    /**
+     * Lists the names of the cached maps in the cache's key order.
+     * <p>
+     * The names are copied into a private list, so the returned iterator is
+     * not backed by the cache.
+     *
+     * @param first
+     *            The name to start from (inclusive), or null to start from
+     *            the first cached map.
+     * @param count
+     *            The maximum number of names to return. A value of zero or
+     *            less means no limit, matching the way
+     *            {@code mapListKeys} treats a non-positive count.
+     * @param tx
+     *            The transaction under which this is to be executed (not
+     *            used; the listing is served from the in-memory cache).
+     * @return an iterator over at most {@code count} map names.
+     */
+    public final Iterator<AsciiBuffer> mapList(AsciiBuffer first, int count, Transaction tx) {
+        LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
+
+        Collection<AsciiBuffer> names = (first == null ? mapCache.keySet() : mapCache.tailMap(first).keySet());
+        for (AsciiBuffer name : names) {
+            // Previously count was ignored and every name was returned.
+            if (count > 0 && results.size() >= count) {
+                break;
+            }
+            results.add(name);
+        }
+
+        return results.iterator();
+    }
+
+    /**
+     * Lists the keys stored in the named map.
+     *
+     * @param name
+     *            The name of the map.
+     * @param first
+     *            The key to start from (inclusive), or null to start from
+     *            the map's first key.
+     * @param count
+     *            The maximum number of keys to return. A value of zero or
+     *            less means no limit.
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @return an iterator over a private copy of the matching keys.
+     * @throws IOException
+     * @throws KeyNotFoundException
+     *             if no map with the given name is cached.
+     */
+    public final Iterator<AsciiBuffer> mapListKeys(AsciiBuffer name, AsciiBuffer first, int count, Transaction tx) throws IOException, KeyNotFoundException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
+        if (map == null) {
+            throw new KeyNotFoundException(name.toString());
+        }
+
+        final LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
+
+        // first and count are honoured independently. Previously a null
+        // first ignored count, and a non-positive count ignored first.
+        final int limit = count > 0 ? count : Integer.MAX_VALUE;
+
+        if (first != null) {
+            map.visit(tx, new BTreeVisitor.GTEVisitor<AsciiBuffer, Buffer>(first, limit) {
+
+                @Override
+                protected void matched(AsciiBuffer key, Buffer value) {
+                    results.add(key);
+                }
+            });
+        } else {
+            Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator(tx);
+            while (iterator.hasNext() && results.size() < limit) {
+                Entry<AsciiBuffer, Buffer> e = iterator.next();
+                results.add(e.getKey());
+            }
+        }
+
+        return results.iterator();
+    }
+
+ // /////////////////////////////////////////////////////////////////
+ // Map Methods.
+ // /////////////////////////////////////////////////////////////////
+
public long getPageId() {
return pageId;
}
@@ -705,6 +832,10 @@
}
}
}
+
+ public boolean isSatiated() {
+ return !gcCandidateSet.isEmpty();
+ }
});
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Sun Jun 28 19:37:06 2009
@@ -522,7 +522,7 @@
private final FlowController<MessageDelivery> controller;
private final WindowLimiter<MessageDelivery> limiter;
- private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
+ private HashMap<MessageId, SubscriptionDelivery<MessageDelivery>> pendingMessages = new HashMap<MessageId, SubscriptionDelivery<MessageDelivery>>();
private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
private BrokerSubscription brokerSubscription;
private int borrowedLimterCredits;
@@ -557,7 +557,7 @@
brokerSubscription.connect(this);
}
- public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
if (!controller.offer(message, source)) {
return false;
} else {
@@ -566,12 +566,12 @@
}
}
- public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
controller.add(message, source);
sendInternal(message, controller, callback);
}
- private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
Message msg = message.asType(Message.class);
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
@@ -606,13 +606,13 @@
borrowedLimterCredits += flowCredit;
limiter.onProtocolCredit(flowCredit);
} else if(info.isStandardAck()) {
- LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
+ LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
synchronized (this) {
MessageId id = info.getLastMessageId();
if (isDurable() || isQueueReceiver()) {
while (!pendingMessageIds.isEmpty()) {
MessageId pendingId = pendingMessageIds.getFirst();
- SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+ SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
acked.add(callback);
pendingMessageIds.removeFirst();
if (pendingId.equals(id)) {
@@ -639,7 +639,7 @@
// Delete outside of synchronization on queue to avoid contention
// with enqueueing threads.
- for (SubscriptionDeliveryCallback callback : acked) {
+ for (SubscriptionDelivery<MessageDelivery> callback : acked) {
callback.acknowledge();
}
}
@@ -818,14 +818,14 @@
brokerSubscription.disconnect(this);
if (isDurable() || isQueueReceiver()) {
- LinkedList<SubscriptionDeliveryCallback> unacquired = null;
+ LinkedList<SubscriptionDelivery<MessageDelivery>> unacquired = null;
synchronized (this) {
- unacquired = new LinkedList<SubscriptionDeliveryCallback>();
+ unacquired = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
while (!pendingMessageIds.isEmpty()) {
MessageId pendingId = pendingMessageIds.getLast();
- SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+ SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
unacquired.add(callback);
pendingMessageIds.removeLast();
}
@@ -835,7 +835,7 @@
if (unacquired != null) {
// Delete outside of synchronization on queue to avoid contention
// with enqueueing threads.
- for (SubscriptionDeliveryCallback callback : unacquired) {
+ for (SubscriptionDelivery<MessageDelivery> callback : unacquired) {
callback.unacquire(controller);
}
}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Jun 28 19:37:06 2009
@@ -58,6 +58,7 @@
import org.apache.activemq.queue.QueueDispatchTarget;
import org.apache.activemq.queue.SingleFlowRelay;
import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
import junit.framework.TestCase;
@@ -456,7 +457,7 @@
* org.apache.activemq.flow.ISourceController,
* org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
*/
- public void add(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ public void add(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
controller.add(element, source);
addInternal(element, source, callback);
}
@@ -468,7 +469,7 @@
* org.apache.activemq.flow.ISourceController,
* org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
*/
- public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
if (controller.offer(element, source)) {
addInternal(element, source, callback);
return true;
@@ -481,7 +482,7 @@
* @param source
* @param callback
*/
- private void addInternal(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+ private void addInternal(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
rate.increment();
synchronized (this) {
controller.elementDispatched(element);
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Sun Jun 28 19:37:06 2009
@@ -28,7 +28,7 @@
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowController;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
import org.apache.activemq.util.Comparators;
import org.apache.activemq.util.Mapper;
import org.apache.activemq.util.SortedLinkedList;
@@ -727,7 +727,7 @@
}
}
- static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
+ static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDelivery<V>, SaveableQueueElement<V> {
final long sequence;
final long restoreBlock;
@@ -1157,6 +1157,13 @@
return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.queue.Subscription.SubscriptionDelivery#getSourceQueueRemovalKey()
+ */
+ public long getSourceQueueRemovalKey() {
+ return sequence;
+ }
+
}
private class Expirator {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Sun Jun 28 19:37:06 2009
@@ -26,11 +26,10 @@
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.CursoredQueue.Cursor;
import org.apache.activemq.queue.CursoredQueue.QueueElement;
-import org.apache.activemq.queue.QueueStore.PersistentQueue;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
import org.apache.activemq.util.Mapper;
-public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements PersistentQueue<K, E> {
+public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements IQueue<K, E> {
private CursoredQueue<E> queue;
private final FlowController<E> controller;
private final IFlowSizeLimiter<E> limiter;
@@ -285,7 +284,7 @@
QueueElement<E> qe = cursor.getNext();
if (qe != null) {
// If the sub doesn't remove on dispatch set an ack listener:
- SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
+ SubscriptionDelivery<E> callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
// See if the sink has room:
qe.setAcquired(subscription);
@@ -383,7 +382,7 @@
/**
* @return The count of the elements in this queue or -1 if not yet known.
*/
- public synchronized long getEnqueuedCount() {
+ public synchronized int getEnqueuedCount() {
if (!initialized) {
return -1;
}
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Sun Jun 28 19:37:06 2009
@@ -18,7 +18,6 @@
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.ISinkController;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.flow.PriorityFlowController;
import org.apache.activemq.flow.PrioritySizeLimiter;
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Sun Jun 28 19:37:06 2009
@@ -21,7 +21,6 @@
import org.apache.activemq.flow.Flow;
import org.apache.activemq.flow.FlowController;
import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.ISinkController;
import org.apache.activemq.flow.ISourceController;
public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Sun Jun 28 19:37:06 2009
@@ -17,7 +17,6 @@
package org.apache.activemq.queue;
import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.queue.QueueStore.PersistentQueue;
import org.apache.activemq.util.Mapper;
@@ -61,7 +60,13 @@
* The base priority for the queue
*/
public void setDispatchPriority(int priority);
-
+
+ /**
+ * Removes the element specified by the given key from the queue:
+ * @param key The key.
+ public void acknowledge(K key);
+ */
+
/**
* Starts the queue.
*/
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java Sun Jun 28 19:37:06 2009
@@ -18,6 +18,8 @@
import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
public interface QueueStore<K, V> {
@@ -65,7 +67,7 @@
public QueueDescriptor getDescriptor();
}
-
+
/**
* Loads a series of elements for the specified queue. The loaded messages
* are given to the provided {@link MessageRestoreListener}.
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java Sun Jun 28 19:37:06 2009
@@ -16,8 +16,6 @@
*/
package org.apache.activemq.queue;
-
-
public interface SaveableQueueElement<V> {
/**
@@ -48,5 +46,4 @@
* Called when the element has been saved.
*/
public void notifySave();
-
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Sun Jun 28 19:37:06 2009
@@ -30,7 +30,7 @@
import org.apache.activemq.queue.CursoredQueue.Cursor;
import org.apache.activemq.queue.CursoredQueue.CursorReadyListener;
import org.apache.activemq.queue.CursoredQueue.QueueElement;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
import org.apache.activemq.util.Mapper;
import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
@@ -698,7 +698,7 @@
}
// If the sub doesn't remove on dispatch pass it the callback
- SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
+ SubscriptionDelivery<V> callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
// If the sub is a browser don't pass it a callback since it does not need to
// delete messages
if( sub.isBrowser() ) {
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Sun Jun 28 19:37:06 2009
@@ -21,12 +21,20 @@
public interface Subscription<E> {
- public interface SubscriptionDeliveryCallback {
-
+ public interface SubscriptionDelivery<E> {
+
+ /**
+ * @return The descriptor of the queue from which this element came.
+ */
+ public QueueDescriptor getQueueDescriptor();
+
+ /**
+ * @return a key that can be used to remove the message from the queue.
+ */
+ public long getSourceQueueRemovalKey();
+
/**
- * If {@link Subscription#isBrowser()} returns false this method
- * indicates that the Subscription is finished with the element and that
- * it can be removed from the queue.
+ * Acknowledges the delivery.
*/
public void acknowledge();
@@ -101,7 +109,7 @@
* The queue's controller, which must be used if the offered
* element exceeds the subscription's buffer limits.
* @param callback
- * The {@link SubscriptionDeliveryCallback} associated with the
+ * The {@link SubscriptionDelivery} associated with the
* element
*
* @return true if the element was accepted false otherwise, if false is
@@ -109,7 +117,7 @@
* {@link ISourceController#onFlowBlock(ISinkController)} prior to
* returning false.
*/
- public boolean offer(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+ public boolean offer(E element, ISourceController<?> controller, SubscriptionDelivery<E> callback);
/**
* Pushes an item to the subscription. If the subscription is not remove on
@@ -122,14 +130,14 @@
* The queue's controller, which must be used if the added
* element exceeds the subscription's buffer limits.
* @param callback
- * The {@link SubscriptionDeliveryCallback} associated with the
+ * The {@link SubscriptionDelivery} associated with the
* element
* @return true if the element was accepted false otherwise, if false is
* returned the caller must have called
* {@link ISourceController#onFlowBlock(ISinkController)} prior to
* returning false.
*/
- public void add(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+ public void add(E element, ISourceController<?> controller, SubscriptionDelivery<E> callback);
@Deprecated
public IFlowSink<E> getSink();
Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Sun Jun 28 19:37:06 2009
@@ -134,11 +134,11 @@
return dt.hasSelector();
}
- public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
+ public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
return getSink().offer(elem, controller);
}
- public void add(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
+ public void add(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
getSink().add(elem, controller);
}
};
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -71,7 +71,7 @@
}
return priority;
}
-
+
/*
* (non-Javadoc)
*
@@ -88,7 +88,6 @@
}
return tte;
}
-
public AsciiBuffer getMsgId() {
if (msgId == null) {
@@ -162,5 +161,4 @@
return new StompMessageEvaluationContext();
}
-
}
Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Sun Jun 28 19:37:06 2009
@@ -273,7 +273,7 @@
private Destination destination;
private String ackMode;
- private LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback>();
+ private LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>>();
private boolean durable;
@@ -388,19 +388,19 @@
/* (non-Javadoc)
* @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#send(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
*/
- public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
addInternal(message, controller, callback);
}
/* (non-Javadoc)
* @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
*/
- public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+ public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
//FIXME need a controller:
return false;
}
- private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback)
+ private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback)
{
StompFrame frame = message.asType(StompFrame.class);
if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Sun Jun 28 19:37:06 2009
@@ -374,6 +374,8 @@
AsciiBuffer messageId;
AsciiBuffer encoding;
int size;
+ Buffer buffer;
+ Long streamKey;
public int getSize() {
return size;
@@ -382,10 +384,7 @@
public void setSize(int size) {
this.size = size;
}
-
- Buffer buffer;
- Long streamKey;
-
+
public Long getKey() {
return key;
}
@@ -683,15 +682,15 @@
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
- public boolean mapAdd(AsciiBuffer map);
+ public void mapAdd(AsciiBuffer map);
- public boolean mapRemove(AsciiBuffer map);
+ public void mapRemove(AsciiBuffer map);
- public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+ public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
- public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+ public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Sun Jun 28 19:37:06 2009
@@ -415,12 +415,11 @@
public void removeSubscription(AsciiBuffer name) {
subscriptions.remove(name);
}
-
+
/**
- * @return A list of subscriptions
+ * @return A list of subscriptions
*/
- public Iterator<SubscriptionRecord> listSubscriptions()
- {
+ public Iterator<SubscriptionRecord> listSubscriptions() {
ArrayList<SubscriptionRecord> rc = new ArrayList<SubscriptionRecord>(subscriptions.size());
rc.addAll(subscriptions.values());
return rc.iterator();
@@ -543,16 +542,15 @@
// Simple Key Value related methods could come in handy to store misc
// data.
// ///////////////////////////////////////////////////////////////////////////////
- public boolean mapAdd(AsciiBuffer mapName) {
+ public void mapAdd(AsciiBuffer mapName) {
if (maps.containsKey(mapName)) {
- return false;
+ return;
}
maps.put(mapName, new TreeMap<AsciiBuffer, Buffer>());
- return true;
}
- public boolean mapRemove(AsciiBuffer mapName) {
- return maps.remove(mapName) != null;
+ public void mapRemove(AsciiBuffer mapName) {
+ maps.remove(mapName);
}
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
@@ -560,15 +558,22 @@
}
public Buffer mapEntryGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
- return get(maps, mapName).get(key);
+ TreeMap<AsciiBuffer, Buffer> map = get(maps, mapName);
+ return map.get(key);
}
- public Buffer mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
- return get(maps, mapName).remove(key);
+ public void mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+ TreeMap<AsciiBuffer, Buffer> map = get(maps, mapName);
+ map.remove(key);
}
- public Buffer mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
- return get(maps, mapName).put(key, value);
+ public void mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
+ TreeMap<AsciiBuffer, Buffer> map = maps.get(mapName);
+ if (map == null) {
+ mapAdd(mapName);
+ map = maps.get(mapName);
+ }
+ map.put(key, value);
}
public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {
Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Sun Jun 28 19:37:06 2009
@@ -31,20 +31,19 @@
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.VariableMarshaller;
-
/**
- * The BTreeNode class represents a node in the BTree object graph. It is stored in
- * one Page of a PageFile.
+ * The BTreeNode class represents a node in the BTree object graph. It is stored
+ * in one Page of a PageFile.
*/
-public final class BTreeNode<Key,Value> {
+public final class BTreeNode<Key, Value> {
// The index that this node is part of.
- private final BTreeIndex<Key,Value> index;
+ private final BTreeIndex<Key, Value> index;
// The parent node or null if this is the root node of the BTree
- private BTreeNode<Key,Value> parent;
+ private BTreeNode<Key, Value> parent;
// The page associated with this node
- private Page<BTreeNode<Key,Value>> page;
-
+ private Page<BTreeNode<Key, Value>> page;
+
// Order list of keys in the node
private Key[] keys;
// Values associated with the Keys. Null if this is a branch node.
@@ -53,7 +52,7 @@
private long[] children;
// The next leaf node after this one. Used for fast iteration of the entries.
private long next = -1;
-
+
private final class KeyValueEntry implements Map.Entry<Key, Value> {
private final Key key;
private final Value value;
@@ -78,39 +77,39 @@
}
private final class BTreeIterator implements Iterator<Map.Entry<Key, Value>> {
-
+
private final Transaction tx;
- BTreeNode<Key,Value> current;
+ BTreeNode<Key, Value> current;
int nextIndex;
- Map.Entry<Key,Value> nextEntry;
+ Map.Entry<Key, Value> nextEntry;
- private BTreeIterator(Transaction tx, BTreeNode<Key,Value> current, int nextIndex) {
+ private BTreeIterator(Transaction tx, BTreeNode<Key, Value> current, int nextIndex) {
this.tx = tx;
this.current = current;
- this.nextIndex=nextIndex;
+ this.nextIndex = nextIndex;
}
synchronized private void findNextPage() {
- if( nextEntry!=null ) {
+ if (nextEntry != null) {
return;
}
-
+
try {
- while( current!=null ) {
- if( nextIndex >= current.keys.length ) {
+ while (current != null) {
+ if (nextIndex >= current.keys.length) {
// we need to roll to the next leaf..
- if( current.next >= 0 ) {
+ if (current.next >= 0) {
current = index.loadNode(tx, current.next, null);
- nextIndex=0;
+ nextIndex = 0;
} else {
break;
}
- } else {
+ } else {
nextEntry = new KeyValueEntry(current.keys[nextIndex], current.values[nextIndex]);
nextIndex++;
break;
}
-
+
}
} catch (IOException e) {
}
@@ -118,14 +117,14 @@
public boolean hasNext() {
findNextPage();
- return nextEntry !=null;
+ return nextEntry != null;
}
public Entry<Key, Value> next() {
- findNextPage();
- if( nextEntry !=null ) {
+ findNextPage();
+ if (nextEntry != null) {
Entry<Key, Value> lastEntry = nextEntry;
- nextEntry=null;
+ nextEntry = null;
return lastEntry;
} else {
throw new NoSuchElementException();
@@ -138,37 +137,38 @@
}
/**
- * The Marshaller is used to store and load the data in the BTreeNode into a Page.
- *
+ * The Marshaller is used to store and load the data in the BTreeNode into a
+ * Page.
+ *
* @param <Key>
* @param <Value>
*/
- static public class Marshaller<Key,Value> extends VariableMarshaller<BTreeNode<Key,Value>> {
- private final BTreeIndex<Key,Value> index;
-
- public Marshaller(BTreeIndex<Key,Value> index) {
+ static public class Marshaller<Key, Value> extends VariableMarshaller<BTreeNode<Key, Value>> {
+ private final BTreeIndex<Key, Value> index;
+
+ public Marshaller(BTreeIndex<Key, Value> index) {
this.index = index;
}
- public void writePayload(BTreeNode<Key,Value> node, DataOutput os) throws IOException {
+ public void writePayload(BTreeNode<Key, Value> node, DataOutput os) throws IOException {
// Write the keys
- short count = (short)node.keys.length; // cast may truncate value...
- if( count != node.keys.length ) {
+ short count = (short) node.keys.length; // cast may truncate value...
+ if (count != node.keys.length) {
throw new IOException("Too many keys");
}
-
+
os.writeShort(count);
for (int i = 0; i < node.keys.length; i++) {
index.getKeyMarshaller().writePayload(node.keys[i], os);
}
-
- if( node.isBranch() ) {
+
+ if (node.isBranch()) {
// If this is a branch...
os.writeBoolean(true);
- for (int i = 0; i < count+1; i++) {
+ for (int i = 0; i < count + 1; i++) {
os.writeLong(node.children[i]);
}
-
+
} else {
// If this is a leaf
os.writeBoolean(false);
@@ -180,22 +180,22 @@
}
@SuppressWarnings("unchecked")
- public BTreeNode<Key,Value> readPayload(DataInput is) throws IOException {
- BTreeNode<Key,Value> node = new BTreeNode<Key,Value>(index);
+ public BTreeNode<Key, Value> readPayload(DataInput is) throws IOException {
+ BTreeNode<Key, Value> node = new BTreeNode<Key, Value>(index);
int count = is.readShort();
-
- node.keys = (Key[])new Object[count];
+
+ node.keys = (Key[]) new Object[count];
for (int i = 0; i < count; i++) {
node.keys[i] = index.getKeyMarshaller().readPayload(is);
}
-
- if( is.readBoolean() ) {
- node.children = new long[count+1];
- for (int i = 0; i < count+1; i++) {
+
+ if (is.readBoolean()) {
+ node.children = new long[count + 1];
+ for (int i = 0; i < count + 1; i++) {
node.children[i] = is.readLong();
}
} else {
- node.values = (Value[])new Object[count];
+ node.values = (Value[]) new Object[count];
for (int i = 0; i < count; i++) {
node.values[i] = index.getValueMarshaller().readPayload(is);
}
@@ -205,21 +205,21 @@
}
}
- public BTreeNode(BTreeIndex<Key,Value> index) {
+ public BTreeNode(BTreeIndex<Key, Value> index) {
this.index = index;
}
-
+
public void setEmpty() {
setLeafData(createKeyArray(0), createValueArray(0));
}
-
/**
* Internal (to the BTreeNode) method. Because this method is called only by
* BTreeNode itself, no synchronization done inside of this method.
- * @throws IOException
+ *
+ * @throws IOException
*/
- private BTreeNode<Key,Value> getChild(Transaction tx, int idx) throws IOException {
+ private BTreeNode<Key, Value> getChild(Transaction tx, int idx) throws IOException {
if (isBranch() && idx >= 0 && idx < children.length) {
BTreeNode<Key, Value> result = this.index.loadNode(tx, children[idx], this);
return result;
@@ -227,47 +227,47 @@
return null;
}
}
-
+
public Value remove(Transaction tx, Key key) throws IOException {
- if(isBranch()) {
+ if (isBranch()) {
int idx = Arrays.binarySearch(keys, key);
idx = idx < 0 ? -(idx + 1) : idx + 1;
BTreeNode<Key, Value> child = getChild(tx, idx);
- if( child.getPageId() == index.getPageId() ) {
+ if (child.getPageId() == index.getPageId()) {
throw new IOException("BTree corrupted: Cylce detected.");
}
Value rc = child.remove(tx, key);
-
+
// child node is now empty.. remove it from the branch node.
- if( child.keys.length == 0 ) {
-
+ if (child.keys.length == 0) {
+
// If the child node is a branch, promote
- if( child.isBranch() ) {
+ if (child.isBranch()) {
// This is cause branches are never really empty.. they just go down to 1 child..
children[idx] = child.children[0];
} else {
-
+
// The child was a leaf. Then we need to actually remove it from this branch node..
// We need to update the previous child's next pointer to skip over the child being removed....
- if( idx > 0 && children.length > 1) {
- BTreeNode<Key, Value> previousChild = getChild(tx, idx-1);
+ if (idx > 0 && children.length > 1) {
+ BTreeNode<Key, Value> previousChild = getChild(tx, idx - 1);
previousChild.next = child.next;
index.storeNode(tx, previousChild, true);
}
-
- if( idx < children.length-1 ) {
+
+ if (idx < children.length - 1) {
// Delete it and key to the right.
setBranchData(arrayDelete(keys, idx), arrayDelete(children, idx));
} else {
// It was the last child.. Then delete it and key to the left
- setBranchData(arrayDelete(keys, idx-1), arrayDelete(children, idx));
+ setBranchData(arrayDelete(keys, idx - 1), arrayDelete(children, idx));
}
-
+
// If we are the root node, and only have 1 child left. Then
// make the root be the leaf node.
- if( children.length == 1 && parent==null ) {
+ if (children.length == 1 && parent == null) {
child = getChild(tx, 0);
keys = child.keys;
children = child.children;
@@ -275,11 +275,11 @@
// free up the page..
tx.free(child.getPage());
}
-
+
}
index.storeNode(tx, this, true);
}
-
+
return rc;
} else {
int idx = Arrays.binarySearch(keys, key);
@@ -288,13 +288,13 @@
} else {
Value oldValue = values[idx];
setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
-
- if( keys.length==0 && parent!=null) {
+
+ if (keys.length == 0 && parent != null) {
tx.free(getPage());
} else {
index.storeNode(tx, this, true);
}
-
+
return oldValue;
}
}
@@ -305,12 +305,12 @@
throw new IllegalArgumentException("Key cannot be null");
}
- if( isBranch() ) {
+ if (isBranch()) {
return getLeafNode(tx, this, key).put(tx, key, value);
} else {
int idx = Arrays.binarySearch(keys, key);
-
- Value oldValue=null;
+
+ Value oldValue = null;
if (idx >= 0) {
// Key was found... Overwrite
oldValue = values[idx];
@@ -321,14 +321,14 @@
idx = -(idx + 1);
setLeafData(arrayInsert(keys, key, idx), arrayInsert(values, value, idx));
}
-
+
try {
index.storeNode(tx, this, allowOverflow());
- } catch ( Transaction.PageOverflowIOException e ) {
+ } catch (Transaction.PageOverflowIOException e) {
// If we get an overflow
split(tx);
}
-
+
return oldValue;
}
}
@@ -341,7 +341,7 @@
try {
index.storeNode(tx, this, allowOverflow());
- } catch ( Transaction.PageOverflowIOException e ) {
+ } catch (Transaction.PageOverflowIOException e) {
split(tx);
}
@@ -353,17 +353,17 @@
private void split(Transaction tx) throws IOException {
Key[] leftKeys;
Key[] rightKeys;
- Value[] leftValues=null;
- Value[] rightValues=null;
- long[] leftChildren=null;
- long[] rightChildren=null;
+ Value[] leftValues = null;
+ Value[] rightValues = null;
+ long[] leftChildren = null;
+ long[] rightChildren = null;
Key separator;
int vc = keys.length;
int pivot = vc / 2;
// Split the node into two nodes
- if( isBranch() ) {
+ if (isBranch()) {
leftKeys = createKeyArray(pivot);
leftChildren = new long[leftKeys.length + 1];
@@ -377,13 +377,12 @@
// Is it a Simple Prefix BTree??
Prefixer<Key> prefixer = index.getPrefixer();
- if(prefixer!=null) {
+ if (prefixer != null) {
separator = prefixer.getSimplePrefix(leftKeys[leftKeys.length - 1], rightKeys[0]);
} else {
separator = keys[leftKeys.length];
}
-
-
+
} else {
leftKeys = createKeyArray(pivot);
@@ -404,12 +403,12 @@
// Promote the pivot to the parent branch
if (parent == null) {
-
+
// This can only happen if this is the root
- BTreeNode<Key,Value> rNode = this.index.createNode(tx, this);
- BTreeNode<Key,Value> lNode = this.index.createNode(tx, this);
+ BTreeNode<Key, Value> rNode = this.index.createNode(tx, this);
+ BTreeNode<Key, Value> lNode = this.index.createNode(tx, this);
- if( isBranch() ) {
+ if (isBranch()) {
rNode.setBranchData(rightKeys, rightChildren);
lNode.setBranchData(leftKeys, leftChildren);
} else {
@@ -419,17 +418,17 @@
}
Key[] v = createKeyArray(1);
- v[0]=separator;
+ v[0] = separator;
setBranchData(v, new long[] { lNode.getPageId(), rNode.getPageId() });
index.storeNode(tx, this, true);
index.storeNode(tx, rNode, true);
index.storeNode(tx, lNode, true);
-
+
} else {
- BTreeNode<Key,Value> rNode = this.index.createNode(tx, parent);
-
- if( isBranch() ) {
+ BTreeNode<Key, Value> rNode = this.index.createNode(tx, parent);
+
+ if (isBranch()) {
setBranchData(leftKeys, leftChildren);
rNode.setBranchData(rightKeys, rightChildren);
} else {
@@ -446,48 +445,47 @@
}
public void printStructure(Transaction tx, PrintWriter out, String prefix) throws IOException {
- if( prefix.length()>0 && parent == null ) {
+ if (prefix.length() > 0 && parent == null) {
throw new IllegalStateException("Cycle back to root node detected.");
}
-
- if( isBranch() ) {
- for(int i=0 ; i < children.length; i++) {
+
+ if (isBranch()) {
+ for (int i = 0; i < children.length; i++) {
BTreeNode<Key, Value> child = getChild(tx, i);
- if( i == children.length-1) {
- out.println(prefix+"\\- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":""));
- child.printStructure(tx, out, prefix+" ");
+ if (i == children.length - 1) {
+ out.println(prefix + "\\- " + child.getPageId() + (child.isBranch() ? " (" + child.children.length + ")" : ""));
+ child.printStructure(tx, out, prefix + " ");
} else {
- out.println(prefix+"|- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":"")+" : "+keys[i]);
- child.printStructure(tx, out, prefix+" ");
+ out.println(prefix + "|- " + child.getPageId() + (child.isBranch() ? " (" + child.children.length + ")" : "") + " : " + keys[i]);
+ child.printStructure(tx, out, prefix + " ");
}
}
}
}
-
-
+
public int getMinLeafDepth(Transaction tx, int depth) throws IOException {
depth++;
- if( isBranch() ) {
+ if (isBranch()) {
int min = Integer.MAX_VALUE;
- for(int i=0 ; i < children.length; i++) {
+ for (int i = 0; i < children.length; i++) {
min = Math.min(min, getChild(tx, i).getMinLeafDepth(tx, depth));
}
return min;
} else {
-// print(depth*2, "- "+page.getPageId());
+ // print(depth*2, "- "+page.getPageId());
return depth;
}
}
public int getMaxLeafDepth(Transaction tx, int depth) throws IOException {
depth++;
- if( isBranch() ) {
+ if (isBranch()) {
int v = 0;
- for(int i=0 ; i < children.length; i++) {
+ for (int i = 0; i < children.length; i++) {
v = Math.max(v, getChild(tx, i).getMaxLeafDepth(tx, depth));
}
depth = v;
- }
+ }
return depth;
}
@@ -495,7 +493,7 @@
if (key == null) {
throw new IllegalArgumentException("Key cannot be null");
}
- if( isBranch() ) {
+ if (isBranch()) {
return getLeafNode(tx, this, key).get(tx, key);
} else {
int idx = Arrays.binarySearch(keys, key);
@@ -506,22 +504,27 @@
}
}
}
-
+
public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
if (visitor == null) {
throw new IllegalArgumentException("Visitor cannot be null");
}
- if( isBranch() ) {
- for(int i=0; i < this.children.length; i++) {
+
+ if (visitor.isSatiated()) {
+ return;
+ }
+
+ if (isBranch()) {
+ for (int i = 0; i < this.children.length; i++) {
Key key1 = null;
- if( i!=0 ) {
- key1 = keys[i-1];
+ if (i != 0) {
+ key1 = keys[i - 1];
}
Key key2 = null;
- if( i!=this.children.length-1 ) {
+ if (i != this.children.length - 1) {
key2 = keys[i];
}
- if( visitor.isInterestedInKeysBetween(key1, key2) ) {
+ if (visitor.isInterestedInKeysBetween(key1, key2)) {
BTreeNode<Key, Value> child = getChild(tx, i);
child.visit(tx, visitor);
}
@@ -530,45 +533,45 @@
visitor.visit(Arrays.asList(keys), Arrays.asList(values));
}
}
-
- public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
+
+ public Map.Entry<Key, Value> getFirst(Transaction tx) throws IOException {
BTreeNode<Key, Value> node = this;
- while( node .isBranch() ) {
+ while (node.isBranch()) {
node = node.getChild(tx, 0);
}
- if( node.values.length>0 ) {
+ if (node.values.length > 0) {
return new KeyValueEntry(node.keys[0], node.values[0]);
} else {
return null;
}
}
- public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
+ public Map.Entry<Key, Value> getLast(Transaction tx) throws IOException {
BTreeNode<Key, Value> node = this;
- while( node.isBranch() ) {
- node = node.getChild(tx, node.children.length-1);
+ while (node.isBranch()) {
+ node = node.getChild(tx, node.children.length - 1);
}
- if( node.values.length>0 ) {
- int idx = node.values.length-1;
+ if (node.values.length > 0) {
+ int idx = node.values.length - 1;
return new KeyValueEntry(node.keys[idx], node.values[idx]);
} else {
return null;
}
}
-
- public BTreeNode<Key,Value> getFirstLeafNode(Transaction tx) throws IOException {
+
+ public BTreeNode<Key, Value> getFirstLeafNode(Transaction tx) throws IOException {
BTreeNode<Key, Value> node = this;
- while( node .isBranch() ) {
+ while (node.isBranch()) {
node = node.getChild(tx, 0);
}
return node;
}
-
- public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key startKey) throws IOException {
+
+ public Iterator<Map.Entry<Key, Value>> iterator(final Transaction tx, Key startKey) throws IOException {
if (startKey == null) {
return iterator(tx);
}
- if( isBranch() ) {
+ if (isBranch()) {
return getLeafNode(tx, this, startKey).iterator(tx, startKey);
} else {
int idx = Arrays.binarySearch(keys, startKey);
@@ -579,12 +582,12 @@
}
}
- public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
+ public Iterator<Map.Entry<Key, Value>> iterator(final Transaction tx) throws IOException {
return new BTreeIterator(tx, getFirstLeafNode(tx), 0);
}
-
+
public void clear(Transaction tx) throws IOException {
- if( isBranch() ) {
+ if (isBranch()) {
for (int i = 0; i < children.length; i++) {
BTreeNode<Key, Value> node = index.loadNode(tx, children[i], this);
node.clear(tx);
@@ -592,27 +595,26 @@
}
}
// Reset the root node to be a leaf.
- if( parent == null ) {
+ if (parent == null) {
setLeafData(createKeyArray(0), createValueArray(0));
- next=-1;
+ next = -1;
index.storeNode(tx, this, true);
}
}
-
- private static <Key,Value> BTreeNode<Key, Value> getLeafNode(Transaction tx, final BTreeNode<Key, Value> node, Key key) throws IOException {
+ private static <Key, Value> BTreeNode<Key, Value> getLeafNode(Transaction tx, final BTreeNode<Key, Value> node, Key key) throws IOException {
BTreeNode<Key, Value> current = node;
- while( true ) {
- if( current.isBranch() ) {
+ while (true) {
+ if (current.isBranch()) {
int idx = Arrays.binarySearch(current.keys, key);
idx = idx < 0 ? -(idx + 1) : idx + 1;
- BTreeNode<Key, Value> child = current.getChild(tx, idx);
+ BTreeNode<Key, Value> child = current.getChild(tx, idx);
// A little cycle detection for sanity's sake
- if( child == node ) {
+ if (child == node) {
throw new IOException("BTree corrupted: Cylce detected.");
}
-
+
current = child;
} else {
break;
@@ -626,7 +628,7 @@
throw new IllegalArgumentException("Key cannot be null");
}
- if( isBranch() ) {
+ if (isBranch()) {
return getLeafNode(tx, this, key).contains(tx, key);
} else {
int idx = Arrays.binarySearch(keys, key);
@@ -641,20 +643,18 @@
///////////////////////////////////////////////////////////////////
// Implementation methods
///////////////////////////////////////////////////////////////////
-
private boolean allowOverflow() {
// Only allow page overflow if there are <= 3 keys in the node. Otherwise a split will occur on overflow
- return this.keys.length<=3;
+ return this.keys.length <= 3;
}
-
private void setLeafData(Key[] keys, Value[] values) {
this.keys = keys;
this.values = values;
this.children = null;
}
-
+
private void setBranchData(Key[] keys, long[] nodeIds) {
this.keys = keys;
this.children = nodeIds;
@@ -663,17 +663,17 @@
@SuppressWarnings("unchecked")
private Key[] createKeyArray(int size) {
- return (Key[])new Object[size];
+ return (Key[]) new Object[size];
}
@SuppressWarnings("unchecked")
private Value[] createValueArray(int size) {
- return (Value[])new Object[size];
+ return (Value[]) new Object[size];
}
-
+
@SuppressWarnings("unchecked")
static private <T> T[] arrayDelete(T[] vals, int idx) {
- T[] newVals = (T[])new Object[vals.length - 1];
+ T[] newVals = (T[]) new Object[vals.length - 1];
if (idx > 0) {
System.arraycopy(vals, 0, newVals, 0, idx);
}
@@ -682,7 +682,7 @@
}
return newVals;
}
-
+
static private long[] arrayDelete(long[] vals, int idx) {
long[] newVals = new long[vals.length - 1];
if (idx > 0) {
@@ -696,7 +696,7 @@
@SuppressWarnings("unchecked")
static private <T> T[] arrayInsert(T[] vals, T val, int idx) {
- T[] newVals = (T[])new Object[vals.length + 1];
+ T[] newVals = (T[]) new Object[vals.length + 1];
if (idx > 0) {
System.arraycopy(vals, 0, newVals, 0, idx);
}
@@ -707,9 +707,8 @@
return newVals;
}
-
static private long[] arrayInsert(long[] vals, long val, int idx) {
-
+
long[] newVals = new long[vals.length + 1];
if (idx > 0) {
System.arraycopy(vals, 0, newVals, 0, idx);
@@ -725,7 +724,7 @@
// Property Accessors
///////////////////////////////////////////////////////////////////
private boolean isBranch() {
- return children!=null;
+ return children != null;
}
public long getPageId() {
@@ -755,12 +754,10 @@
public void setNext(long next) {
this.next = next;
}
-
+
@Override
public String toString() {
- return "[BTreeNode "+(isBranch()?"branch":"leaf")+": "+Arrays.asList(keys)+"]";
+ return "[BTreeNode " + (isBranch() ? "branch" : "leaf") + ": " + Arrays.asList(keys) + "]";
}
}
-
-
Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Sun Jun 28 19:37:06 2009
@@ -24,17 +24,21 @@
* @param <Key>
* @param <Value>
*/
-public interface BTreeVisitor<Key,Value> {
-
+public interface BTreeVisitor<Key, Value> {
+
/**
- * Do you want to visit the range of BTree entries between the first and and second key?
+ * Do you want to visit the range of BTree entries between the first and
+ * second key?
*
- * @param first if null indicates the range of values before the second key.
- * @param second if null indicates the range of values after the first key.
- * @return true if you want to visit the values between the first and second key.
+ * @param first
+ * if null indicates the range of values before the second key.
+ * @param second
+ * if null indicates the range of values after the first key.
+ * @return true if you want to visit the values between the first and second
+ * key.
*/
boolean isInterestedInKeysBetween(Key first, Key second);
-
+
/**
* The keys and values of a BTree leaf node.
*
@@ -42,97 +46,155 @@
* @param values
*/
void visit(List<Key> keys, List<Value> values);
-
-
- abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
- final private Key value;
-
- public GTVisitor(Key value) {
- this.value = value;
- }
-
- public boolean isInterestedInKeysBetween(Key first, Key second) {
- return second==null || second.compareTo(value)>0;
- }
-
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)>0 ) {
- matched(key, values.get(i));
- }
- }
- }
- abstract protected void matched(Key key, Value value);
+ /**
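+ * Reports whether the visitor has received all the entries it wants.
+ *
+ * @return true if the visitor does not need to be shown any more entries.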
+ */
+ boolean isSatiated();
+
+ abstract class GTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+ final private Key value;
+ int matches = Integer.MAX_VALUE;
+ boolean limited;
+
+ public GTVisitor(Key value) {
+ this.value = value;
+ }
+
+ public GTVisitor(Key value, int limit) {
+ this.value = value;
+ limited = true;
+ matches = limit;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return second == null || second.compareTo(value) > 0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+ Key key = keys.get(i);
+ if (key.compareTo(value) > 0) {
+ matched(key, values.get(i));
+ if (limited) matches--;
+ }
+ }
+ }
+
+ public boolean isSatiated() {
+ return limited && matches <= 0;
+ }
+
+ abstract protected void matched(Key key, Value value);
}
-
- abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
- final private Key value;
-
- public GTEVisitor(Key value) {
- this.value = value;
- }
-
- public boolean isInterestedInKeysBetween(Key first, Key second) {
- return second==null || second.compareTo(value)>=0;
- }
-
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)>=0 ) {
- matched(key, values.get(i));
- }
- }
- }
- abstract protected void matched(Key key, Value value);
+ abstract class GTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+ final private Key value;
+ int matches = Integer.MAX_VALUE;
+ boolean limited;
+
+ public GTEVisitor(Key value) {
+ this.value = value;
+ }
+
+ public GTEVisitor(Key value, int limit) {
+ this.value = value;
+ limited = true;
+ matches = limit;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return second == null || second.compareTo(value) >= 0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+ Key key = keys.get(i);
+ if (key.compareTo(value) >= 0) {
+ matched(key, values.get(i));
+ if (limited) matches--;
+ }
+ }
+ }
+
+ public boolean isSatiated() {
+ return limited && matches <= 0;
+ }
+
+ abstract protected void matched(Key key, Value value);
}
-
- abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
- final private Key value;
-
- public LTVisitor(Key value) {
- this.value = value;
- }
-
- public boolean isInterestedInKeysBetween(Key first, Key second) {
- return first==null || first.compareTo(value)<0;
- }
-
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)<0 ) {
- matched(key, values.get(i));
- }
- }
- }
- abstract protected void matched(Key key, Value value);
+ abstract class LTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+ final private Key value;
+ int matches = Integer.MAX_VALUE;
+ boolean limited;
+
+ public LTVisitor(Key value) {
+ this.value = value;
+ }
+
+ public LTVisitor(Key value, int limit) {
+ this.value = value;
+ limited = true;
+ matches = limit;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return first == null || first.compareTo(value) < 0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+ Key key = keys.get(i);
+ if (key.compareTo(value) < 0) {
+ matched(key, values.get(i));
+ if (limited) matches--;
+ }
+ }
+ }
+
+ public boolean isSatiated() {
+ return limited && matches <= 0;
+ }
+
+ abstract protected void matched(Key key, Value value);
}
-
- abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
- final private Key value;
-
- public LTEVisitor(Key value) {
- this.value = value;
- }
-
- public boolean isInterestedInKeysBetween(Key first, Key second) {
- return first==null || first.compareTo(value)<=0;
- }
-
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)<=0 ) {
- matched(key, values.get(i));
- }
- }
- }
- abstract protected void matched(Key key, Value value);
+ abstract class LTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+ final private Key value;
+ int matches = Integer.MAX_VALUE;
+ boolean limited;
+
+ public LTEVisitor(Key value) {
+ this.value = value;
+ }
+
+ public LTEVisitor(Key value, int limit) {
+ this.value = value;
+ limited = true;
+ matches = limit;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return first == null || first.compareTo(value) <= 0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+ Key key = keys.get(i);
+ if (key.compareTo(value) <= 0) {
+ matched(key, values.get(i));
+ if (limited) matches--;
+ }
+ }
+ }
+
+ public boolean isSatiated() {
+ return limited && matches <= 0;
+ }
+
+ abstract protected void matched(Key key, Value value);
}
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Sun Jun 28 19:37:06 2009
@@ -167,6 +167,9 @@
}
public void visit(List<String> keys, List<Long> values) {
}
+ public boolean isSatiated() {
+ return false;
+ }
});