You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/06/02 17:28:31 UTC

svn commit: r1130607 [1/2] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core...

Author: gtully
Date: Thu Jun  2 15:28:30 2011
New Revision: 1130607

URL: http://svn.apache.org/viewvc?rev=1130607&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3351 - Usage of the temp store index by the PList needs the be improved. new implementation puts the max entries in a page, reading/writing requires substantially less page access and disk access when pending messages build up

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/
    activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/
    activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java   (with props)
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jun  2 15:28:30 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker;
 
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -228,13 +229,22 @@ public class TransportConnection impleme
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug("Transport failed: " + e, e);
-            } else if (TRANSPORTLOG.isInfoEnabled()) {
+            } else if (TRANSPORTLOG.isInfoEnabled() && !expected(e)) {
                 TRANSPORTLOG.info("Transport failed: " + e);
             }
             stopAsync();
         }
     }
 
+    private boolean expected(IOException e) {
+        return  e instanceof SocketException && isStomp() && e.getMessage().indexOf("reset") != -1;
+    }
+
+    private boolean isStomp() {
+        URI uri = connector.getUri();
+        return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
+    }
+
     /**
      * Calls the serviceException method in an async thread. Since handling a
      * service exception closes a socket, we should not tie up broker threads

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Jun  2 15:28:30 2011
@@ -583,12 +583,12 @@ public abstract class BaseDestination im
     
     protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
         if (systemUsage.isSendFailIfNoSpace()) {
-            getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
+            getLog().debug("sendFailIfNoSpace, forcing exception on send, usage:  " + usage + ": " + warning);
             throw new ResourceAllocationException(warning);
         }
         if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
             if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
-                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
+                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
                 throw new ResourceAllocationException(warning);
             }
         } else {
@@ -601,7 +601,7 @@ public abstract class BaseDestination im
     
                 long now = System.currentTimeMillis();
                 if (now >= nextWarn) {
-                    getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+                    getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
                     nextWarn = now + blockedProducerWarningInterval;
                 }
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jun  2 15:28:30 2011
@@ -631,7 +631,7 @@ public class Queue extends BaseDestinati
                 } else {
 
                     if (memoryUsage.isFull()) {
-                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
+                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
                                 + message.getProducerId() + ") stopped to prevent flooding "
                                 + getActiveMQDestination().getQualifiedName() + "."
                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -738,7 +738,7 @@ public class Queue extends BaseDestinati
     private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
         if (message.isPersistent()) {
             if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
-                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                     + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
                     + message.getProducerId() + ") to prevent flooding "
                     + getActiveMQDestination().getQualifiedName() + "."
@@ -747,7 +747,7 @@ public class Queue extends BaseDestinati
                 waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
             }
         } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
-            final String logMessage = "Usage Manager Temp Store is Full ("
+            final String logMessage = "Temp Store is Full ("
                     + systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
                     +"). Stopping producer (" + message.getProducerId()
                 + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jun  2 15:28:30 2011
@@ -293,10 +293,7 @@ public class Topic extends BaseDestinati
 
                 if (warnOnProducerFlowControl) {
                     warnOnProducerFlowControl = false;
-                    LOG
-                            .info("Usage Manager memory limit ("
-                                    + memoryUsage.getLimit()
-                                    + ") reached for "
+                    LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
                                     + getActiveMQDestination().getQualifiedName()
                                     + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
                                     + " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -304,7 +301,7 @@ public class Topic extends BaseDestinati
 
                 if (systemUsage.isSendFailIfNoSpace()) {
                     throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
-                            + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
+                            + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
                             + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                             + " See http://activemq.apache.org/producer-flow-control.html for more info");
                 }
@@ -379,7 +376,7 @@ public class Topic extends BaseDestinati
                             waitForSpace(
                                     context,
                                     memoryUsage,
-                                    "Usage Manager memory limit reached. Stopping producer ("
+                                    "Usage Manager Memory Usage limit reached. Stopping producer ("
                                             + message.getProducerId()
                                             + ") to prevent flooding "
                                             + getActiveMQDestination().getQualifiedName()
@@ -427,7 +424,7 @@ public class Topic extends BaseDestinati
 
         if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
             if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
-                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+                final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
                         + systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
                         + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
                         + " See http://activemq.apache.org/producer-flow-control.html for more info";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jun  2 15:28:30 2011
@@ -135,7 +135,9 @@ public class FilePendingMessageCursor ex
         iterating = false;
         if (flushRequired) {
             flushRequired = false;
-            flushToDisk();
+            if (!hasSpace()) {
+                flushToDisk();
+            }
         }
     }
 
@@ -151,8 +153,9 @@ public class FilePendingMessageCursor ex
     }
 
     private void destroyDiskList() throws Exception {
-        if (!isDiskListEmpty()) {
+        if (diskList != null) {
             store.removePList(name);
+            diskList = null;
         }
     }
 
@@ -335,7 +338,7 @@ public class FilePendingMessageCursor ex
      */
     @Override
     public synchronized int size() {
-        return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
+        return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
     }
 
     /**
@@ -374,12 +377,14 @@ public class FilePendingMessageCursor ex
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
             synchronized (this) {
-                flushRequired = true;
-                if (!iterating) {
-                    expireOldMessages();
-                    if (!hasSpace()) {
-                        flushToDisk();
-                        flushRequired = false;
+                if (!flushRequired) {
+                    flushRequired =true;
+                    if (!iterating) {
+                        expireOldMessages();
+                        if (!hasSpace()) {
+                            flushToDisk();
+                            flushRequired = false;
+                        }
                     }
                 }
             }
@@ -412,8 +417,12 @@ public class FilePendingMessageCursor ex
     }
 
     protected synchronized void flushToDisk() {
-
         if (!memoryList.isEmpty()) {
+            long start = 0;
+             if (LOG.isTraceEnabled()) {
+                start = System.currentTimeMillis();
+                LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size()  + " " +  (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
+             }
             while (!memoryList.isEmpty()) {
                 MessageReference node = memoryList.removeFirst();
                 node.decrementReferenceCount();
@@ -429,6 +438,9 @@ public class FilePendingMessageCursor ex
             }
             memoryList.clear();
             setCacheEnabled(false);
+             if (LOG.isTraceEnabled()) {
+                LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
+             }
         }
     }
 
@@ -471,35 +483,23 @@ public class FilePendingMessageCursor ex
     }
 
     final class DiskIterator implements Iterator<MessageReference> {
-        private PListEntry next = null;
-        private PListEntry current = null;
-        PList list;
-
+        private final Iterator<PListEntry> iterator;
         DiskIterator() {
             try {
-                this.list = getDiskList();
-                synchronized (this.list) {
-                    this.current = this.list.getFirst();
-                    this.next = this.current;
-                }
+                iterator = getDiskList().iterator();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
 
         public boolean hasNext() {
-            return this.next != null;
+            return iterator.hasNext();
         }
 
         public MessageReference next() {
-            this.current = next;
             try {
-                ByteSequence bs = this.current.getByteSequence();
-                synchronized (this.list) {
-                    this.current = this.list.refresh(this.current);
-                    this.next = this.list.getNext(this.current);
-                }
-                return getMessage(bs);
+                PListEntry entry = iterator.next();
+                return getMessage(entry.getByteSequence());
             } catch (IOException e) {
                 LOG.error("I/O error", e);
                 throw new RuntimeException(e);
@@ -507,17 +507,7 @@ public class FilePendingMessageCursor ex
         }
 
         public void remove() {
-            try {
-                synchronized (this.list) {
-                    this.current = this.list.refresh(this.current);
-                    this.list.remove(this.current);
-                }
-
-            } catch (IOException e) {
-                LOG.error("I/O error", e);
-                throw new RuntimeException(e);
-            }
-
+            iterator.remove();
         }
 
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jun  2 15:28:30 2011
@@ -36,7 +36,6 @@ import java.util.concurrent.locks.Reentr
 import org.apache.activemq.ActiveMQMessageAuditNoSync;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
@@ -63,6 +62,7 @@ import org.apache.activemq.util.Callback
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
+import org.apache.kahadb.util.LocationMarshaller;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.BTreeIndex;
@@ -816,7 +816,12 @@ public class MessageDatabase extends Ser
      * @throws IOException
      */
     public JournalCommand<?> load(Location location) throws IOException {
+        long start = System.currentTimeMillis();
         ByteSequence data = journal.read(location);
+        long end = System.currentTimeMillis();
+        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+            LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
+        }
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
         KahaEntryType type = KahaEntryType.valueOf(readByte);
