You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/28 21:37:07 UTC

svn commit: r789143 [2/2] - in /activemq/sandbox/activemq-flow: activemq-broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-openwire/src/main/java/org/a...

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Sun Jun 28 19:37:06 2009
@@ -31,7 +31,12 @@
 
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.SubscriptionRecord;
+import org.apache.activemq.broker.store.kahadb.Data.MapAdd;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryRemove;
+import org.apache.activemq.broker.store.kahadb.Data.MapRemove;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
@@ -41,6 +46,10 @@
 import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove;
 import org.apache.activemq.broker.store.kahadb.Data.Trace;
 import org.apache.activemq.broker.store.kahadb.Data.Type;
+import org.apache.activemq.broker.store.kahadb.Data.MapAdd.MapAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut.MapEntryPutBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapEntryRemove.MapEntryRemoveBean;
+import org.apache.activemq.broker.store.kahadb.Data.MapRemove.MapRemoveBean;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAdd.QueueAddBean;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
@@ -649,9 +658,25 @@
         case TRANSACTION_COMMIT:
         case TRANSACTION_ROLLBACK:
         case MAP_ADD:
+            rootEntity.mapAdd(((MapAdd) command).getMapName(), tx);
+            break;
         case MAP_REMOVE:
-        case MAP_ENTRY_PUT:
-        case MAP_ENTRY_REMOVE:
+            rootEntity.mapRemove(((MapRemove) command).getMapName(), tx);
+            break;
+        case MAP_ENTRY_PUT: {
+            MapEntryPut p = (MapEntryPut) command;
+            rootEntity.mapAddEntry(p.getMapName(), p.getKey(), p.getValue(), tx);
+            break;
+        }
+        case MAP_ENTRY_REMOVE: {
+            MapEntryRemove p = (MapEntryRemove) command;
+            try {
+                rootEntity.mapRemoveEntry(p.getMapName(), p.getKey(), tx);
+            } catch (KeyNotFoundException e) {
+                //yay, removed.
+            }
+            break;
+        }
         case STREAM_OPEN:
         case STREAM_WRITE:
         case STREAM_CLOSE:
@@ -982,7 +1007,7 @@
             update.setName(record.getName());
             update.setDestination(record.getDestination());
             update.setDurable(record.getIsDurable());
-            
+
             if (record.getAttachment() != null) {
                 update.setAttachment(record.getAttachment());
             }
@@ -1019,32 +1044,54 @@
         // /////////////////////////////////////////////////////////////
         // Map related methods.
         // /////////////////////////////////////////////////////////////
-        public boolean mapAdd(AsciiBuffer map) {
-            return false;
+        public void mapAdd(AsciiBuffer map) {
+            MapAddBean update = new MapAddBean();
+            update.setMapName(map);
+            addUpdate(update);
         }
 
-        public boolean mapRemove(AsciiBuffer map) {
-            return false;
+        public void mapRemove(AsciiBuffer map) {
+            MapRemoveBean update = new MapRemoveBean();
+            update.setMapName(map);
+            addUpdate(update);
         }
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
-            return null;
+            storeAtomic();
+            return rootEntity.mapList(first, max, tx);
         }
 
-        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
-            return null;
+        public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
+            MapEntryPutBean update = new MapEntryPutBean();
+            update.setMapName(map);
+            update.setKey(key);
+            update.setValue(value);
+            addUpdate(update);
         }
 
         public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            return null;
+            storeAtomic();
+            try {
+                return rootEntity.mapGetEntry(map, key, tx);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
         }
 
-        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-            return null;
+        public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+            MapEntryRemoveBean update = new MapEntryRemoveBean();
+            update.setMapName(map);
+            update.setKey(key);
+            addUpdate(update);
         }
 
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
-            return null;
+            storeAtomic();
+            try {
+                return rootEntity.mapListKeys(map, first, max, tx);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
         }
 
         // /////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Sun Jun 28 19:37:06 2009
@@ -49,7 +49,6 @@
 import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
-
 public class RootEntity {
 
     //TODO remove this one performance testing is complete. 
@@ -69,6 +68,7 @@
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
             rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
             rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
+            rc.mapIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
             if (is.readBoolean()) {
                 rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
             } else {
@@ -87,6 +87,7 @@
             os.writeLong(object.destinationIndex.getPageId());
             os.writeLong(object.messageRefsIndex.getPageId());
             os.writeLong(object.subscriptionIndex.getPageId());
+            os.writeLong(object.mapIndex.getPageId());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
                 Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
@@ -119,6 +120,10 @@
     // Subscriptions
     private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
 
+    // Maps:
+    private BTreeIndex<AsciiBuffer, Long> mapIndex;
+    private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache;
+
     // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
     // /////////////////////////////////////////////////////////////////
@@ -137,6 +142,8 @@
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
         messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
         subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
+        mapIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
+
         page.set(this);
         tx.store(page, MARSHALLER, true);
     }
@@ -196,6 +203,23 @@
             ioe.initCause(e);
             throw ioe;
         }
+
+        //Load Maps:
+        mapIndex.setPageFile(tx.getPageFile());
+        mapIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        mapIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+        mapIndex.load(tx);
+
+        //Load all of the maps and cache them:
+        for (Iterator<Entry<AsciiBuffer, Long>> iterator = mapIndex.iterator(tx); iterator.hasNext();) {
+            Entry<AsciiBuffer, Long> entry = iterator.next();
+            BTreeIndex<AsciiBuffer, Buffer> map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), entry.getValue());
+            map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+            map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+            map.load(tx);
+            mapCache.put(entry.getKey(), map);
+        }
+
     }
 
     /**
@@ -349,15 +373,17 @@
     // /////////////////////////////////////////////////////////////////
 
     /**
-     * Returns a list of all of the stored subscriptions. 
-     * @param tx The transaction under which this is to be executed.
-     * @return a list of all of the stored subscriptions. 
-     * @throws IOException 
+     * Returns a list of all of the stored subscriptions.
+     * 
+     * @param tx
+     *            The transaction under which this is to be executed.
+     * @return a list of all of the stored subscriptions.
+     * @throws IOException
      */
     public Iterator<SubscriptionRecord> listSubsriptions(Transaction tx) throws IOException {
-        
+
         final LinkedList<SubscriptionRecord> rc = new LinkedList<SubscriptionRecord>();
-        
+
         subscriptionIndex.visit(tx, new BTreeVisitor<AsciiBuffer, Buffer>() {
             public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
                 return true;
@@ -372,8 +398,12 @@
                     }
                 }
             }
+
+            public boolean isSatiated() {
+                return false;
+            }
         });
-        
+
         return rc.iterator();
     }
 
@@ -406,7 +436,9 @@
 
     /**
      * Converts a Subscription buffer to a SubscriptionRecord.
-     * @param b The buffer
+     * 
+     * @param b
+     *            The buffer
      * @return The record.
      * @throws InvalidProtocolBufferException
      */
@@ -414,7 +446,7 @@
         if (b == null) {
             return null;
         }
-        
+
         SubscriptionRecord rc = null;
         if (b != null) {
             SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
@@ -508,6 +540,101 @@
         return result;
     }
 
+    // /////////////////////////////////////////////////////////////////
+    // Map Methods.
+    // /////////////////////////////////////////////////////////////////
+    public final void mapAdd(AsciiBuffer key, Transaction tx) throws IOException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(key);
+
+        if (map == null) {
+            long pageId = tx.allocate().getPageId();
+            map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), pageId);
+            map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+            map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+            map.load(tx);
+            mapIndex.put(tx, key, pageId);
+            mapCache.put(key, map);
+        }
+    }
+
+    public final void mapRemove(AsciiBuffer key, Transaction tx) throws IOException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.remove(key);
+        if (map != null) {
+            map.clear(tx);
+            map.unload(tx);
+            mapIndex.remove(tx, key);
+        }
+    }
+
+    public final void mapAddEntry(AsciiBuffer name, AsciiBuffer key, Buffer value, Transaction tx) throws IOException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
+        if (map == null) {
+            mapAdd(name, tx);
+            map = mapCache.get(name);
+        }
+
+        map.put(tx, key, value);
+
+    }
+
+    public final void mapRemoveEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
+        if (map == null) {
+            throw new KeyNotFoundException(name.toString());
+        }
+        map.remove(tx, key);
+    }
+
+    public final Buffer mapGetEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
+        if (map == null) {
+            throw new KeyNotFoundException(name.toString());
+        }
+        return map.get(tx, key);
+    }
+
+    public final Iterator<AsciiBuffer> mapList(AsciiBuffer first, int count, Transaction tx) {
+        LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
+
+        Collection<AsciiBuffer> values = (first == null ? mapCache.keySet() : mapCache.tailMap(first).keySet());
+        for (AsciiBuffer key : values) {
+            results.add(key);
+        }
+
+        return results.iterator();
+    }
+
+    public final Iterator<AsciiBuffer> mapListKeys(AsciiBuffer name, AsciiBuffer first, int count, Transaction tx) throws IOException, KeyNotFoundException {
+        BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
+        if (map == null) {
+            throw new KeyNotFoundException(name.toString());
+        }
+
+        final LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
+
+        if (first != null && count > 0) {
+            map.visit(tx, new BTreeVisitor.GTEVisitor<AsciiBuffer, Buffer>(first, count) {
+
+                @Override
+                protected void matched(AsciiBuffer key, Buffer value) {
+                    results.add(key);
+                }
+            });
+        } else {
+            Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator(tx);
+            while (iterator.hasNext()) {
+                Entry<AsciiBuffer, Buffer> e = iterator.next();
+                results.add(e.getKey());
+            }
+        }
+
+        return results.iterator();
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Map Methods.
+    // /////////////////////////////////////////////////////////////////
+
     public long getPageId() {
         return pageId;
     }
@@ -705,6 +832,10 @@
                     }
                 }
             }
+
+            public boolean isSatiated() {
+                return !gcCandidateSet.isEmpty();
+            }
         });
 
     }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Sun Jun 28 19:37:06 2009
@@ -522,7 +522,7 @@
         private final FlowController<MessageDelivery> controller;
         private final WindowLimiter<MessageDelivery> limiter;
 
-        private HashMap<MessageId, SubscriptionDeliveryCallback> pendingMessages = new HashMap<MessageId, SubscriptionDeliveryCallback>();
+        private HashMap<MessageId, SubscriptionDelivery<MessageDelivery>> pendingMessages = new HashMap<MessageId, SubscriptionDelivery<MessageDelivery>>();
         private LinkedList<MessageId> pendingMessageIds = new LinkedList<MessageId>();
         private BrokerSubscription brokerSubscription;
         private int borrowedLimterCredits;
@@ -557,7 +557,7 @@
             brokerSubscription.connect(this);
         }
 
-        public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+        public boolean offer(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
             if (!controller.offer(message, source)) {
                 return false;
             } else {
@@ -566,12 +566,12 @@
             }
         }
 
-        public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+        public void add(final MessageDelivery message, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
             controller.add(message, source);
             sendInternal(message, controller, callback);
         }
 
-        private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+        private void sendInternal(final MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
             Message msg = message.asType(Message.class);
             MessageDispatch md = new MessageDispatch();
             md.setConsumerId(info.getConsumerId());
@@ -606,13 +606,13 @@
                 borrowedLimterCredits += flowCredit;
                 limiter.onProtocolCredit(flowCredit);
             } else if(info.isStandardAck()) {
-                LinkedList<SubscriptionDeliveryCallback> acked = new LinkedList<SubscriptionDeliveryCallback>();
+                LinkedList<SubscriptionDelivery<MessageDelivery>> acked = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
                 synchronized (this) {
                     MessageId id = info.getLastMessageId();
                     if (isDurable() || isQueueReceiver()) {
                         while (!pendingMessageIds.isEmpty()) {
                             MessageId pendingId = pendingMessageIds.getFirst();
-                            SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+                            SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
                             acked.add(callback);
                             pendingMessageIds.removeFirst();
                             if (pendingId.equals(id)) {
@@ -639,7 +639,7 @@
 
                 // Delete outside of synchronization on queue to avoid contention
                 // with enqueueing threads.
-                for (SubscriptionDeliveryCallback callback : acked) {
+                for (SubscriptionDelivery<MessageDelivery> callback : acked) {
                     callback.acknowledge();
                 }
             }
@@ -818,14 +818,14 @@
             brokerSubscription.disconnect(this);
 
             if (isDurable() || isQueueReceiver()) {
-                LinkedList<SubscriptionDeliveryCallback> unacquired = null;
+                LinkedList<SubscriptionDelivery<MessageDelivery>> unacquired = null;
 
                 synchronized (this) {
 
-                    unacquired = new LinkedList<SubscriptionDeliveryCallback>();
+                    unacquired = new LinkedList<SubscriptionDelivery<MessageDelivery>>();
                     while (!pendingMessageIds.isEmpty()) {
                         MessageId pendingId = pendingMessageIds.getLast();
-                        SubscriptionDeliveryCallback callback = pendingMessages.remove(pendingId);
+                        SubscriptionDelivery<MessageDelivery> callback = pendingMessages.remove(pendingId);
                         unacquired.add(callback);
                         pendingMessageIds.removeLast();
                     }
@@ -835,7 +835,7 @@
                 if (unacquired != null) {
                     // Delete outside of synchronization on queue to avoid contention
                     // with enqueueing threads.
-                    for (SubscriptionDeliveryCallback callback : unacquired) {
+                    for (SubscriptionDelivery<MessageDelivery> callback : unacquired) {
                         callback.unacquire(controller);
                     }
                 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/perf/broker/SharedQueuePerfTest.java Sun Jun 28 19:37:06 2009
@@ -58,6 +58,7 @@
 import org.apache.activemq.queue.QueueDispatchTarget;
 import org.apache.activemq.queue.SingleFlowRelay;
 import org.apache.activemq.queue.Subscription;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
 
 import junit.framework.TestCase;
 
@@ -456,7 +457,7 @@
          * org.apache.activemq.flow.ISourceController,
          * org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
          */
-        public void add(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+        public void add(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
             controller.add(element, source);
             addInternal(element, source, callback);
         }
@@ -468,7 +469,7 @@
          * org.apache.activemq.flow.ISourceController,
          * org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
          */
-        public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+        public boolean offer(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
             if (controller.offer(element, source)) {
                 addInternal(element, source, callback);
                 return true;
@@ -481,7 +482,7 @@
          * @param source
          * @param callback
          */
-        private void addInternal(MessageDelivery element, ISourceController<?> source, SubscriptionDeliveryCallback callback) {
+        private void addInternal(MessageDelivery element, ISourceController<?> source, SubscriptionDelivery<MessageDelivery> callback) {
             rate.increment();
             synchronized (this) {
                 controller.elementDispatched(element);

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/CursoredQueue.java Sun Jun 28 19:37:06 2009
@@ -28,7 +28,7 @@
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowController;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
 import org.apache.activemq.util.Comparators;
 import org.apache.activemq.util.Mapper;
 import org.apache.activemq.util.SortedLinkedList;
@@ -727,7 +727,7 @@
         }
     }
 
-    static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDeliveryCallback, SaveableQueueElement<V> {
+    static class QueueElement<V> extends SortedLinkedListNode<QueueElement<V>> implements SubscriptionDelivery<V>, SaveableQueueElement<V> {
 
         final long sequence;
         final long restoreBlock;
@@ -1157,6 +1157,13 @@
             return "QueueElement " + sequence + " loaded: " + loaded + " elem loaded: " + !isPagedOut() + " owner: " + owner;
         }
 
+        /* (non-Javadoc)
+         * @see org.apache.activemq.queue.Subscription.SubscriptionDelivery#getSourceQueueRemovalKey()
+         */
+        public long getSourceQueueRemovalKey() {
+            return sequence;
+        }
+
     }
 
     private class Expirator {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePersistentQueue.java Sun Jun 28 19:37:06 2009
@@ -26,11 +26,10 @@
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.CursoredQueue.Cursor;
 import org.apache.activemq.queue.CursoredQueue.QueueElement;
-import org.apache.activemq.queue.QueueStore.PersistentQueue;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
 import org.apache.activemq.util.Mapper;
 
-public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements PersistentQueue<K, E> {
+public class ExclusivePersistentQueue<K, E> extends AbstractFlowQueue<E> implements IQueue<K, E> {
     private CursoredQueue<E> queue;
     private final FlowController<E> controller;
     private final IFlowSizeLimiter<E> limiter;
@@ -285,7 +284,7 @@
             QueueElement<E> qe = cursor.getNext();
             if (qe != null) {
                 // If the sub doesn't remove on dispatch set an ack listener:
-                SubscriptionDeliveryCallback callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
+                SubscriptionDelivery<E> callback = subscription.isRemoveOnDispatch(qe.elem) ? null : qe;
 
                 // See if the sink has room:
                 qe.setAcquired(subscription);
@@ -383,7 +382,7 @@
     /**
      * @return The count of the elements in this queue or -1 if not yet known.
      */
-    public synchronized long getEnqueuedCount() {
+    public synchronized int getEnqueuedCount() {
         if (!initialized) {
             return -1;
         }

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusivePriorityQueue.java Sun Jun 28 19:37:06 2009
@@ -18,7 +18,6 @@
 
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
-import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.flow.PriorityFlowController;
 import org.apache.activemq.flow.PrioritySizeLimiter;

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/ExclusiveQueue.java Sun Jun 28 19:37:06 2009
@@ -21,7 +21,6 @@
 import org.apache.activemq.flow.Flow;
 import org.apache.activemq.flow.FlowController;
 import org.apache.activemq.flow.IFlowLimiter;
-import org.apache.activemq.flow.ISinkController;
 import org.apache.activemq.flow.ISourceController;
 
 public class ExclusiveQueue<E> extends AbstractFlowQueue<E> {

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/IQueue.java Sun Jun 28 19:37:06 2009
@@ -17,7 +17,6 @@
 package org.apache.activemq.queue;
 
 import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.queue.QueueStore.PersistentQueue;
 import org.apache.activemq.util.Mapper;
 
@@ -61,7 +60,13 @@
      *            The base priority for the queue
      */
     public void setDispatchPriority(int priority);
-
+    
+    /**
+     * Removes the element specified by the given key from the queue:
+     * @param key The key.
+    public void acknowledge(K key);
+    */
+    
     /**
      * Starts the queue.
      */

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/QueueStore.java Sun Jun 28 19:37:06 2009
@@ -18,6 +18,8 @@
 
 
 import org.apache.activemq.flow.ISourceController;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
 
 public interface QueueStore<K, V> {
 
@@ -65,7 +67,7 @@
         public QueueDescriptor getDescriptor();
 
     }
-
+    
     /**
      * Loads a series of elements for the specified queue. The loaded messages
      * are given to the provided {@link MessageRestoreListener}.

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SaveableQueueElement.java Sun Jun 28 19:37:06 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.queue;
 
-
-
 public interface SaveableQueueElement<V> {
 
     /**
@@ -48,5 +46,4 @@
      * Called when the element has been saved.
      */
     public void notifySave();
-
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/SharedQueue.java Sun Jun 28 19:37:06 2009
@@ -30,7 +30,7 @@
 import org.apache.activemq.queue.CursoredQueue.Cursor;
 import org.apache.activemq.queue.CursoredQueue.CursorReadyListener;
 import org.apache.activemq.queue.CursoredQueue.QueueElement;
-import org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback;
+import org.apache.activemq.queue.Subscription.SubscriptionDelivery;
 import org.apache.activemq.util.Mapper;
 import org.apache.kahadb.util.LinkedNode;
 import org.apache.kahadb.util.LinkedNodeList;
@@ -698,7 +698,7 @@
             }
 
             // If the sub doesn't remove on dispatch pass it the callback
-            SubscriptionDeliveryCallback callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
+            SubscriptionDelivery<V> callback = sub.isRemoveOnDispatch(qe.elem) ? null : qe;
             // If the sub is a browser don't pass it a callback since it does not need to 
             // delete messages
             if( sub.isBrowser() ) { 

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/main/java/org/apache/activemq/queue/Subscription.java Sun Jun 28 19:37:06 2009
@@ -21,12 +21,20 @@
 
 public interface Subscription<E> {
 
-    public interface SubscriptionDeliveryCallback {
-
+    public interface SubscriptionDelivery<E> {
+        
+        /**
+         * @return The descriptor of the queue from which this element came.
+         */
+        public QueueDescriptor getQueueDescriptor();
+        
+        /**
+         * @return a key that can be used to remove the message from the queue. 
+         */
+        public long getSourceQueueRemovalKey();
+        
         /**
-         * If {@link Subscription#isBrowser()} returns false this method
-         * indicates that the Subscription is finished with the element and that
-         * it can be removed from the queue.
+         * Acknowledges the delivery.
          */
         public void acknowledge();
 
@@ -101,7 +109,7 @@
      *            The queue's controller, which must be used if the offered
      *            element exceeds the subscription's buffer limits.
      * @param callback
-     *            The {@link SubscriptionDeliveryCallback} associated with the
+     *            The {@link SubscriptionDelivery} associated with the
      *            element
      * 
      * @return true if the element was accepted false otherwise, if false is
@@ -109,7 +117,7 @@
      *         {@link ISourceController#onFlowBlock(ISinkController)} prior to
      *         returning false.
      */
-    public boolean offer(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+    public boolean offer(E element, ISourceController<?> controller, SubscriptionDelivery<E> callback);
 
     /**
      * Pushes an item to the subscription. If the subscription is not remove on
@@ -122,14 +130,14 @@
      *            The queue's controller, which must be used if the added
      *            element exceeds the subscription's buffer limits.
      * @param callback
-     *            The {@link SubscriptionDeliveryCallback} associated with the
+     *            The {@link SubscriptionDelivery} associated with the
      *            element
      * @return true if the element was accepted false otherwise, if false is
      *         returned the caller must have called
      *         {@link ISourceController#onFlowBlock(ISinkController)} prior to
      *         returning false.
      */
-    public void add(E element, ISourceController<?> controller, SubscriptionDeliveryCallback callback);
+    public void add(E element, ISourceController<?> controller, SubscriptionDelivery<E> callback);
 
     @Deprecated
     public IFlowSink<E> getSink();

Modified: activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-queue/src/test/java/org/apache/activemq/queue/perf/MockQueue.java Sun Jun 28 19:37:06 2009
@@ -134,11 +134,11 @@
                 return dt.hasSelector();
             }
 
-            public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
+            public boolean offer(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
                 return getSink().offer(elem, controller);
             }
 
-            public void add(Message elem, ISourceController<?> controller, SubscriptionDeliveryCallback ackCallback) {
+            public void add(Message elem, ISourceController<?> controller, SubscriptionDelivery<Message> ackCallback) {
                 getSink().add(elem, controller);
             }
         };

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompMessageDelivery.java Sun Jun 28 19:37:06 2009
@@ -71,7 +71,7 @@
         }
         return priority;
     }
-    
+
     /*
      * (non-Javadoc)
      * 
@@ -88,7 +88,6 @@
         }
         return tte;
     }
-    
 
     public AsciiBuffer getMsgId() {
         if (msgId == null) {
@@ -162,5 +161,4 @@
         return new StompMessageEvaluationContext();
     }
 
-    
 }

Modified: activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-stomp/src/main/java/org/apache/activemq/broker/stomp/StompProtocolHandler.java Sun Jun 28 19:37:06 2009
@@ -273,7 +273,7 @@
         private Destination destination;
         private String ackMode;
 
-        private LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDeliveryCallback>();
+        private LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>> sentMessageIds = new LinkedHashMap<AsciiBuffer, SubscriptionDelivery<MessageDelivery>>();
 
         private boolean durable;
 
@@ -388,19 +388,19 @@
         /* (non-Javadoc)
          * @see org.apache.activemq.broker.protocol.ProtocolHandler.ConsumerContext#send(org.apache.activemq.broker.MessageDelivery, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
          */
-        public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+        public void add(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
             addInternal(message, controller, callback);
         }
         
         /* (non-Javadoc)
          * @see org.apache.activemq.queue.Subscription#offer(java.lang.Object, org.apache.activemq.flow.ISourceController, org.apache.activemq.queue.Subscription.SubscriptionDeliveryCallback)
          */
-        public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback) {
+        public boolean offer(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback) {
             //FIXME need a controller:
             return false;
         }
         
-        private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDeliveryCallback callback)
+        private void addInternal(MessageDelivery message, ISourceController<?> controller, SubscriptionDelivery<MessageDelivery> callback)
         {
             StompFrame frame = message.asType(StompFrame.class);
             if (ackMode == StompSubscription.CLIENT_ACK || ackMode == StompSubscription.INDIVIDUAL_ACK) {

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Sun Jun 28 19:37:06 2009
@@ -374,6 +374,8 @@
         AsciiBuffer messageId;
         AsciiBuffer encoding;
         int size;
+        Buffer buffer;
+        Long streamKey;
 
         public int getSize() {
             return size;
@@ -382,10 +384,7 @@
         public void setSize(int size) {
             this.size = size;
         }
-
-        Buffer buffer;
-        Long streamKey;
-
+        
         public Long getKey() {
             return key;
         }
@@ -683,15 +682,15 @@
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
 
-        public boolean mapAdd(AsciiBuffer map);
+        public void mapAdd(AsciiBuffer map);
 
-        public boolean mapRemove(AsciiBuffer map);
+        public void mapRemove(AsciiBuffer map);
 
-        public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
+        public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException;
 
         public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
 
-        public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
+        public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException;
 
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException;
 

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Sun Jun 28 19:37:06 2009
@@ -415,12 +415,11 @@
         public void removeSubscription(AsciiBuffer name) {
             subscriptions.remove(name);
         }
-        
+
         /**
-         * @return A list of subscriptions 
+         * @return A list of subscriptions
          */
-        public Iterator<SubscriptionRecord> listSubscriptions()
-        {
+        public Iterator<SubscriptionRecord> listSubscriptions() {
             ArrayList<SubscriptionRecord> rc = new ArrayList<SubscriptionRecord>(subscriptions.size());
             rc.addAll(subscriptions.values());
             return rc.iterator();
@@ -543,16 +542,15 @@
         // Simple Key Value related methods could come in handy to store misc
         // data.
         // ///////////////////////////////////////////////////////////////////////////////
-        public boolean mapAdd(AsciiBuffer mapName) {
+        public void mapAdd(AsciiBuffer mapName) {
             if (maps.containsKey(mapName)) {
-                return false;
+                return;
             }
             maps.put(mapName, new TreeMap<AsciiBuffer, Buffer>());
-            return true;
         }
 
-        public boolean mapRemove(AsciiBuffer mapName) {
-            return maps.remove(mapName) != null;
+        public void mapRemove(AsciiBuffer mapName) {
+            maps.remove(mapName);
         }
 
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
@@ -560,15 +558,22 @@
         }
 
         public Buffer mapEntryGet(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
-            return get(maps, mapName).get(key);
+            TreeMap<AsciiBuffer, Buffer> map = get(maps, mapName);
+            return map.get(key);
         }
 
-        public Buffer mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
-            return get(maps, mapName).remove(key);
+        public void mapEntryRemove(AsciiBuffer mapName, AsciiBuffer key) throws KeyNotFoundException {
+            TreeMap<AsciiBuffer, Buffer> map = get(maps, mapName);
+            map.remove(key);
         }
 
-        public Buffer mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
-            return get(maps, mapName).put(key, value);
+        public void mapEntryPut(AsciiBuffer mapName, AsciiBuffer key, Buffer value) {
+            TreeMap<AsciiBuffer, Buffer> map = maps.get(mapName);
+            if (map == null) {
+                mapAdd(mapName);
+                map = maps.get(mapName);
+            }
+            map.put(key, value);
         }
 
         public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer mapName, AsciiBuffer first, int max) throws KeyNotFoundException {

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Sun Jun 28 19:37:06 2009
@@ -31,20 +31,19 @@
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.VariableMarshaller;
 
-
 /**
- * The BTreeNode class represents a node in the BTree object graph.  It is stored in 
- * one Page of a PageFile.
+ * The BTreeNode class represents a node in the BTree object graph. It is stored
+ * in one Page of a PageFile.
  */
-public final class BTreeNode<Key,Value> {
+public final class BTreeNode<Key, Value> {
 
     // The index that this node is part of.
-    private final BTreeIndex<Key,Value> index;
+    private final BTreeIndex<Key, Value> index;
     // The parent node or null if this is the root node of the BTree
-    private BTreeNode<Key,Value> parent;
+    private BTreeNode<Key, Value> parent;
     // The page associated with this node
-    private Page<BTreeNode<Key,Value>> page;
-    
+    private Page<BTreeNode<Key, Value>> page;
+
     // Order list of keys in the node
     private Key[] keys;
     // Values associated with the Keys. Null if this is a branch node.
@@ -53,7 +52,7 @@
     private long[] children;
     // The next leaf node after this one.  Used for fast iteration of the entries.
     private long next = -1;
-    
+
     private final class KeyValueEntry implements Map.Entry<Key, Value> {
         private final Key key;
         private final Value value;
@@ -78,39 +77,39 @@
     }
 
     private final class BTreeIterator implements Iterator<Map.Entry<Key, Value>> {
-        
+
         private final Transaction tx;
-        BTreeNode<Key,Value> current;
+        BTreeNode<Key, Value> current;
         int nextIndex;
-        Map.Entry<Key,Value> nextEntry;
+        Map.Entry<Key, Value> nextEntry;
 
-        private BTreeIterator(Transaction tx, BTreeNode<Key,Value> current, int nextIndex) {
+        private BTreeIterator(Transaction tx, BTreeNode<Key, Value> current, int nextIndex) {
             this.tx = tx;
             this.current = current;
-            this.nextIndex=nextIndex;
+            this.nextIndex = nextIndex;
         }
 
         synchronized private void findNextPage() {
-            if( nextEntry!=null ) {
+            if (nextEntry != null) {
                 return;
             }
-            
+
             try {
-                while( current!=null ) {
-                    if( nextIndex >= current.keys.length ) {
+                while (current != null) {
+                    if (nextIndex >= current.keys.length) {
                         // we need to roll to the next leaf..
-                        if( current.next >= 0 ) {
+                        if (current.next >= 0) {
                             current = index.loadNode(tx, current.next, null);
-                            nextIndex=0;
+                            nextIndex = 0;
                         } else {
                             break;
                         }
-                    }  else {
+                    } else {
                         nextEntry = new KeyValueEntry(current.keys[nextIndex], current.values[nextIndex]);
                         nextIndex++;
                         break;
                     }
-                    
+
                 }
             } catch (IOException e) {
             }
@@ -118,14 +117,14 @@
 
         public boolean hasNext() {
             findNextPage();
-            return nextEntry !=null;
+            return nextEntry != null;
         }
 
         public Entry<Key, Value> next() {
-            findNextPage(); 
-            if( nextEntry !=null ) {
+            findNextPage();
+            if (nextEntry != null) {
                 Entry<Key, Value> lastEntry = nextEntry;
-                nextEntry=null;
+                nextEntry = null;
                 return lastEntry;
             } else {
                 throw new NoSuchElementException();
@@ -138,37 +137,38 @@
     }
 
     /**
-     * The Marshaller is used to store and load the data in the BTreeNode into a Page.
-     *  
+     * The Marshaller is used to store and load the data in the BTreeNode into a
+     * Page.
+     * 
      * @param <Key>
      * @param <Value>
      */
-    static public class Marshaller<Key,Value> extends VariableMarshaller<BTreeNode<Key,Value>> {
-        private final BTreeIndex<Key,Value> index;
-        
-        public Marshaller(BTreeIndex<Key,Value> index) {
+    static public class Marshaller<Key, Value> extends VariableMarshaller<BTreeNode<Key, Value>> {
+        private final BTreeIndex<Key, Value> index;
+
+        public Marshaller(BTreeIndex<Key, Value> index) {
             this.index = index;
         }
 
-        public void writePayload(BTreeNode<Key,Value> node, DataOutput os) throws IOException {
+        public void writePayload(BTreeNode<Key, Value> node, DataOutput os) throws IOException {
             // Write the keys
-            short count = (short)node.keys.length; // cast may truncate value...
-            if( count != node.keys.length ) {
+            short count = (short) node.keys.length; // cast may truncate value...
+            if (count != node.keys.length) {
                 throw new IOException("Too many keys");
             }
-            
+
             os.writeShort(count);
             for (int i = 0; i < node.keys.length; i++) {
                 index.getKeyMarshaller().writePayload(node.keys[i], os);
             }
-            
-            if( node.isBranch() ) {
+
+            if (node.isBranch()) {
                 // If this is a branch...
                 os.writeBoolean(true);
-                for (int i = 0; i < count+1; i++) {
+                for (int i = 0; i < count + 1; i++) {
                     os.writeLong(node.children[i]);
                 }
-                
+
             } else {
                 // If this is a leaf
                 os.writeBoolean(false);
@@ -180,22 +180,22 @@
         }
 
         @SuppressWarnings("unchecked")
-        public BTreeNode<Key,Value> readPayload(DataInput is) throws IOException {
-            BTreeNode<Key,Value>  node = new BTreeNode<Key,Value>(index);
+        public BTreeNode<Key, Value> readPayload(DataInput is) throws IOException {
+            BTreeNode<Key, Value> node = new BTreeNode<Key, Value>(index);
             int count = is.readShort();
-            
-            node.keys = (Key[])new Object[count];
+
+            node.keys = (Key[]) new Object[count];
             for (int i = 0; i < count; i++) {
                 node.keys[i] = index.getKeyMarshaller().readPayload(is);
             }
-            
-            if( is.readBoolean() ) {
-                node.children = new long[count+1];
-                for (int i = 0; i < count+1; i++) {
+
+            if (is.readBoolean()) {
+                node.children = new long[count + 1];
+                for (int i = 0; i < count + 1; i++) {
                     node.children[i] = is.readLong();
                 }
             } else {
-                node.values = (Value[])new Object[count];
+                node.values = (Value[]) new Object[count];
                 for (int i = 0; i < count; i++) {
                     node.values[i] = index.getValueMarshaller().readPayload(is);
                 }
@@ -205,21 +205,21 @@
         }
     }
 
-    public BTreeNode(BTreeIndex<Key,Value> index) {
+    public BTreeNode(BTreeIndex<Key, Value> index) {
         this.index = index;
     }
-    
+
     public void setEmpty() {
         setLeafData(createKeyArray(0), createValueArray(0));
     }
-    
 
     /**
      * Internal (to the BTreeNode) method. Because this method is called only by
      * BTreeNode itself, no synchronization done inside of this method.
-     * @throws IOException 
+     * 
+     * @throws IOException
      */
-    private BTreeNode<Key,Value> getChild(Transaction tx, int idx) throws IOException {
+    private BTreeNode<Key, Value> getChild(Transaction tx, int idx) throws IOException {
         if (isBranch() && idx >= 0 && idx < children.length) {
             BTreeNode<Key, Value> result = this.index.loadNode(tx, children[idx], this);
             return result;
@@ -227,47 +227,47 @@
             return null;
         }
     }
-   
+
     public Value remove(Transaction tx, Key key) throws IOException {
 
-        if(isBranch()) {
+        if (isBranch()) {
             int idx = Arrays.binarySearch(keys, key);
             idx = idx < 0 ? -(idx + 1) : idx + 1;
             BTreeNode<Key, Value> child = getChild(tx, idx);
-            if( child.getPageId() == index.getPageId() ) {
+            if (child.getPageId() == index.getPageId()) {
                 throw new IOException("BTree corrupted: Cylce detected.");
             }
             Value rc = child.remove(tx, key);
-            
+
             // child node is now empty.. remove it from the branch node.
-            if( child.keys.length == 0 ) {
-                
+            if (child.keys.length == 0) {
+
                 // If the child node is a branch, promote
-                if( child.isBranch() ) {
+                if (child.isBranch()) {
                     // This is cause branches are never really empty.. they just go down to 1 child..
                     children[idx] = child.children[0];
                 } else {
-                    
+
                     // The child was a leaf. Then we need to actually remove it from this branch node..
 
                     // We need to update the previous child's next pointer to skip over the child being removed....
-                    if( idx > 0 && children.length > 1) {
-                        BTreeNode<Key, Value> previousChild = getChild(tx, idx-1);
+                    if (idx > 0 && children.length > 1) {
+                        BTreeNode<Key, Value> previousChild = getChild(tx, idx - 1);
                         previousChild.next = child.next;
                         index.storeNode(tx, previousChild, true);
                     }
-                    
-                    if( idx < children.length-1 ) {
+
+                    if (idx < children.length - 1) {
                         // Delete it and key to the right.
                         setBranchData(arrayDelete(keys, idx), arrayDelete(children, idx));
                     } else {
                         // It was the last child.. Then delete it and key to the left
-                        setBranchData(arrayDelete(keys, idx-1), arrayDelete(children, idx));
+                        setBranchData(arrayDelete(keys, idx - 1), arrayDelete(children, idx));
                     }
-                    
+
                     // If we are the root node, and only have 1 child left.  Then 
                     // make the root be the leaf node.
-                    if( children.length == 1 && parent==null ) {
+                    if (children.length == 1 && parent == null) {
                         child = getChild(tx, 0);
                         keys = child.keys;
                         children = child.children;
@@ -275,11 +275,11 @@
                         // free up the page..
                         tx.free(child.getPage());
                     }
-                    
+
                 }
                 index.storeNode(tx, this, true);
             }
-            
+
             return rc;
         } else {
             int idx = Arrays.binarySearch(keys, key);
@@ -288,13 +288,13 @@
             } else {
                 Value oldValue = values[idx];
                 setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
-                
-                if( keys.length==0 && parent!=null) {
+
+                if (keys.length == 0 && parent != null) {
                     tx.free(getPage());
                 } else {
                     index.storeNode(tx, this, true);
                 }
-                
+
                 return oldValue;
             }
         }
@@ -305,12 +305,12 @@
             throw new IllegalArgumentException("Key cannot be null");
         }
 
-        if( isBranch() ) {
+        if (isBranch()) {
             return getLeafNode(tx, this, key).put(tx, key, value);
         } else {
             int idx = Arrays.binarySearch(keys, key);
-            
-            Value oldValue=null;
+
+            Value oldValue = null;
             if (idx >= 0) {
                 // Key was found... Overwrite
                 oldValue = values[idx];
@@ -321,14 +321,14 @@
                 idx = -(idx + 1);
                 setLeafData(arrayInsert(keys, key, idx), arrayInsert(values, value, idx));
             }
-            
+
             try {
                 index.storeNode(tx, this, allowOverflow());
-            } catch ( Transaction.PageOverflowIOException e ) {
+            } catch (Transaction.PageOverflowIOException e) {
                 // If we get an overflow 
                 split(tx);
             }
-            
+
             return oldValue;
         }
     }
@@ -341,7 +341,7 @@
 
         try {
             index.storeNode(tx, this, allowOverflow());
-        } catch ( Transaction.PageOverflowIOException e ) {
+        } catch (Transaction.PageOverflowIOException e) {
             split(tx);
         }
 
@@ -353,17 +353,17 @@
     private void split(Transaction tx) throws IOException {
         Key[] leftKeys;
         Key[] rightKeys;
-        Value[] leftValues=null;
-        Value[] rightValues=null;
-        long[] leftChildren=null;
-        long[] rightChildren=null;
+        Value[] leftValues = null;
+        Value[] rightValues = null;
+        long[] leftChildren = null;
+        long[] rightChildren = null;
         Key separator;
 
         int vc = keys.length;
         int pivot = vc / 2;
 
         // Split the node into two nodes
-        if( isBranch() ) {
+        if (isBranch()) {
 
             leftKeys = createKeyArray(pivot);
             leftChildren = new long[leftKeys.length + 1];
@@ -377,13 +377,12 @@
 
             // Is it a Simple Prefix BTree??
             Prefixer<Key> prefixer = index.getPrefixer();
-            if(prefixer!=null) {
+            if (prefixer != null) {
                 separator = prefixer.getSimplePrefix(leftKeys[leftKeys.length - 1], rightKeys[0]);
             } else {
                 separator = keys[leftKeys.length];
             }
-                
-            
+
         } else {
 
             leftKeys = createKeyArray(pivot);
@@ -404,12 +403,12 @@
 
         // Promote the pivot to the parent branch
         if (parent == null) {
-            
+
             // This can only happen if this is the root
-            BTreeNode<Key,Value> rNode = this.index.createNode(tx, this);
-            BTreeNode<Key,Value> lNode = this.index.createNode(tx, this);
+            BTreeNode<Key, Value> rNode = this.index.createNode(tx, this);
+            BTreeNode<Key, Value> lNode = this.index.createNode(tx, this);
 
-            if( isBranch() ) {
+            if (isBranch()) {
                 rNode.setBranchData(rightKeys, rightChildren);
                 lNode.setBranchData(leftKeys, leftChildren);
             } else {
@@ -419,17 +418,17 @@
             }
 
             Key[] v = createKeyArray(1);
-            v[0]=separator;
+            v[0] = separator;
             setBranchData(v, new long[] { lNode.getPageId(), rNode.getPageId() });
 
             index.storeNode(tx, this, true);
             index.storeNode(tx, rNode, true);
             index.storeNode(tx, lNode, true);
-            
+
         } else {
-            BTreeNode<Key,Value> rNode = this.index.createNode(tx, parent);
-            
-            if( isBranch() ) {
+            BTreeNode<Key, Value> rNode = this.index.createNode(tx, parent);
+
+            if (isBranch()) {
                 setBranchData(leftKeys, leftChildren);
                 rNode.setBranchData(rightKeys, rightChildren);
             } else {
@@ -446,48 +445,47 @@
     }
 
     public void printStructure(Transaction tx, PrintWriter out, String prefix) throws IOException {
-        if( prefix.length()>0 && parent == null ) {
+        if (prefix.length() > 0 && parent == null) {
             throw new IllegalStateException("Cycle back to root node detected.");
         }
-        
-        if( isBranch() ) {
-            for(int i=0 ; i < children.length; i++) {
+
+        if (isBranch()) {
+            for (int i = 0; i < children.length; i++) {
                 BTreeNode<Key, Value> child = getChild(tx, i);
-                if( i == children.length-1) {
-                    out.println(prefix+"\\- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":""));
-                    child.printStructure(tx, out, prefix+"   ");
+                if (i == children.length - 1) {
+                    out.println(prefix + "\\- " + child.getPageId() + (child.isBranch() ? " (" + child.children.length + ")" : ""));
+                    child.printStructure(tx, out, prefix + "   ");
                 } else {
-                    out.println(prefix+"|- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":"")+" : "+keys[i]);
-                    child.printStructure(tx, out, prefix+"   ");
+                    out.println(prefix + "|- " + child.getPageId() + (child.isBranch() ? " (" + child.children.length + ")" : "") + " : " + keys[i]);
+                    child.printStructure(tx, out, prefix + "   ");
                 }
             }
         }
     }
-    
-    
+
     public int getMinLeafDepth(Transaction tx, int depth) throws IOException {
         depth++;
-        if( isBranch() ) {
+        if (isBranch()) {
             int min = Integer.MAX_VALUE;
-            for(int i=0 ; i < children.length; i++) {
+            for (int i = 0; i < children.length; i++) {
                 min = Math.min(min, getChild(tx, i).getMinLeafDepth(tx, depth));
             }
             return min;
         } else {
-//            print(depth*2, "- "+page.getPageId());
+            //            print(depth*2, "- "+page.getPageId());
             return depth;
         }
     }
 
     public int getMaxLeafDepth(Transaction tx, int depth) throws IOException {
         depth++;
-        if( isBranch() ) {
+        if (isBranch()) {
             int v = 0;
-            for(int i=0 ; i < children.length; i++) {
+            for (int i = 0; i < children.length; i++) {
                 v = Math.max(v, getChild(tx, i).getMaxLeafDepth(tx, depth));
             }
             depth = v;
-        } 
+        }
         return depth;
     }
 
@@ -495,7 +493,7 @@
         if (key == null) {
             throw new IllegalArgumentException("Key cannot be null");
         }
-        if( isBranch() ) {
+        if (isBranch()) {
             return getLeafNode(tx, this, key).get(tx, key);
         } else {
             int idx = Arrays.binarySearch(keys, key);
@@ -506,22 +504,27 @@
             }
         }
     }
-    
+
     public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
         if (visitor == null) {
             throw new IllegalArgumentException("Visitor cannot be null");
         }
-        if( isBranch() ) {
-            for(int i=0; i < this.children.length; i++) {
+        
+        if (visitor.isSatiated()) {
+            return;
+        }
+        
+        if (isBranch()) {
+            for (int i = 0; i < this.children.length; i++) {
                 Key key1 = null;
-                if( i!=0 ) {
-                    key1 = keys[i-1];
+                if (i != 0) {
+                    key1 = keys[i - 1];
                 }
                 Key key2 = null;
-                if( i!=this.children.length-1 ) {
+                if (i != this.children.length - 1) {
                     key2 = keys[i];
                 }
-                if( visitor.isInterestedInKeysBetween(key1, key2) ) {
+                if (visitor.isInterestedInKeysBetween(key1, key2)) {
                     BTreeNode<Key, Value> child = getChild(tx, i);
                     child.visit(tx, visitor);
                 }
@@ -530,45 +533,45 @@
             visitor.visit(Arrays.asList(keys), Arrays.asList(values));
         }
     }
-    
-    public Map.Entry<Key,Value> getFirst(Transaction tx) throws IOException {
+
+    public Map.Entry<Key, Value> getFirst(Transaction tx) throws IOException {
         BTreeNode<Key, Value> node = this;
-        while( node .isBranch() ) {
+        while (node.isBranch()) {
             node = node.getChild(tx, 0);
         }
-        if( node.values.length>0 ) {
+        if (node.values.length > 0) {
             return new KeyValueEntry(node.keys[0], node.values[0]);
         } else {
             return null;
         }
     }
 
-    public Map.Entry<Key,Value> getLast(Transaction tx) throws IOException {
+    public Map.Entry<Key, Value> getLast(Transaction tx) throws IOException {
         BTreeNode<Key, Value> node = this;
-        while( node.isBranch() ) {
-            node = node.getChild(tx, node.children.length-1);
+        while (node.isBranch()) {
+            node = node.getChild(tx, node.children.length - 1);
         }
-        if( node.values.length>0 ) {
-            int idx = node.values.length-1;
+        if (node.values.length > 0) {
+            int idx = node.values.length - 1;
             return new KeyValueEntry(node.keys[idx], node.values[idx]);
         } else {
             return null;
         }
     }
-    
-    public BTreeNode<Key,Value> getFirstLeafNode(Transaction tx) throws IOException {
+
+    public BTreeNode<Key, Value> getFirstLeafNode(Transaction tx) throws IOException {
         BTreeNode<Key, Value> node = this;
-        while( node .isBranch() ) {
+        while (node.isBranch()) {
             node = node.getChild(tx, 0);
         }
         return node;
     }
-    
-    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key startKey) throws IOException {
+
+    public Iterator<Map.Entry<Key, Value>> iterator(final Transaction tx, Key startKey) throws IOException {
         if (startKey == null) {
             return iterator(tx);
         }
-        if( isBranch() ) {
+        if (isBranch()) {
             return getLeafNode(tx, this, startKey).iterator(tx, startKey);
         } else {
             int idx = Arrays.binarySearch(keys, startKey);
@@ -579,12 +582,12 @@
         }
     }
 
-    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
+    public Iterator<Map.Entry<Key, Value>> iterator(final Transaction tx) throws IOException {
         return new BTreeIterator(tx, getFirstLeafNode(tx), 0);
     }
-    
+
     public void clear(Transaction tx) throws IOException {
-        if( isBranch() ) {
+        if (isBranch()) {
             for (int i = 0; i < children.length; i++) {
                 BTreeNode<Key, Value> node = index.loadNode(tx, children[i], this);
                 node.clear(tx);
@@ -592,27 +595,26 @@
             }
         }
         // Reset the root node to be a leaf.
-        if( parent == null ) {
+        if (parent == null) {
             setLeafData(createKeyArray(0), createValueArray(0));
-            next=-1;
+            next = -1;
             index.storeNode(tx, this, true);
         }
     }
 
-
-    private static <Key,Value> BTreeNode<Key, Value> getLeafNode(Transaction tx, final BTreeNode<Key, Value> node, Key key) throws IOException {
+    private static <Key, Value> BTreeNode<Key, Value> getLeafNode(Transaction tx, final BTreeNode<Key, Value> node, Key key) throws IOException {
         BTreeNode<Key, Value> current = node;
-        while( true ) {
-            if( current.isBranch() ) {
+        while (true) {
+            if (current.isBranch()) {
                 int idx = Arrays.binarySearch(current.keys, key);
                 idx = idx < 0 ? -(idx + 1) : idx + 1;
-                BTreeNode<Key, Value> child = current.getChild(tx, idx);        
+                BTreeNode<Key, Value> child = current.getChild(tx, idx);
 
                 // A little cycle detection for sanity's sake
-                if( child == node ) {
+                if (child == node) {
                     throw new IOException("BTree corrupted: Cylce detected.");
                 }
-                
+
                 current = child;
             } else {
                 break;
@@ -626,7 +628,7 @@
             throw new IllegalArgumentException("Key cannot be null");
         }
 
-        if( isBranch() ) {
+        if (isBranch()) {
             return getLeafNode(tx, this, key).contains(tx, key);
         } else {
             int idx = Arrays.binarySearch(keys, key);
@@ -641,20 +643,18 @@
     ///////////////////////////////////////////////////////////////////
     // Implementation methods
     ///////////////////////////////////////////////////////////////////
- 
 
     private boolean allowOverflow() {
         // Only allow page overflow if there are <= 3 keys in the node.  Otherwise a split will occur on overflow
-        return this.keys.length<=3;
+        return this.keys.length <= 3;
     }
 
-
     private void setLeafData(Key[] keys, Value[] values) {
         this.keys = keys;
         this.values = values;
         this.children = null;
     }
-    
+
     private void setBranchData(Key[] keys, long[] nodeIds) {
         this.keys = keys;
         this.children = nodeIds;
@@ -663,17 +663,17 @@
 
     @SuppressWarnings("unchecked")
     private Key[] createKeyArray(int size) {
-        return (Key[])new Object[size];
+        return (Key[]) new Object[size];
     }
 
     @SuppressWarnings("unchecked")
     private Value[] createValueArray(int size) {
-        return (Value[])new Object[size];
+        return (Value[]) new Object[size];
     }
-    
+
     @SuppressWarnings("unchecked")
     static private <T> T[] arrayDelete(T[] vals, int idx) {
-        T[] newVals = (T[])new Object[vals.length - 1];
+        T[] newVals = (T[]) new Object[vals.length - 1];
         if (idx > 0) {
             System.arraycopy(vals, 0, newVals, 0, idx);
         }
@@ -682,7 +682,7 @@
         }
         return newVals;
     }
-    
+
     static private long[] arrayDelete(long[] vals, int idx) {
         long[] newVals = new long[vals.length - 1];
         if (idx > 0) {
@@ -696,7 +696,7 @@
 
     @SuppressWarnings("unchecked")
     static private <T> T[] arrayInsert(T[] vals, T val, int idx) {
-        T[] newVals = (T[])new Object[vals.length + 1];
+        T[] newVals = (T[]) new Object[vals.length + 1];
         if (idx > 0) {
             System.arraycopy(vals, 0, newVals, 0, idx);
         }
@@ -707,9 +707,8 @@
         return newVals;
     }
 
-
     static private long[] arrayInsert(long[] vals, long val, int idx) {
-        
+
         long[] newVals = new long[vals.length + 1];
         if (idx > 0) {
             System.arraycopy(vals, 0, newVals, 0, idx);
@@ -725,7 +724,7 @@
     // Property Accessors
     ///////////////////////////////////////////////////////////////////
     private boolean isBranch() {
-        return children!=null;
+        return children != null;
     }
 
     public long getPageId() {
@@ -755,12 +754,10 @@
     public void setNext(long next) {
         this.next = next;
     }
-    
+
     @Override
     public String toString() {
-        return "[BTreeNode "+(isBranch()?"branch":"leaf")+": "+Arrays.asList(keys)+"]";
+        return "[BTreeNode " + (isBranch() ? "branch" : "leaf") + ": " + Arrays.asList(keys) + "]";
     }
 
 }
-
-

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Sun Jun 28 19:37:06 2009
@@ -24,17 +24,21 @@
  * @param <Key>
  * @param <Value>
  */
-public interface BTreeVisitor<Key,Value> {
-    
+public interface BTreeVisitor<Key, Value> {
+
     /**
-     * Do you want to visit the range of BTree entries between the first and and second key?
+     * Do you want to visit the range of BTree entries between the first and
+     * second key?
      * 
-     * @param first if null indicates the range of values before the second key. 
-     * @param second if null indicates the range of values after the first key.
-     * @return true if you want to visit the values between the first and second key.
+     * @param first
+     *            if null indicates the range of values before the second key.
+     * @param second
+     *            if null indicates the range of values after the first key.
+     * @return true if you want to visit the values between the first and second
+     *         key.
      */
     boolean isInterestedInKeysBetween(Key first, Key second);
-    
+
     /**
      * The keys and values of a BTree leaf node.
      * 
@@ -42,97 +46,155 @@
      * @param values
      */
     void visit(List<Key> keys, List<Value> values);
-    
-    
-    abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
-		final private Key value;
-
-		public GTVisitor(Key value) {
-			this.value = value;
-		}
-
-		public boolean isInterestedInKeysBetween(Key first, Key second) {
-        	return second==null || second.compareTo(value)>0;
-		}
-
-		public void visit(List<Key> keys, List<Value> values) {
-			for( int i=0; i < keys.size(); i++) {
-				Key key = keys.get(i);
-				if( key.compareTo(value)>0 ) {
-					matched(key, values.get(i));
-				}
-			}
-		}
 
-		abstract protected void matched(Key key, Value value);
+    /**
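+     * Reports whether the visitor has seen enough and traversal may stop early.
+     * 
+     * @return true if the visitor wants no further keys or values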
+     */
+    boolean isSatiated();
+
+    abstract class GTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+        final private Key value;
+        int matches = Integer.MAX_VALUE;
+        boolean limited;
+
+        public GTVisitor(Key value) {
+            this.value = value;
+        }
+
+        public GTVisitor(Key value, int limit) {
+            this.value = value;
+            limited = true;
+            matches = limit;
+        }
+
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            return second == null || second.compareTo(value) > 0;
+        }
+
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+                Key key = keys.get(i);
+                if (key.compareTo(value) > 0) {
+                    matched(key, values.get(i));
+                    if (limited) matches--;
+                }
+            }
+        }
+
+        public boolean isSatiated() {
+            return limited && matches <= 0;
+        }
+
+        abstract protected void matched(Key key, Value value);
     }
-    
-    abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
-		final private Key value;
-
-		public GTEVisitor(Key value) {
-			this.value = value;
-		}
-
-		public boolean isInterestedInKeysBetween(Key first, Key second) {
-        	return second==null || second.compareTo(value)>=0;
-		}
-
-		public void visit(List<Key> keys, List<Value> values) {
-			for( int i=0; i < keys.size(); i++) {
-				Key key = keys.get(i);
-				if( key.compareTo(value)>=0 ) {
-					matched(key, values.get(i));
-				}
-			}
-		}
 
-		abstract protected void matched(Key key, Value value);
+    abstract class GTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+        final private Key value;
+        int matches = Integer.MAX_VALUE;
+        boolean limited;
+
+        public GTEVisitor(Key value) {
+            this.value = value;
+        }
+        
+        public GTEVisitor(Key value, int limit) {
+            this.value = value;
+            limited = true;
+            matches = limit;
+        }
+
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            return second == null || second.compareTo(value) >= 0;
+        }
+
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+                Key key = keys.get(i);
+                if (key.compareTo(value) >= 0) {
+                    matched(key, values.get(i));
+                    if (limited) matches--;
+                }
+            }
+        }
+
+        public boolean isSatiated() {
+            return limited && matches <= 0;
+        }
+
+        abstract protected void matched(Key key, Value value);
     }
-    
-    abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
-		final private Key value;
-
-		public LTVisitor(Key value) {
-			this.value = value;
-		}
-
-		public boolean isInterestedInKeysBetween(Key first, Key second) {
-        	return first==null || first.compareTo(value)<0;
-		}
-
-		public void visit(List<Key> keys, List<Value> values) {
-			for( int i=0; i < keys.size(); i++) {
-				Key key = keys.get(i);
-				if( key.compareTo(value)<0 ) {
-					matched(key, values.get(i));
-				}
-			}
-		}
 
-		abstract protected void matched(Key key, Value value);
+    abstract class LTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+        final private Key value;
+        int matches = Integer.MAX_VALUE;
+        boolean limited;
+
+        public LTVisitor(Key value) {
+            this.value = value;
+        }
+        
+        public LTVisitor(Key value, int limit) {
+            this.value = value;
+            limited = true;
+            matches = limit;
+        }
+
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            return first == null || first.compareTo(value) < 0;
+        }
+
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+                Key key = keys.get(i);
+                if (key.compareTo(value) < 0) {
+                    matched(key, values.get(i));
+                    if (limited) matches--;
+                }
+            }
+        }
+
+        public boolean isSatiated() {
+            return limited && matches <= 0;
+        }
+
+        abstract protected void matched(Key key, Value value);
     }
-    
-    abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
-		final private Key value;
-
-		public LTEVisitor(Key value) {
-			this.value = value;
-		}
-
-		public boolean isInterestedInKeysBetween(Key first, Key second) {
-        	return first==null || first.compareTo(value)<=0;
-		}
-
-		public void visit(List<Key> keys, List<Value> values) {
-			for( int i=0; i < keys.size(); i++) {
-				Key key = keys.get(i);
-				if( key.compareTo(value)<=0 ) {
-					matched(key, values.get(i));
-				}
-			}
-		}
 
-		abstract protected void matched(Key key, Value value);
+    abstract class LTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
+        final private Key value;
+        int matches = Integer.MAX_VALUE;
+        boolean limited;
+
+        public LTEVisitor(Key value) {
+            this.value = value;
+        }
+
+        public LTEVisitor(Key value, int limit) {
+            this.value = value;
+            limited = true;
+            matches = limit;
+        }
+        
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            return first == null || first.compareTo(value) <= 0;
+        }
+
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
+                Key key = keys.get(i);
+                if (key.compareTo(value) <= 0) {
+                    matched(key, values.get(i));
+                    if (limited) matches--;
+                }
+            }
+        }
+
+        public boolean isSatiated() {
+            return limited && matches <= 0;
+        }
+
+        abstract protected void matched(Key key, Value value);
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=789143&r1=789142&r2=789143&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/activemq-flow/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Sun Jun 28 19:37:06 2009
@@ -167,6 +167,9 @@
             }
             public void visit(List<String> keys, List<Long> values) {
             }
+            public boolean isSatiated() {
+                return false;
+            }
         });