@@ -1472,34 +1477,6 @@ public class MessageDatabase extends Ser
         }
     }
 
-    static class LocationMarshaller implements Marshaller<Location> {
-        final static LocationMarshaller INSTANCE = new LocationMarshaller();
-
-        public Location readPayload(DataInput dataIn) throws IOException {
-            Location rc = new Location();
-            rc.setDataFileId(dataIn.readInt());
-            rc.setOffset(dataIn.readInt());
-            return rc;
-        }
-
-        public void writePayload(Location object, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(object.getDataFileId());
-            dataOut.writeInt(object.getOffset());
-        }
-
-        public int getFixedSize() {
-            return 8;
-        }
-
-        public Location deepCopy(Location source) {
-            return new Location(source);
-        }
-
-        public boolean isDeepCopySupported() {
-            return true;
-        }
-    }
-
     static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
         final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
 
@@ -1569,7 +1546,7 @@ public class MessageDatabase extends Ser
         // Figure out the next key using the last entry in the destination.
         rc.orderIndex.configureLast(tx);
 
-        rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+        rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
         rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
         rc.locationIndex.load(tx);
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Thu Jun  2 15:28:30 2011
@@ -19,505 +19,241 @@ package org.apache.activemq.store.kahadb
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
+import org.apache.kahadb.index.ListIndex;
+import org.apache.kahadb.index.ListNode;
 import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.LocationMarshaller;
+import org.apache.kahadb.util.StringMarshaller;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PList {
+public class PList extends ListIndex<String, Location> {
     static final Logger LOG = LoggerFactory.getLogger(PList.class);
     final PListStore store;
     private String name;
-    private long rootId = EntryLocation.NOT_SET;
-    private long lastId = EntryLocation.NOT_SET;
-    private final AtomicBoolean loaded = new AtomicBoolean();
-    private int size = 0;
     Object indexLock;
 
     PList(PListStore store) {
         this.store = store;
         this.indexLock = store.getIndexLock();
+        setPageFile(store.getPageFile());
+        setKeyMarshaller(StringMarshaller.INSTANCE);
+        setValueMarshaller(LocationMarshaller.INSTANCE);
     }
 
     public void setName(String name) {
         this.name = name;
     }
 
-    /*
-     * (non-Javadoc)
-     * @see org.apache.activemq.beanstalk.JobScheduler#getName()
-     */
     public String getName() {
         return this.name;
     }
 
-    public synchronized int size() {
-        return this.size;
-    }
-
-    public synchronized boolean isEmpty() {
-        return size == 0;
-    }
-
-    /**
-     * @return the rootId
-     */
-    public long getRootId() {
-        return this.rootId;
-    }
-
-    /**
-     * @param rootId
-     *            the rootId to set
-     */
-    public void setRootId(long rootId) {
-        this.rootId = rootId;
-    }
-
-    /**
-     * @return the lastId
-     */
-    public long getLastId() {
-        return this.lastId;
-    }
-
-    /**
-     * @param lastId
-     *            the lastId to set
-     */
-    public void setLastId(long lastId) {
-        this.lastId = lastId;
-    }
-
-    /**
-     * @return the loaded
-     */
-    public boolean isLoaded() {
-        return this.loaded.get();
-    }
-
     void read(DataInput in) throws IOException {
-        this.rootId = in.readLong();
-        this.name = in.readUTF();
+        this.headPageId = in.readLong();
     }
 
     public void write(DataOutput out) throws IOException {
-        out.writeLong(this.rootId);
-        out.writeUTF(name);
+        out.writeLong(this.headPageId);
     }
 
     public synchronized void destroy() throws IOException {
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    destroy(tx);
+                    clear(tx);
+                    unload(tx);
                 }
             });
         }
     }
 
-    void destroy(Transaction tx) throws IOException {
-        // start from the first
-        EntryLocation entry = getFirst(tx);
-        while (entry != null) {
-            EntryLocation toRemove = entry.copy();
-            entry = getNext(tx, entry.getNext());
-            doRemove(tx, toRemove);
-        }
-    }
-
-    synchronized void load(Transaction tx) throws IOException {
-        if (loaded.compareAndSet(false, true)) {
-            final Page<EntryLocation> p = tx.load(this.rootId, null);
-            if (p.getType() == Page.PAGE_FREE_TYPE) {
-                // Need to initialize it..
-                EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
-
-                storeEntry(tx, root);
-                this.lastId = root.getPage().getPageId();
-            } else {
-                // find last id
-                long nextId = this.rootId;
-                while (nextId != EntryLocation.NOT_SET) {
-                    EntryLocation next = getNext(tx, nextId);
-                    if (next != null) {
-                        this.lastId = next.getPage().getPageId();
-                        nextId = next.getNext();
-                        this.size++;
-                    }
-                }
-            }
-        }
-    }
-
-    synchronized public void unload() {
-        if (loaded.compareAndSet(true, false)) {
-            this.rootId = EntryLocation.NOT_SET;
-            this.lastId = EntryLocation.NOT_SET;
-            this.size=0;
-        }
-    }
-
-    synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
+    public void addLast(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    addLast(tx, id, bs, location);
+                    add(tx, id, location);
                 }
             });
         }
     }
 
-    private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
-        EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
-        entry.setLocation(location);
-        storeEntry(tx, entry);
-        EntryLocation last = loadEntry(tx, this.lastId);
-        last.setNext(entry.getPage().getPageId());
-        storeEntry(tx, last);
-        this.lastId = entry.getPage().getPageId();
-        this.size++;
-    }
-
-    synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
+    public void addFirst(final String id, final ByteSequence bs) throws IOException {
         final Location location = this.store.write(bs, false);
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    addFirst(tx, id, bs, location);
-                }
-            });
-        }
-    }
-
-    private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
-        EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
-        entry.setLocation(location);
-        EntryLocation oldFirst = getFirst(tx);
-        if (oldFirst != null) {
-            oldFirst.setPrev(entry.getPage().getPageId());
-            storeEntry(tx, oldFirst);
-            entry.setNext(oldFirst.getPage().getPageId());
-
-        }
-        EntryLocation root = getRoot(tx);
-        root.setNext(entry.getPage().getPageId());
-        storeEntry(tx, root);
-        storeEntry(tx, entry);
-
-        this.size++;
-    }
-
-    synchronized public boolean remove(final String id) throws IOException {
-        final AtomicBoolean result = new AtomicBoolean();
-        synchronized (indexLock) {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    result.set(remove(tx, id));
+                    addFirst(tx, id, location);
                 }
             });
         }
-        return result.get();
     }
 
-    synchronized public boolean remove(final int position) throws IOException {
+    public boolean remove(final String id) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    result.set(remove(tx, position));
+                    result.set(remove(tx, id) != null);
                 }
             });
         }
         return result.get();
     }
 
-    synchronized public boolean remove(final PListEntry entry) throws IOException {
+    public boolean remove(final long position) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    result.set(doRemove(tx, entry.getEntry()));
+                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
+                    if (iterator.hasNext()) {
+                        iterator.next();
+                        iterator.remove();
+                        result.set(true);
+                    } else {
+                        result.set(false);
+                    }
                 }
             });
         }
         return result.get();
     }
 
-    synchronized public PListEntry get(final int position) throws IOException {
+    public PListEntry get(final long position) throws IOException {
         PListEntry result = null;
-        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    ref.set(get(tx, position));
+                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
+                    ref.set(iterator.next());
                 }
             });
         }
         if (ref.get() != null) {
-            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-            result = new PListEntry(ref.get(), bs);
+            ByteSequence bs = this.store.getPayload(ref.get().getValue());
+            result = new PListEntry(ref.get().getKey(), bs);
         }
         return result;
     }
 
-    synchronized public PListEntry getFirst() throws IOException {
+    public PListEntry getFirst() throws IOException {
         PListEntry result = null;
-        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getFirst(tx));
                 }
             });
-            if (ref.get() != null) {
-                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-                result = new PListEntry(ref.get(), bs);
-            }
+        }
+        if (ref.get() != null) {
+            ByteSequence bs = this.store.getPayload(ref.get().getValue());
+            result = new PListEntry(ref.get().getKey(), bs);
         }
         return result;
     }
 
-    synchronized public PListEntry getLast() throws IOException {
+    public PListEntry getLast() throws IOException {
         PListEntry result = null;
-        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
         synchronized (indexLock) {
             this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     ref.set(getLast(tx));
                 }
             });
-            if (ref.get() != null) {
-                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-                result = new PListEntry(ref.get(), bs);
-            }
+        }
+        if (ref.get() != null) {
+            ByteSequence bs = this.store.getPayload(ref.get().getValue());
+            result = new PListEntry(ref.get().getKey(), bs);
         }
         return result;
     }
 
-    synchronized public PListEntry getNext(PListEntry entry) throws IOException {
-        PListEntry result = null;
-        final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
-        if (nextId != EntryLocation.NOT_SET) {
-            final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-            synchronized (indexLock) {
-                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        ref.set(getNext(tx, nextId));
-                    }
-                });
-                if (ref.get() != null) {
-                    ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-                    result = new PListEntry(ref.get(), bs);
-                }
-            }
-        }
-        return result;
+    public boolean isEmpty() {
+        return size() == 0;
     }
 
-    synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
-        PListEntry result = null;
-        final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-        synchronized (indexLock) {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
-                }
-            });
-            if (ref.get() != null) {
-                result = new PListEntry(ref.get(), entry.getByteSequence());
-            }
-        }
-        return result;
+    synchronized public Iterator<PListEntry> iterator() throws IOException {
+        return new PListIterator();
     }
 
-    synchronized public void claimFileLocations(final Set<Integer> candidates) throws IOException {
-        synchronized (indexLock) {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    long nextId = rootId;
-                    while (nextId != EntryLocation.NOT_SET) {
-                        EntryLocation entry = getNext(tx, nextId);
-                        if (entry != null) {
-                            candidates.remove(entry.getLocation().getDataFileId());
-                            nextId = entry.getNext();
-                        } else {
-                            break;
-                        }
-                    }
-                }
-            });
+    private final class PListIterator implements Iterator<PListEntry> {
+        final Iterator<Map.Entry<String, Location>> iterator;
+        final Transaction tx;
+
+        PListIterator() throws IOException {
+            tx = store.pageFile.tx();
+            this.iterator = iterator(tx);
         }
-    }
 
-    boolean remove(Transaction tx, String id) throws IOException {
-        boolean result = false;
-        long nextId = this.rootId;
-        while (nextId != EntryLocation.NOT_SET) {
-            EntryLocation entry = getNext(tx, nextId);
-            if (entry != null) {
-                if (entry.getId().equals(id)) {
-                    result = doRemove(tx, entry);
-                    break;
-                }
-                nextId = entry.getNext();
-            } else {
-                // not found
-                break;
-            }
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
         }
-        return result;
-    }
 
-    boolean remove(Transaction tx, int position) throws IOException {
-        boolean result = false;
-        long nextId = this.rootId;
-        int count = 0;
-        while (nextId != EntryLocation.NOT_SET) {
-            EntryLocation entry = getNext(tx, nextId);
-            if (entry != null) {
-                if (count == position) {
-                    result = doRemove(tx, entry);
-                    break;
-                }
-                nextId = entry.getNext();
-            } else {
-                // not found
-                break;
+        @Override
+        public PListEntry next() {
+            Map.Entry<String, Location> entry = iterator.next();
+            ByteSequence bs = null;
+            try {
+                bs = store.getPayload(entry.getValue());
+            } catch (IOException unexpected) {
+                NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
+                e.initCause(unexpected);
+                throw e;
             }
-            count++;
+            return new PListEntry(entry.getKey(), bs);
         }
-        return result;
-    }
 
-    EntryLocation get(Transaction tx, int position) throws IOException {
-        EntryLocation result = null;
-        long nextId = this.rootId;
-        int count = -1;
-        while (nextId != EntryLocation.NOT_SET) {
-            EntryLocation entry = getNext(tx, nextId);
-            if (entry != null) {
-                if (count == position) {
-                    result = entry;
-                    break;
+        @Override
+        public void remove() {
+            try {
+                synchronized (indexLock) {
+                    tx.execute(new Transaction.Closure<IOException>() {
+                        @Override
+                        public void execute(Transaction tx) throws IOException {
+                            iterator.remove();
+                        }
+                    });
                 }
-                nextId = entry.getNext();
-            } else {
-                break;
+            } catch (IOException unexpected) {
+                IllegalStateException e = new IllegalStateException(unexpected);
+                e.initCause(unexpected);
+                throw e;
             }
-            count++;
-        }
-        return result;
-    }
-
-    EntryLocation getFirst(Transaction tx) throws IOException {
-        long offset = getRoot(tx).getNext();
-        if (offset != EntryLocation.NOT_SET) {
-            return loadEntry(tx, offset);
-        }
-        return null;
-    }
-
-    EntryLocation getLast(Transaction tx) throws IOException {
-        if (this.lastId != EntryLocation.NOT_SET) {
-            return loadEntry(tx, this.lastId);
         }
-        return null;
     }
 
-    private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
-        boolean result = false;
-        if (entry != null) {
-
-            EntryLocation prev = getPrevious(tx, entry.getPrev());
-            EntryLocation next = getNext(tx, entry.getNext());
-            long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
-            long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
-
-            if (next != null) {
-                next.setPrev(prevId);
-                storeEntry(tx, next);
-            } else {
-                // we are deleting the last one in the list
-                this.lastId = prevId;
-            }
-            if (prev != null) {
-                prev.setNext(nextId);
-                storeEntry(tx, prev);
+    public void claimFileLocations(final Set<Integer> candidates) throws IOException {
+        synchronized (indexLock) {
+            if (loaded.get()) {
+                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
+                        while (iterator.hasNext()) {
+                            Location location = iterator.next().getValue();
+                            candidates.remove(location.getDataFileId());
+                        }
+                    }
+                });
             }
-
-            entry.reset();
-            storeEntry(tx, entry);
-            tx.free(entry.getPage().getPageId());
-            result = true;
-            this.size--;
-        }
-        return result;
-    }
-
-    private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
-        Page<EntryLocation> p = tx.allocate();
-        EntryLocation result = new EntryLocation();
-        result.setPage(p);
-        p.set(result);
-        result.setId(id);
-        result.setPrev(previous);
-        result.setNext(next);
-        return result;
-    }
-
-    private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
-        EntryLocation result = new EntryLocation();
-        result.setPage(p);
-        p.set(result);
-        result.setId(id);
-        result.setPrev(previous);
-        result.setNext(next);
-        return result;
-    }
-
-    EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
-        Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
-        EntryLocation entry = page.get();
-        if (entry != null) {
-            entry.setPage(page);
-        }
-        return entry;
-    }
-    
-    private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
-        tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
-    }
-
-    EntryLocation getNext(Transaction tx, long next) throws IOException {
-        EntryLocation result = null;
-        if (next != EntryLocation.NOT_SET) {
-            result = loadEntry(tx, next);
         }
-        return result;
-    }
-
-    private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
-        EntryLocation result = null;
-        if (previous != EntryLocation.NOT_SET) {
-            result = loadEntry(tx, previous);
-        }
-        return result;
-    }
-
-    private EntryLocation getRoot(Transaction tx) throws IOException {
-        EntryLocation result = loadEntry(tx, this.rootId);
-        return result;
     }
 
-    ByteSequence getPayload(EntryLocation entry) throws IOException {
-        return this.store.getPayload(entry.getLocation());
+    @Override
+    public String toString() {
+        return "" + name + ",[headPageId=" + headPageId  + ",tailPageId=" + tailPageId + ", size=" + size() + "]";
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java Thu Jun  2 15:28:30 2011
@@ -21,39 +21,22 @@ import org.apache.kahadb.util.ByteSequen
 public class PListEntry {
 
     private final ByteSequence byteSequence;
-    private final EntryLocation entry;
+    private final String entry;
 
-    PListEntry(EntryLocation entry, ByteSequence bs) {
+    PListEntry(String entry, ByteSequence bs) {
         this.entry = entry;
         this.byteSequence = bs;
     }
 
-    /**
-     * @return the byteSequence
-     */
     public ByteSequence getByteSequence() {
         return this.byteSequence;
     }
 
     public String getId() {
-        return this.entry.getId();
-    }
-
-    /**
-     * @return the entry
-     */
-    EntryLocation getEntry() {
         return this.entry;
     }
 
     public PListEntry copy() {
         return new PListEntry(this.entry, this.byteSequence);
     }
-
-    @Override
-    public String toString() {
-        return this.entry.getId() + "[pageId=" + this.entry.getPage().getPageId() + ",next=" + this.entry.getNext()
-                + "]";
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Thu Jun  2 15:28:30 2011
@@ -73,6 +73,10 @@ public class PListStore extends ServiceS
     private Scheduler scheduler;
     private long cleanupInterval = 30000;
 
+    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
+    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
+    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+
     public Object getIndexLock() {
         return indexLock;
     }
@@ -82,6 +86,30 @@ public class PListStore extends ServiceS
         this.scheduler = brokerService.getScheduler();
     }
 
+    public int getIndexPageSize() {
+        return indexPageSize;
+    }
+
+    public int getIndexCacheSize() {
+        return indexCacheSize;
+    }
+
+    public int getIndexWriteBatchSize() {
+        return indexWriteBatchSize;
+    }
+
+    public void setIndexPageSize(int indexPageSize) {
+        this.indexPageSize = indexPageSize;
+    }
+
+    public void setIndexCacheSize(int indexCacheSize) {
+        this.indexCacheSize = indexCacheSize;
+    }
+
+    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
+        this.indexWriteBatchSize = indexWriteBatchSize;
+    }
+
     protected class MetaData {
         protected MetaData(PListStore store) {
             this.store = store;
@@ -89,34 +117,34 @@ public class PListStore extends ServiceS
 
         private final PListStore store;
         Page<MetaData> page;
-        BTreeIndex<String, PList> storedSchedulers;
+        BTreeIndex<String, PList> lists;
 
         void createIndexes(Transaction tx) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
+            this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
         }
 
         void load(Transaction tx) throws IOException {
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.storedSchedulers.load(tx);
+            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.lists.setValueMarshaller(new PListMarshaller(this.store));
+            this.lists.load(tx);
         }
 
-        void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
-            for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
+        void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
+            for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
                 Entry<String, PList> entry = i.next();
                 entry.getValue().load(tx);
-                schedulers.put(entry.getKey(), entry.getValue());
+                lists.put(entry.getKey(), entry.getValue());
             }
         }
 
         public void read(DataInput is) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+            this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
+            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
+            this.lists.setValueMarshaller(new PListMarshaller(this.store));
         }
 
         public void write(DataOutput os) throws IOException {
-            os.writeLong(this.storedSchedulers.getPageId());
+            os.writeLong(this.lists.getPageId());
         }
     }
 
@@ -137,29 +165,9 @@ public class PListStore extends ServiceS
         }
     }
 
-    class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
-        public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
-            List<EntryLocation> result = new ArrayList<EntryLocation>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                EntryLocation jobLocation = new EntryLocation();
-                jobLocation.readExternal(dataIn);
-                result.add(jobLocation);
-            }
-            return result;
-        }
-
-        public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (EntryLocation jobLocation : value) {
-                jobLocation.writeExternal(dataOut);
-            }
-        }
-    }
-
-    class JobSchedulerMarshaller extends VariableMarshaller<PList> {
+    class PListMarshaller extends VariableMarshaller<PList> {
         private final PListStore store;
-        JobSchedulerMarshaller(PListStore store) {
+        PListMarshaller(PListStore store) {
             this.store = store;
         }
         public PList readPayload(DataInput dataIn) throws IOException {
@@ -168,8 +176,8 @@ public class PListStore extends ServiceS
             return result;
         }
 
-        public void writePayload(PList js, DataOutput dataOut) throws IOException {
-            js.write(dataOut);
+        public void writePayload(PList list, DataOutput dataOut) throws IOException {
+            list.write(dataOut);
         }
     }
 
@@ -207,9 +215,9 @@ public class PListStore extends ServiceS
                     pl.setName(name);
                     getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
-                            pl.setRootId(tx.allocate().getPageId());
+                            pl.setHeadPageId(tx.allocate().getPageId());
                             pl.load(tx);
-                            metaData.storedSchedulers.put(tx, name, pl);
+                            metaData.lists.put(tx, name, pl);
                         }
                     });
                     result = pl;
@@ -236,8 +244,8 @@ public class PListStore extends ServiceS
                 if (result) {
                     getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                         public void execute(Transaction tx) throws IOException {
-                            metaData.storedSchedulers.remove(tx, name);
-                            pl.destroy(tx);
+                            metaData.lists.remove(tx, name);
+                            pl.destroy();
                         }
                     });
                 }
@@ -261,6 +269,9 @@ public class PListStore extends ServiceS
                 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
                 this.journal.start();
                 this.pageFile = new PageFile(directory, "tmpDB");
+                this.pageFile.setPageSize(getIndexPageSize());
+                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
+                this.pageFile.setPageCacheSize(getIndexCacheSize());
                 this.pageFile.load();
 
                 this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -310,7 +321,7 @@ public class PListStore extends ServiceS
             }
         }
         for (PList pl : this.persistentLists.values()) {
-            pl.unload();
+            pl.unload(null);
         }
         if (this.pageFile != null) {
             this.pageFile.unload();
@@ -351,20 +362,13 @@ public class PListStore extends ServiceS
         }
     }
 
-    private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
-        EntryLocation location = entry.getEntry();
-        if (location != null) {
-            candidates.remove(location.getLocation().getDataFileId());
-        }
-    }
-
-    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
         ByteSequence result = null;
         result = this.journal.read(location);
         return result;
     }
 
-    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
         return this.journal.write(payload, sync);
     }
 
@@ -440,7 +444,8 @@ public class PListStore extends ServiceS
 
     @Override
     public String toString() {
-        return "PListStore:" + this.directory;
+        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
+        return "PListStore:[" + path + " ]";
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Thu Jun  2 15:28:30 2011
@@ -16,13 +16,18 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 
+import javax.net.ServerSocketFactory;
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.apache.activemq.xbean.XBeanBrokerService;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Jun  2 15:28:30 2011
@@ -284,7 +284,10 @@ public abstract class Usage<T extends Us
 
     @Override
     public String toString() {
-        return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
+        return "Usage(" + getName() + ") percentUsage=" + percentUsage
+                + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+                + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
+                + (parent != null ? ";Parent:" + parent.toString() : "");
     }
 
     @SuppressWarnings("unchecked")

Added: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java?rev=1130607&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java Thu Jun  2 15:28:30 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.kahadb.journal.Location;
+
+public class LocationMarshaller implements Marshaller<Location> {
+    public final static LocationMarshaller INSTANCE = new LocationMarshaller();
+
+    public Location readPayload(DataInput dataIn) throws IOException {
+        Location rc = new Location();
+        rc.setDataFileId(dataIn.readInt());
+        rc.setOffset(dataIn.readInt());
+        return rc;
+    }
+
+    public void writePayload(Location object, DataOutput dataOut) throws IOException {
+        dataOut.writeInt(object.getDataFileId());
+        dataOut.writeInt(object.getOffset());
+    }
+
+    public int getFixedSize() {
+        return 8;
+    }
+
+    public Location deepCopy(Location source) {
+        return new Location(source);
+    }
+
+    public boolean isDeepCopySupported() {
+        return true;
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Thu Jun  2 15:28:30 2011
@@ -184,6 +184,18 @@ public class XARecoveryBrokerTest extend
         DataArrayResponse dar = (DataArrayResponse)response;
         assertEquals(4, dar.getData().length);
 
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again  to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
         // Commit the prepared transactions.
         for (int i = 0; i < dar.getData().length; i++) {
             connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java Thu Jun  2 15:28:30 2011
@@ -17,23 +17,42 @@
 package org.apache.activemq.broker.region.cursors;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.IndirectMessageReference;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.kahadb.plist.PList;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.util.ByteSequence;
-import org.junit.Before;
+import org.junit.After;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class FilePendingMessageCursorTest {
-
+    private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
     BrokerService brokerService;
     FilePendingMessageCursor underTest;
 
-    @Before
-    public void createBrokerWithTempStoreLimit() throws Exception {
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.getTempDataStore().stop();
+        }
+    }
+
+    private void createBrokerWithTempStoreLimit() throws Exception {
         brokerService = new BrokerService();
         SystemUsage usage = brokerService.getSystemUsage();
         usage.getTempUsage().setLimit(1025*1024*15);
@@ -45,7 +64,7 @@ public class FilePendingMessageCursorTes
 
     @Test
     public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
-
+        createBrokerWithTempStoreLimit();
         SystemUsage usage = brokerService.getSystemUsage();
         assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
 
@@ -57,4 +76,60 @@ public class FilePendingMessageCursorTes
 
         assertFalse("cursor is not full", underTest.isFull());
     }
+
+    @Test
+    public void testAddRemoveAddIndexSize() throws Exception {
+        brokerService = new BrokerService();
+        SystemUsage usage = brokerService.getSystemUsage();
+        usage.getMemoryUsage().setLimit(1024*150);
+        String body = new String(new byte[1024]);
+        Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
+
+        underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
+        underTest.setSystemUsage(usage);
+
+        LOG.info("start");
+        final PageFile pageFile =  underTest.getDiskList().getPageFile();
+        LOG.info("page count: " +pageFile.getPageCount());
+        LOG.info("free count: " + pageFile.getFreePageCount());
+        LOG.info("content size: " +pageFile.getPageContentSize());
+
+        final long initialPageCount =  pageFile.getPageCount();
+
+        final int numMessages = 1000;
+
+        for (int j=0; j<10; j++) {
+            // ensure free pages are reused
+            for (int i=0; i< numMessages; i++) {
+                ActiveMQMessage mqMessage = new ActiveMQMessage();
+                mqMessage.setStringProperty("body", body);
+                mqMessage.setMessageId(new MessageId("1:2:3:" + i));
+                mqMessage.setMemoryUsage(usage.getMemoryUsage());
+                mqMessage.setRegionDestination(destination);
+                underTest.addMessageLast(new IndirectMessageReference(mqMessage));
+            }
+            assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
+
+            underTest.reset();
+            long receivedCount = 0;
+            while(underTest.hasNext()) {
+                MessageReference ref = underTest.next();
+                underTest.remove();
+                assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
+            }
+            assertEquals("got all messages back", receivedCount, numMessages);
+            LOG.info("page count: " +pageFile.getPageCount());
+            LOG.info("free count: " + pageFile.getFreePageCount());
+            LOG.info("content size: " + pageFile.getPageContentSize());
+        }
+
+        assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount() );
+
+        LOG.info("Destroy");
+        underTest.destroy();
+        LOG.info("page count: " + pageFile.getPageCount());
+        LOG.info("free count: " + pageFile.getFreePageCount());
+        LOG.info("content size: " + pageFile.getPageContentSize());
+        assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Thu Jun  2 15:28:30 2011
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Vector;
@@ -80,7 +81,7 @@ public class PListTest {
             plist.addFirst(test, bs);
         }
         assertEquals(plist.size(), COUNT);
-        int count = plist.size() - 1;
+        long count = plist.size() - 1;
         for (ByteSequence bs : map.values()) {
             String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
             PListEntry entry = plist.get(count);
@@ -107,7 +108,7 @@ public class PListTest {
         assertEquals(plist.size(), COUNT);
         PListEntry entry = plist.getFirst();
         while (entry != null) {
-            plist.remove(entry.copy());
+            plist.remove(entry.getId());
             entry = plist.getFirst();
         }
         assertEquals(0,plist.size());
@@ -133,7 +134,6 @@ public class PListTest {
         }
         plist.destroy();
         assertEquals(0,plist.size());
-        assertNull("no first entry", plist.getFirst());
     }
     
     @Test
@@ -292,47 +292,56 @@ public class PListTest {
         store.setCleanupInterval(5000);
         store.start();
 
-        final int iterations = 500;
+        final int iterations = 5000;
         final int numLists = 10;
 
         // prime the store
 
         // create/delete
+        LOG.info("create");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.CREATE, iterations).run();
         }
 
+        LOG.info("delete");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.DELETE, iterations).run();
         }
 
-        // fill
+        LOG.info("fill");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.ADD, iterations).run();
         }
-        // empty
+        LOG.info("remove");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.REMOVE, iterations).run();
         }
-        // empty
+
+        LOG.info("check empty");
+        for (int i=0; i<numLists;i++) {
+            assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
+        }
+
+        LOG.info("delete again");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.DELETE, iterations).run();
         }
 
-        // fill
+        LOG.info("fill again");
         for (int i=0; i<numLists;i++) {
             new Job(i, PListTest.TaskType.ADD, iterations).run();
         }
 
-        // parallel
-        ExecutorService executor = Executors.newFixedThreadPool(100);
+        LOG.info("parallel add and remove");
+        ExecutorService executor = Executors.newFixedThreadPool(numLists*2);
         for (int i=0; i<numLists*2; i++) {
             executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
         }
 
         executor.shutdown();
+        LOG.info("wait for parallel work to complete");
         executor.awaitTermination(60*5, TimeUnit.SECONDS);
-        assertTrue("no excepitons", exceptions.isEmpty());
+        assertTrue("no exceptions", exceptions.isEmpty());
     }
 
     enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
@@ -373,7 +382,7 @@ public class PListTest {
                     case REMOVE:
                         plist = store.getPList("List-" + id);
 
-                        for (int j = iterations; j > 0; j--) {
+                        for (int j = iterations -1; j >= 0; j--) {
                             plist.remove(idSeed + "id" + j);
                             if (j > 0 && j % (iterations / 2) == 0) {
                                 LOG.info("Job-" + id + " Done remove: " + j);
@@ -383,9 +392,10 @@ public class PListTest {
                     case ITERATE:
                         plist = store.getPList("List-" + id);
 
-                        PListEntry element = plist.getFirst();
-                        while (element != null) {
-                            element = plist.getNext(element);
+                        Iterator<PListEntry> iterator = plist.iterator();
+                        PListEntry element = null;
+                        while (iterator.hasNext()) {
+                            element = iterator.next();
                         }
                         break;
                     default:

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Thu Jun  2 15:28:30 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class ListIndex<Key,Value> implem
     protected PageFile pageFile;
     protected long headPageId;
     protected long tailPageId;
-    private long size;
+    private AtomicLong size = new AtomicLong(0);
 
     protected AtomicBoolean loaded = new AtomicBoolean();
 
@@ -43,9 +44,12 @@ public class ListIndex<Key,Value> implem
     private Marshaller<Key> keyMarshaller;
     private Marshaller<Value> valueMarshaller;
 
-    public ListIndex(PageFile pageFile, long rootPageId) {
+    public ListIndex() {
+    }
+
+    public ListIndex(PageFile pageFile, long headPageId) {
         this.pageFile = pageFile;
-        this.headPageId = rootPageId;
+        this.headPageId = headPageId;
     }
 
     synchronized public void load(Transaction tx) throws IOException {
@@ -61,15 +65,15 @@ public class ListIndex<Key,Value> implem
             final Page<ListNode<Key,Value>> p = tx.load(headPageId, null);
             if( p.getType() == Page.PAGE_FREE_TYPE ) {
                  // Need to initialize it..
-                ListNode<Key, Value> root = createNode(p, null);
+                ListNode<Key, Value> root = createNode(p);
                 storeNode(tx, root, true);
-                tailPageId = headPageId;
+                tailPageId = headPageId = p.getPageId();
             } else {
-                ListNode<Key, Value> node = loadNode(tx, headPageId, null);
-                size += node.size(tx);
+                ListNode<Key, Value> node = loadNode(tx, headPageId);
+                size.addAndGet(node.size(tx));
                 while (node.getNext() != -1) {
-                    node = loadNode(tx, node.getNext(), node);
-                    size += node.size(tx);
+                    node = loadNode(tx, node.getNext());
+                    size.addAndGet(node.size(tx));
                     tailPageId = node.getPageId();
                 }
             }
@@ -82,11 +86,11 @@ public class ListIndex<Key,Value> implem
     }
     
     protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
-        return loadNode(tx, headPageId, null);
+        return loadNode(tx, headPageId);
     }
 
     protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
-        return loadNode(tx, tailPageId, null);
+        return loadNode(tx, tailPageId);
     }
 
     synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
@@ -122,14 +126,14 @@ public class ListIndex<Key,Value> implem
     synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
         assertLoaded();
         getTail(tx).put(tx, key, value);
-        size ++;
+        size.incrementAndGet();
         return null;
     }
 
     synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
         assertLoaded();
         getHead(tx).addFirst(tx, key, value);
-        size++;
+        size.incrementAndGet();
         return null;
     }
 
@@ -146,7 +150,7 @@ public class ListIndex<Key,Value> implem
     }
 
     public void onRemove() {
-        size--;
+        size.decrementAndGet();
     }
 
     public boolean isTransient() {
@@ -157,8 +161,10 @@ public class ListIndex<Key,Value> implem
         for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
             ListNode<Key,Value>candidate = iterator.next();
             candidate.clear(tx);
+            // break up the transaction
+            tx.commit();
         }
-        size = 0;
+        size.set(0);
     }
 
     synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
@@ -173,7 +179,7 @@ public class ListIndex<Key,Value> implem
         return getHead(tx).iterator(tx);
     }
     
-    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, int initialPosition) throws IOException {
+    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
         return getHead(tx).iterator(tx, initialPosition);
     }
 
@@ -191,29 +197,24 @@ public class ListIndex<Key,Value> implem
         }
     }
 
-    ListNode<Key,Value> loadNode(Transaction tx, long pageId, ListNode<Key,Value> parent) throws IOException {
+    ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
         Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
         ListNode<Key, Value> node = page.get();
         node.setPage(page);
-        node.setParent(parent);
         return node;
     }
 
-    ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> p, ListNode<Key,Value> parent) throws IOException {
+    ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
         ListNode<Key,Value> node = new ListNode<Key,Value>(this);
-        node.setPage(p);
-        node.setParent(parent);
-        node.setEmpty();
-        p.set(node);
+        node.setPage(page);
+        page.set(node);
         return node;
     }
 
-    ListNode<Key,Value> createNode(Transaction tx, ListNode<Key,Value> parent) throws IOException {
-        Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), marshaller);
+    ListNode<Key,Value> createNode(Transaction tx) throws IOException {
+        Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), null);
         ListNode<Key,Value> node = new ListNode<Key,Value>(this);
         node.setPage(page);
-        node.setParent(parent);
-        node.setEmpty();
         page.set(node);
         return node;
     }
@@ -225,6 +226,11 @@ public class ListIndex<Key,Value> implem
     public PageFile getPageFile() {
         return pageFile;
     }
+
+    public void setPageFile(PageFile pageFile) {
+        this.pageFile = pageFile;
+    }
+
     public long getHeadPageId() {
         return headPageId;
     }
@@ -252,6 +258,6 @@ public class ListIndex<Key,Value> implem
     }
 
     public long size() {
-        return size;
+        return size.get();
     }
 }