You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/20 16:15:38 UTC

svn commit: r395597 [1/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/kaha/ test/java/org/a...

Author: rajdavies
Date: Thu Apr 20 07:15:30 2006
New Revision: 395597

URL: http://svn.apache.org/viewcvs?rev=395597&view=rev
Log:
tuning around kaha persistence

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
      - copied, changed from r393376, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/IndexLinkedListTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/MemoryAllocationTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/TemporaryTopicMemoryAllocationTest.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/FreeSpaceManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/FreeSpaceTree.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java Thu Apr 20 07:15:30 2006
@@ -13,8 +13,8 @@
  */
 package org.apache.activemq.kaha;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 /**
  * Implementation of a Marshaller for byte arrays
@@ -29,7 +29,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
         byte[] data=(byte[]) object;
         dataOut.writeInt(data.length);
         dataOut.write(data);
@@ -42,7 +42,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         int size=dataIn.readInt();
         byte[] data=new byte[size];
         dataIn.readFully(data);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Thu Apr 20 07:15:30 2006
@@ -99,6 +99,6 @@
      * @return true if successful
      */
     public boolean doRemove(int position);
-    
+        
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java Thu Apr 20 07:15:30 2006
@@ -16,8 +16,8 @@
  */
 package org.apache.activemq.kaha;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 
 /**
@@ -34,7 +34,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object, DataOutputStream dataOut) throws IOException;
+    public void writePayload(Object object, DataOutput dataOut) throws IOException;
     
     
     /**
@@ -43,7 +43,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInputStream dataIn) throws IOException;
+    public Object readPayload(DataInput dataIn) throws IOException;
 
    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java Thu Apr 20 07:15:30 2006
@@ -15,8 +15,8 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -33,7 +33,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
         ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
         ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
         objectOut.writeObject(object);
@@ -50,7 +50,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         int size = dataIn.readInt();
         byte[] data = new byte[size];
         dataIn.readFully(data);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Thu Apr 20 07:15:30 2006
@@ -41,8 +41,9 @@
      * Checks if a MapContainer exists
      * @param id
      * @return new MapContainer
+     * @throws IOException 
      */
-    public boolean doesMapContainerExist(Object id);
+    public boolean doesMapContainerExist(Object id) throws IOException;
 
     /**
      * Get a MapContainer with the given id - the MapContainer is created if needed
@@ -62,15 +63,17 @@
     /**
      * Get a Set of call MapContainer Ids
      * @return the set of ids
+     * @throws IOException 
      */
-    public Set getMapContainerIds();
+    public Set getMapContainerIds() throws IOException;
     
     /**
      * Checks if a ListContainer exists
      * @param id
      * @return new MapContainer
+     * @throws IOException 
      */
-    public boolean doesListContainerExist(Object id);
+    public boolean doesListContainerExist(Object id) throws IOException;
 
    /**
     * Get a ListContainer with the given id and creates it if it doesn't exist
@@ -90,8 +93,9 @@
    /**
     * Get a Set of call ListContainer Ids
     * @return the set of ids
+   * @throws IOException
     */
-   public Set getListContainerIds();
+   public Set getListContainerIds() throws IOException;
     
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Thu Apr 20 07:15:30 2006
@@ -15,7 +15,8 @@
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.activemq.kaha.impl.StoreImpl;
+import org.apache.activemq.kaha.impl.KahaStore;
+
 /**
  * Factory for creating stores
  * 
@@ -32,16 +33,17 @@
      * @throws IOException
      */
     public static Store open(String name,String mode) throws IOException{
-        return new StoreImpl(name,mode);
+        return new KahaStore(name,mode);
     }
     
     /**
      * Delete a database
      * @param name of the database
      * @return true if successful
+     * @throws IOException 
      */
-    public static boolean delete(String name){
-        File file = new File(name);
-        return file.delete();
+    public static boolean delete(String name) throws IOException{
+        KahaStore store = new KahaStore(name,"rw");
+        return store.delete();
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java Thu Apr 20 07:15:30 2006
@@ -13,8 +13,8 @@
  */
 package org.apache.activemq.kaha;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 /**
  * Implementation of a Marshaller for Strings
@@ -29,7 +29,7 @@
      * @param dataOut
      * @throws IOException
      */
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
         dataOut.writeUTF(object.toString());
     }
 
@@ -40,7 +40,7 @@
      * @return unmarshalled object
      * @throws IOException
      */
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         return dataIn.readUTF();
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,155 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Abstract base class for container implementations, holding the shared index and data-file bookkeeping
+ * 
+ * @version $Revision: 1.2 $
+ */
+public abstract class BaseContainerImpl{
+    private static final Log log=LogFactory.getLog(BaseContainerImpl.class);
+    protected IndexItem root;
+    protected IndexLinkedList list;
+    protected IndexManager indexManager;
+    protected DataManager dataManager;
+    protected Object id;
+    protected boolean loaded=false;
+    protected boolean closed=false;
+    protected final Object mutex=new Object();
+
+    protected BaseContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
+        this.id=id;
+        this.root=root;
+        this.indexManager=indexManager;
+        this.dataManager=dataManager;
+        this.list=new IndexLinkedList(root);
+    }
+
+    public abstract void unload();
+
+    public abstract void load();
+
+    public abstract int size();
+
+    public abstract void clear();
+
+    protected abstract Object getValue(IndexItem currentItem);
+
+    protected abstract void remove(IndexItem currentItem);
+
+    protected final IndexLinkedList getInternalList(){
+        return list;
+    }
+
+    public final void close(){
+        unload();
+        closed=true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#isLoaded()
+     */
+    public final boolean isLoaded(){
+        checkClosed();
+        return loaded;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.activemq.kaha.ListContainer#getId()
+     */
+    public final Object getId(){
+        checkClosed();
+        return id;
+    }
+
+    protected final void expressDataInterest() throws IOException{
+        long nextItem=root.getNextItem();
+        while(nextItem!=Item.POSITION_NOT_SET){
+            IndexItem item=indexManager.getIndex(nextItem);
+            item.setOffset(nextItem);
+            dataManager.addInterestInFile(item.getKeyFile());
+            dataManager.addInterestInFile(item.getValueFile());
+            nextItem=item.getNextItem();
+        }
+    }
+
+    /** Empties the container: unlinks all entries from root, then releases their file interest and index slots. */
+    protected final void doClear(){
+        checkClosed();
+        synchronized(mutex){
+            loaded=true;
+            List items=new ArrayList();
+            try{
+                long nextItem=root.getNextItem();
+                while(nextItem!=Item.POSITION_NOT_SET){
+                    // Load the stored entry, as expressDataInterest() does: an IndexItem that was only
+                    // constructed has not been read from the index, so it cannot supply the chain or file ids.
+                    IndexItem item=indexManager.getIndex(nextItem);
+                    item.setOffset(nextItem);
+                    items.add(item);
+                    nextItem=item.getNextItem();
+                }
+                root.setNextItem(Item.POSITION_NOT_SET);
+                indexManager.updateIndex(root);
+                for(int i=0;i<items.size();i++){
+                    IndexItem item=(IndexItem) items.get(i);
+                    dataManager.removeInterestInFile(item.getKeyFile());
+                    dataManager.removeInterestInFile(item.getValueFile());
+                    indexManager.freeIndex(item);
+                }
+            }catch(IOException e){
+                log.error("Failed to clear Container "+getId(),e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+    }
+
+    protected final void delete(IndexItem key,IndexItem prev,IndexItem next){
+        try{
+            dataManager.removeInterestInFile(key.getKeyFile());
+            dataManager.removeInterestInFile(key.getValueFile());
+            prev=prev==null?root:prev;
+            next=next!=root?next:null;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            indexManager.updateIndex(prev);
+            indexManager.freeIndex(key);
+        }catch(IOException e){
+            log.error("Failed to delete "+key,e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    protected final void checkClosed(){
+        if(closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java Thu Apr 20 07:15:30 2006
@@ -1,54 +1,42 @@
 /**
- * 
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
  */
 package org.apache.activemq.kaha.impl;
 
 import java.util.ListIterator;
-/**
- * @author rajdavies
- * 
- */
-public class ContainerListIterator implements ListIterator{
-    private ListContainerImpl container;
-    private ListIterator iterator;
-    private LocatableItem current;
 
-    protected ContainerListIterator(ListContainerImpl container,ListIterator iterator){
-        this.container=container;
-        this.iterator=iterator;
-        this.current = container.internalGet(0);
-    }
+/** 
+* @version $Revision: 1.2 $
+*/
+public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator{
+    
+   
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.util.ListIterator#hasNext()
-     */
-    public boolean hasNext(){
-        return iterator.hasNext();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.util.ListIterator#next()
-     */
-    public Object next(){
-        Object result=null;
-        current=(LocatableItem) iterator.next();
-        if(current!=null){
-            result=container.getValue(current);
-        }
-        return result;
+    protected ContainerListIterator(ListContainerImpl container,IndexLinkedList list,IndexItem start){
+       super(container,list,start);
     }
 
+   
     /*
      * (non-Javadoc)
      * 
      * @see java.util.ListIterator#hasPrevious()
      */
     public boolean hasPrevious(){
-        return iterator.hasPrevious();
+        return list.getPrevEntry(currentItem) != null;
     }
 
     /*
@@ -57,12 +45,8 @@
      * @see java.util.ListIterator#previous()
      */
     public Object previous(){
-        Object result=null;
-        current=(LocatableItem) iterator.previous();
-        if(current!=null){
-            result=container.getValue(current);
-        }
-        return result;
+        currentItem = list.getPrevEntry(currentItem);
+        return currentItem != null ? container.getValue(currentItem) : null;
     }
 
     /*
@@ -71,7 +55,16 @@
      * @see java.util.ListIterator#nextIndex()
      */
     public int nextIndex(){
-        return iterator.nextIndex();
+        int result = -1;
+        if (currentItem != null){
+            IndexItem next = list.getNextEntry(currentItem);
+            if (next != null){
+                result = container.getInternalList().indexOf(next);
+            }
+        }
+        
+        
+        return result;
     }
 
     /*
@@ -80,29 +73,27 @@
      * @see java.util.ListIterator#previousIndex()
      */
     public int previousIndex(){
-        return iterator.previousIndex();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.util.ListIterator#remove()
-     */
-    public void remove(){
-        iterator.remove();
-        if(current!=null){
-            container.remove(current);
+        int result = -1;
+        if (currentItem != null){
+            IndexItem prev = list.getPrevEntry(currentItem);
+            if (prev != null){
+                result = container.getInternalList().indexOf(prev);
+            }
         }
+        
+        
+        return result;
     }
 
+    
     /*
      * (non-Javadoc)
      * 
      * @see java.util.ListIterator#set(E)
      */
     public void set(Object o){
-        LocatableItem item=container.internalSet(previousIndex()+1,o);
-        iterator.set(item);
+        IndexItem item=((ListContainerImpl) container).internalSet(previousIndex()+1,o);
+        currentItem=item;
     }
 
     /*
@@ -111,7 +102,7 @@
      * @see java.util.ListIterator#add(E)
      */
     public void add(Object o){
-        LocatableItem item=container.internalAdd(previousIndex()+1,o);
-        iterator.set(item);
+        IndexItem item=((ListContainerImpl) container).internalAdd(previousIndex()+1,o); // insert, not overwrite; NOTE(review): confirm internalAdd returns IndexItem
+        currentItem=item;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java Thu Apr 20 07:15:30 2006
@@ -42,37 +42,41 @@
 
     
     public Iterator iterator(){
-        LinkedList list=container.getItemList();
-        list = (LinkedList) list.clone();
-        return new ContainerValueCollectionIterator(container,list.iterator());
+        IndexLinkedList list=container.getItemList();
+        return new ContainerValueCollectionIterator(container,list,list.getRoot());
     }
 
    
     public Object[] toArray(){
         Object[] result = null;
-        List list = container.getItemList();
+        IndexLinkedList list = container.getItemList();
         synchronized(list){
             result = new Object[list.size()];
+            IndexItem item = list.getFirst();
             int count = 0;
-            for(Iterator i=list.iterator();i.hasNext();){
-                LocatableItem item=(LocatableItem) i.next();
-                Object value=container.getValue(item);
+            while (item != null){
+                Object value=container.getValue(item);  
                 result[count++] = value;
+                
+                item = list.getNextEntry(item);
             }
+           
+            
         }
         return result;
     }
 
     public Object[] toArray(Object[] result){
-        List list=container.getItemList();
+        IndexLinkedList list=container.getItemList();
         synchronized(list){
             if(result.length<=list.size()){
-                int count=0;
-                result=(Object[]) java.lang.reflect.Array.newInstance(result.getClass().getComponentType(),list.size());
-                for(Iterator i=list.iterator();i.hasNext();){
-                    LocatableItem item=(LocatableItem) i.next();
-                    Object value=container.getValue(item);
-                    result[count++]=value;
+                IndexItem item = list.getFirst();
+                int count = 0;
+                while (item != null){
+                    Object value=container.getValue(item);  
+                    result[count++] = value;
+                    
+                    item = list.getNextEntry(item);
                 }
             }
         }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java Thu Apr 20 07:15:30 2006
@@ -25,20 +25,21 @@
 * @version $Revision: 1.2 $
 */
 public class ContainerValueCollectionIterator implements Iterator{
-    private MapContainerImpl container;
-    private Iterator  iter;
-    private LocatableItem currentItem;
-    ContainerValueCollectionIterator(MapContainerImpl container,Iterator iter){
+    protected BaseContainerImpl container;
+    protected IndexLinkedList list;
+    protected IndexItem currentItem;
+    ContainerValueCollectionIterator(BaseContainerImpl container,IndexLinkedList list,IndexItem start){
         this.container = container;
-        this.iter = iter;
+        this.list = list;
+        this.currentItem = start;
     }
     
     public boolean hasNext(){
-        return iter.hasNext();
+        return currentItem != null && list.getNextEntry(currentItem) != null;
     }
 
     public Object next(){
-        currentItem = (LocatableItem) iter.next();
+        currentItem = list.getNextEntry(currentItem);
         return container.getValue(currentItem);
     }
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,97 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * A numbered data file of the store. Wraps the file on disk together with a lazily opened
+ * {@link RandomAccessFile}, the tracked length of the file and a count of the references held on it.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFile{
+    private File file;
+    private Integer number;
+    private int referenceCount;
+    private RandomAccessFile randomAcessFile;
+    long length=0;
+
+    /**
+     * @param file the file on disk
+     * @param number the number identifying this data file
+     */
+    DataFile(File file,int number){
+        this.file=file;
+        this.number=new Integer(number);
+        // start from the size of a file that already exists on disk
+        length=file.exists()?file.length():0;
+    }
+
+    /**
+     * @return the number identifying this data file
+     */
+    Integer getNumber(){
+        return number;
+    }
+
+    /**
+     * Get the handle used to access the file, opening it in "rw" mode on first use.
+     * 
+     * @return the cached handle
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException{
+        if(randomAcessFile==null){
+            randomAcessFile=new RandomAccessFile(file,"rw");
+        }
+        return randomAcessFile;
+    }
+
+    /**
+     * @return the tracked length of the file
+     */
+    synchronized long getLength(){
+        return length;
+    }
+
+    /**
+     * Add to the tracked length of the file.
+     * 
+     * @param size the amount to add
+     */
+    synchronized void incrementLength(int size){
+        length+=size;
+    }
+
+    /**
+     * Close the file handle, if it is open, and discard it so that the next call to
+     * {@link #getRandomAccessFile()} reopens the file.
+     * 
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void purge() throws IOException{
+        if(randomAcessFile!=null){
+            randomAcessFile.close();
+            randomAcessFile=null;
+        }
+    }
+
+    /**
+     * Release the file handle and delete the file from disk.
+     * 
+     * @return true if the file was deleted
+     * @throws IOException if closing the handle fails
+     */
+    synchronized boolean delete() throws IOException{
+        purge();
+        return file.delete();
+    }
+
+    /**
+     * Sync the file to the underlying device. Does nothing if the file has not been opened.
+     * 
+     * @throws IOException if the sync fails
+     */
+    synchronized void force() throws IOException{
+        if(randomAcessFile!=null){
+            randomAcessFile.getFD().sync();
+        }
+    }
+
+    /**
+     * Close the file handle, if it is open.
+     * 
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void close() throws IOException{
+        // delegate to purge() so the closed handle is also discarded; keeping the reference would make
+        // getRandomAccessFile() hand out a handle that has already been closed
+        purge();
+    }
+
+    /**
+     * Add a reference to this file.
+     * 
+     * @return the new reference count
+     */
+    synchronized int increment(){
+        return ++referenceCount;
+    }
+
+    /**
+     * Drop a reference to this file.
+     * 
+     * @return the new reference count
+     */
+    synchronized int decrement(){
+        return --referenceCount;
+    }
+
+    /**
+     * @return true if no references are held on this file
+     */
+    synchronized boolean isUnused(){
+        return referenceCount<=0;
+    }
+    
+    /**
+     * @return a pretty print
+     */
+    public String toString(){
+        String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,110 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import org.apache.activemq.kaha.Marshaller;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+/**
+ * Describes where a piece of data lives in the store: the number of the data file, the offset inside that file
+ * and the size recorded in its header.
+ * 
+ * @version $Revision: 1.2 $
+ */
+final class DataItem implements Item{
+    // header layout as written by writeHeader(): a short magic followed by an int size
+    static final int HEAD_SIZE=6; // magic + len
+    private int file=(int) POSITION_NOT_SET;
+    private long offset=POSITION_NOT_SET;
+    private int size;
+
+    /**
+     * Creates an item with neither file nor offset set.
+     */
+    DataItem(){}
+    
+    /**
+     * @return true once a file number has been set
+     */
+    boolean isValid(){
+        boolean fileUnset=(file==POSITION_NOT_SET);
+        return !fileUnset;
+    }
+
+    /**
+     * Write the header: the magic value followed by the size.
+     * 
+     * @param dataOut where the header is written
+     * @throws IOException if the write fails
+     */
+    void writeHeader(DataOutput dataOut) throws IOException{
+        dataOut.writeShort(MAGIC);
+        dataOut.writeInt(size);
+    }
+
+    /**
+     * Read the header and take over the size found in it.
+     * 
+     * @param dataIn where the header is read from
+     * @throws IOException if the read fails or the magic value is not the expected one
+     */
+    void readHeader(DataInput dataIn) throws IOException{
+        int found=dataIn.readShort();
+        if(found!=MAGIC){
+            throw new BadMagicException("Unexpected Magic value: "+found);
+        }
+        size=dataIn.readInt();
+    }
+
+    /**
+     * Write the object using the marshaller.
+     * 
+     * @param marshaller the marshaller doing the write
+     * @param object the payload
+     * @param dataOut where the payload is written
+     * @throws IOException if the marshaller fails
+     */
+    void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
+        marshaller.writePayload(object,dataOut);
+    }
+
+    /**
+     * Read an object using the marshaller.
+     * 
+     * @param marshaller the marshaller doing the read
+     * @param dataIn where the payload is read from
+     * @return the payload
+     * @throws IOException if the marshaller fails
+     */
+    Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
+        Object payload=marshaller.readPayload(dataIn);
+        return payload;
+    }
+
+    /**
+     * @return the size
+     */
+    int getSize(){
+        return this.size;
+    }
+
+    /**
+     * @param size the new size
+     */
+    void setSize(int size){
+        this.size=size;
+    }
+
+    /**
+     * @return the offset
+     */
+    long getOffset(){
+        return this.offset;
+    }
+
+    /**
+     * @param offset the new offset
+     */
+    void setOffset(long offset){
+        this.offset=offset;
+    }
+
+    /**
+     * @return the file number
+     */
+    int getFile(){
+        return this.file;
+    }
+
+    /**
+     * @param file the new file number
+     */
+    void setFile(int file){
+        this.file=file;
+    }
+
+    /**
+     * @return offset, file and size as text
+     */
+    public String toString(){
+        return "offset = "+offset+", file = " + file + ", size = "+size;
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,188 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Manages DataFiles
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataManager{
+    private static final Log log=LogFactory.getLog(DataManager.class);
+    protected static long MAX_FILE_LENGTH=1024*1024*16;
+    private final File dir;
+    private final String prefix;
+    private StoreDataReader reader;
+    private StoreDataWriter writer;
+    private DataFile currentWriteFile;
+    Map fileMap=new HashMap();
+
+    DataManager(File dir,String pf){
+        this.dir=dir;
+        this.prefix=pf;
+        this.reader=new StoreDataReader(this);
+        this.writer=new StoreDataWriter(this);
+        // build up list of current dataFiles
+        File[] files=dir.listFiles(new FilenameFilter(){
+            public boolean accept(File dir,String name){
+                return dir.equals(dir)&&name.startsWith(prefix);
+            }
+        });
+        if(files!=null){
+            for(int i=0;i<files.length;i++){
+                File file=files[i];
+                String name=file.getName();
+                String numStr=name.substring(prefix.length(),name.length());
+                int num=Integer.parseInt(numStr);
+                DataFile dataFile=new DataFile(file,num);
+                fileMap.put(dataFile.getNumber(),dataFile);
+                if(currentWriteFile==null||currentWriteFile.getNumber().intValue()<num){
+                    currentWriteFile=dataFile;
+                }
+            }
+        }
+    }
+
+    DataFile findSpaceForData(DataItem item) throws IOException{
+        if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){
+            int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1;
+            if(currentWriteFile!=null&&currentWriteFile.isUnused()){
+                removeDataFile(currentWriteFile);
+            }
+            currentWriteFile=createAndAddDataFile(nextNum);
+        }
+        item.setOffset(currentWriteFile.getLength());
+        item.setFile(currentWriteFile.getNumber().intValue());
+        return currentWriteFile;
+    }
+
+    RandomAccessFile getDataFile(DataItem item) throws IOException{
+        Integer key=new Integer(item.getFile());
+        DataFile dataFile=(DataFile) fileMap.get(key);
+        if(dataFile!=null){
+            return dataFile.getRandomAccessFile();
+        }
+        throw new IOException("Could not locate data file "+prefix+item.getFile());
+    }
+
+    synchronized Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+        return reader.readItem(marshaller,item);
+    }
+
+    synchronized DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
+        return writer.storeItem(marshaller,payload);
+    }
+
+    synchronized void close() throws IOException{
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            dataFile.force();
+        }
+        fileMap.clear();
+    }
+
+    synchronized void force() throws IOException{
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            dataFile.force();
+        }
+    }
+
+    synchronized boolean delete() throws IOException{
+        boolean result=true;
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            result&=dataFile.delete();
+        }
+        fileMap.clear();
+        return result;
+    }
+
+    synchronized void addInterestInFile(int file) throws IOException{
+        if(file>=0){
+            Integer key=new Integer(file);
+            DataFile dataFile=(DataFile) fileMap.get(key);
+            if(dataFile==null){
+                dataFile=createAndAddDataFile(file);
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    void addInterestInFile(DataFile dataFile){
+        if(dataFile!=null){
+            dataFile.increment();
+        }
+    }
+
+    synchronized void removeInterestInFile(int file) throws IOException{
+        if(file>=0){
+            Integer key=new Integer(file);
+            DataFile dataFile=(DataFile) fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+    }
+
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
+        if(dataFile!=null){
+            if(dataFile.decrement()<=0){
+                if(dataFile!=currentWriteFile){
+                    removeDataFile(dataFile);
+                }
+            }
+        }
+    }
+
+    synchronized void consolidateDataFiles() throws IOException{
+        List purgeList=new ArrayList();
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            if(dataFile.isUnused() && dataFile != currentWriteFile){
+                purgeList.add(dataFile);
+            }
+        }
+        for(int i=0;i<purgeList.size();i++){
+            DataFile dataFile=(DataFile) purgeList.get(i);
+            fileMap.remove(dataFile.getNumber());
+            boolean result=dataFile.delete();
+            log.info("discarding data file "+dataFile+(result?"successful ":"failed"));
+        }
+    }
+
+    private DataFile createAndAddDataFile(int num){
+        String fileName=prefix+num;
+        File file=new File(dir,fileName);
+        DataFile result=new DataFile(file,num);
+        fileMap.put(result.getNumber(),result);
+        return result;
+    }
+
+    private void removeDataFile(DataFile dataFile) throws IOException{
+        fileMap.remove(dataFile.getNumber());
+        boolean result=dataFile.delete();
+        log.info("discarding data file "+dataFile+(result?"successful ":"failed"));
+    }
+}

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java (from r393376, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java&r1=393376&r2=395597&rev=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java Thu Apr 20 07:15:30 2006
@@ -14,113 +14,219 @@
 package org.apache.activemq.kaha.impl;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.activemq.kaha.Marshaller;
-
 /**
-* A an Item with a relative postion and location to other Items in the Store
-* 
-* @version $Revision: 1.2 $
-*/
-public final class LocatableItem extends Item implements Externalizable{
+ * An Item with a relative position and location to other Items in the Store
+ * 
+ * @version $Revision: 1.2 $
+ */
+final class IndexItem implements Item{
+    
+    static final int INDEX_SIZE=43;
+    //used by linked list
+    IndexItem next;
+    IndexItem prev;
     
-  
-    private static final long serialVersionUID=-6888731361600185708L;
+    private long offset=POSITION_NOT_SET;
     private long previousItem=POSITION_NOT_SET;
     private long nextItem=POSITION_NOT_SET;
-    private long referenceItem=POSITION_NOT_SET;
-   
+    private long keyOffset=POSITION_NOT_SET;
+    private int keyFile=(int) POSITION_NOT_SET;
+    private long valueOffset=POSITION_NOT_SET;
+    private int valueFile=(int) POSITION_NOT_SET;
+    private boolean active=true;
+    
 
-    public LocatableItem(){}
+    /**
+     * Default Constructor
+     */
+    IndexItem(){}
 
-    public LocatableItem(long prev,long next,long objOffset) throws IOException{
-        this.previousItem=prev;
-        this.nextItem=next;
-        this.referenceItem=objOffset;
+    void reset(){
+        previousItem=POSITION_NOT_SET;
+        nextItem=POSITION_NOT_SET;
+        keyOffset=POSITION_NOT_SET;
+        keyFile=(int) POSITION_NOT_SET;
+        valueOffset=POSITION_NOT_SET;
+        valueFile=(int) POSITION_NOT_SET;
+        active=true;
+    }
+
+    DataItem getKeyDataItem(){
+        DataItem result=new DataItem();
+        result.setOffset(keyOffset);
+        result.setFile(keyFile);
+        return result;
     }
 
-
-    public void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
-        dataOut.writeLong(previousItem);
-        dataOut.writeLong(nextItem);
-        dataOut.writeLong(referenceItem);
-        super.writePayload(marshaller,object,dataOut);
+    DataItem getValueDataItem(){
+        DataItem result=new DataItem();
+        result.setOffset(valueOffset);
+        result.setFile(valueFile);
+        return result;
     }
 
-    public Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
-        previousItem=dataIn.readLong();
-        nextItem=dataIn.readLong();
-        referenceItem=dataIn.readLong();
-        return super.readPayload(marshaller, dataIn);
+    void setValueData(DataItem item){
+        valueOffset=item.getOffset();
+        valueFile=item.getFile();
     }
-    
-    void readLocation(DataInput dataIn) throws IOException{
-        previousItem=dataIn.readLong();
-        nextItem=dataIn.readLong();
-        referenceItem=dataIn.readLong();
+
+    void setKeyData(DataItem item){
+        keyOffset=item.getOffset();
+        keyFile=item.getFile();
     }
 
-    public void writeLocation(DataOutput dataOut) throws IOException{
+    /**
+     * @param dataOut
+     * @throws IOException
+     */
+    void write(DataOutput dataOut) throws IOException{
+        dataOut.writeShort(MAGIC);
+        dataOut.writeBoolean(active);
         dataOut.writeLong(previousItem);
         dataOut.writeLong(nextItem);
+        dataOut.writeInt(keyFile);
+        dataOut.writeLong(keyOffset);
+        dataOut.writeInt(valueFile);
+        dataOut.writeLong(valueOffset);
     }
 
-    public void setPreviousItem(long newPrevEntry){
+    /**
+     * @param dataIn
+     * @throws IOException
+     */
+    void read(DataInput dataIn) throws IOException{
+        if(dataIn.readShort()!=MAGIC){
+            throw new BadMagicException();
+        }
+        active=dataIn.readBoolean();
+        previousItem=dataIn.readLong();
+        nextItem=dataIn.readLong();
+        keyFile=dataIn.readInt();
+        keyOffset=dataIn.readLong();
+        valueFile=dataIn.readInt();
+        valueOffset=dataIn.readLong();
+    }
+
+    /**
+     * @param newPrevEntry
+     */
+    void setPreviousItem(long newPrevEntry){
         previousItem=newPrevEntry;
     }
 
-    public long getPreviousItem(){
+    /**
+     * @return prev item
+     */
+    long getPreviousItem(){
         return previousItem;
     }
 
-    public void setNextItem(long newNextEntry){
+    /**
+     * @param newNextEntry
+     */
+    void setNextItem(long newNextEntry){
         nextItem=newNextEntry;
     }
 
-    public long getNextItem(){
+    /**
+     * @return next item
+     */
+    long getNextItem(){
         return nextItem;
     }
 
-    public void setReferenceItem(long newObjectOffset){
-        referenceItem=newObjectOffset;
+    /**
+     * @param newObjectOffset
+     */
+    void setKeyOffset(long newObjectOffset){
+        keyOffset=newObjectOffset;
     }
 
-    public long getReferenceItem(){
-        return referenceItem;
+    /**
+     * @return key offset
+     */
+    long getKeyOffset(){
+        return keyOffset;
     }
 
-    /* (non-Javadoc)
-     * @see org.apache.activemq.kaha.impl.Item#toString()
+    /**
+     * @return Returns the keyFile.
      */
-    public String toString(){
-        String result=super.toString();
-        result+=" , referenceItem = "+referenceItem+", previousItem = "+previousItem+" , nextItem = "+nextItem;
-        return result;
+    int getKeyFile(){
+        return keyFile;
     }
 
-    /* (non-Javadoc)
-     * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+    /**
+     * @param keyFile The keyFile to set.
      */
-    public void writeExternal(ObjectOutput out) throws IOException{
-        out.writeLong(previousItem);
-        out.writeLong(nextItem);
-        out.writeLong(referenceItem);
-        
-    }
-
-    /* (non-Javadoc)
-     * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
-     */
-    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
-       previousItem = in.readLong();
-       nextItem = in.readLong();
-       referenceItem = in.readLong();
-        
+    void setKeyFile(int keyFile){
+        this.keyFile=keyFile;
+    }
+
+    /**
+     * @return Returns the valueFile.
+     */
+    int getValueFile(){
+        return valueFile;
+    }
+
+    /**
+     * @param valueFile The valueFile to set.
+     */
+    void setValueFile(int valueFile){
+        this.valueFile=valueFile;
+    }
+
+    /**
+     * @return Returns the valueOffset.
+     */
+    long getValueOffset(){
+        return valueOffset;
+    }
+
+    /**
+     * @param valueOffset The valueOffset to set.
+     */
+    void setValueOffset(long valueOffset){
+        this.valueOffset=valueOffset;
+    }
+
+    /**
+     * @return Returns the active.
+     */
+    boolean isActive(){
+        return active;
+    }
+
+    /**
+     * @param active The active to set.
+     */
+    void setActive(boolean active){
+        this.active=active;
+    }
+
+    /**
+     * @return Returns the offset.
+     */
+    long getOffset(){
+        return offset;
+    }
+
+    /**
+     * @param offset The offset to set.
+     */
+    void setOffset(long offset){
+        this.offset=offset;
+    }
+
+    /**
+     * @return pretty print of 'this'
+     */
+    public String toString(){
+        String result="offset="+offset+" , keyFile = "+keyFile+" , keyOffset = "+keyOffset+", valueOffset = "
+                        +valueOffset+" , previousItem = "+previousItem+" , nextItem = "+nextItem;
+        return result;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,276 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+/**
+ * A linked list used by IndexItems. The links are kept in the items themselves (<tt>IndexItem.next</tt> and
+ * <tt>IndexItem.prev</tt>) and the list is circular around a root item, which is not an element of the list.
+ * 
+ * @version $Revision: 1.2 $
+ */
+final class IndexLinkedList implements Cloneable{
+    private transient IndexItem root;
+    private transient int size=0;
+
+    /**
+     * Constructs an empty list around the given root. The links of the root are reset to point at itself.
+     * 
+     * @param header the item to use as root
+     */
+    IndexLinkedList(IndexItem header){
+        this.root = header;
+        this.root.next=root.prev=root;
+    }
+    
+    /**
+     * @return the root item
+     */
+    IndexItem getRoot(){
+        return root;
+    }
+
+    /**
+     * Returns the first element in this list.
+     * 
+     * @return the first element in this list, or null if the list is empty.
+     */
+    IndexItem getFirst(){
+        if(size==0)
+            return null;
+        return root.next;
+    }
+
+    /**
+     * Returns the last element in this list.
+     * 
+     * @return the last element in this list, or null if the list is empty.
+     */
+    IndexItem getLast(){
+        if(size==0)
+            return null;
+        return root.prev;
+    }
+
+    /**
+     * Removes and returns the first element from this list.
+     * 
+     * @return the first element from this list, or null if the list is empty.
+     */
+    IndexItem removeFirst(){
+        if(size==0){
+            return null;
+        }
+        IndexItem result=root.next;
+        remove(root.next);
+        return result;
+    }
+
+    /**
+     * Removes and returns the last element from this list.
+     * 
+     * @return the last element from this list, or null if the list is empty.
+     */
+    IndexItem removeLast(){
+        if(size==0)
+            return null;
+        IndexItem result=root.prev;
+        remove(root.prev);
+        return result;
+    }
+
+    /**
+     * Inserts the given element at the beginning of this list.
+     * 
+     * @param item the element to be inserted at the beginning of this list.
+     */
+    void addFirst(IndexItem item){
+        addBefore(item,root.next);
+    }
+
+    /**
+     * Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
+     * only for consistency.)
+     * 
+     * @param item the element to be inserted at the end of this list.
+     */
+    void addLast(IndexItem item){
+        addBefore(item,root);
+    }
+
+    /**
+     * Returns the number of elements in this list.
+     * 
+     * @return the number of elements in this list.
+     */
+    int size(){
+        return size;
+    }
+
+    /**
+     * is the list empty?
+     * 
+     * @return true if there are no elements in the list
+     */
+    boolean isEmpty(){
+        return size==0;
+    }
+
+    /**
+     * Appends the specified element to the end of this list.
+     * 
+     * @param item element to be appended to this list.
+     * @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
+     */
+    boolean add(IndexItem item){
+        addBefore(item,root);
+        return true;
+    }
+
+    /**
+     * Removes all of the elements from this list. The links of the removed elements are left as they were.
+     */
+    void clear(){
+        root.next=root.prev=root;
+        size=0;
+    }
+
+    // Positional Access Operations
+    /**
+     * Returns the element at the specified position in this list.
+     * 
+     * @param index index of element to return.
+     * @return the element at the specified position in this list.
+     * 
+     * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt;= size()</tt>).
+     */
+    IndexItem get(int index){
+        return entry(index);
+    }
+
+    /**
+     * Inserts the specified element at the specified position in this list. Shifts the element currently at that
+     * position (if any) and any subsequent elements to the right (adds one to their indices).
+     * 
+     * @param index index at which the specified element is to be inserted.
+     * @param element element to be inserted.
+     * 
+     * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt; size()</tt>).
+     */
+    void add(int index,IndexItem element){
+        addBefore(element,(index==size?root:entry(index)));
+    }
+
+    /**
+     * Removes the element at the specified position in this list. Shifts any subsequent elements to the left (subtracts
+     * one from their indices). Returns the element that was removed from the list.
+     * 
+     * @param index the index of the element to be removed.
+     * @return the element previously at the specified position.
+     * 
+     * @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index &lt; 0 || index &gt;= size()</tt>).
+     */
+    IndexItem remove(int index){
+        IndexItem e=entry(index);
+        remove(e);
+        return e;
+    }
+
+    /**
+     * Return the indexed entry, walking from whichever end of the list is closer.
+     */
+    private IndexItem entry(int index){
+        if(index<0||index>=size)
+            throw new IndexOutOfBoundsException("Index: "+index+", Size: "+size);
+        IndexItem e=root;
+        if(index<size/2){
+            for(int i=0;i<=index;i++)
+                e=e.next;
+        }else{
+            for(int i=size;i>index;i--)
+                e=e.prev;
+        }
+        return e;
+    }
+
+    // Search Operations
+    /**
+     * Returns the index in this list of the specified element, or -1 if the list does not contain it. Elements
+     * are compared by identity (<tt>o==get(i)</tt>).
+     * 
+     * @param o element to search for.
+     * @return the index in this list of the specified element, or -1 if the list does not contain this element.
+     */
+    int indexOf(IndexItem o){
+        int index=0;
+        for(IndexItem e=root.next;e!=root;e=e.next){
+            if(o==e){
+                return index;
+            }
+            index++;
+        }
+        return -1;
+    }
+
+    /**
+     * Retrieve the next entry after this entry
+     * 
+     * @param entry
+     * @return next entry, or null if the entry is followed by the root
+     */
+    IndexItem getNextEntry(IndexItem entry){
+        return entry.next != root ? entry.next : null;
+    }
+
+    /**
+     * Retrieve the entry before this entry
+     * 
+     * @param entry
+     * @return prev entry, or null if the entry is preceded by the root
+     */
+    IndexItem getPrevEntry(IndexItem entry){
+        return entry.prev != root ? entry.prev : null;
+    }
+
+    /**
+     * Insert an entry before another entry
+     * 
+     * @param insert the element to insert
+     * @param e the entry to insert the element before
+     * 
+     */
+    void addBefore(IndexItem insert,IndexItem e){
+        insert.next=e;
+        insert.prev=e.prev;
+        insert.prev.next=insert;
+        insert.next.prev=insert;
+        size++;
+    }
+
+    /**
+     * Unlink the entry from the list. The root is never removed. The links of the removed entry itself are
+     * left as they were.
+     * 
+     * @param e the entry to unlink
+     */
+    void remove(IndexItem e){
+        if(e==root)
+            return;
+        e.prev.next=e.next;
+        e.next.prev=e.prev;
+        size--;
+    }
+    
+    /**
+     * Create an independent copy of this list. The links live in the items themselves, so an item can only be
+     * in one list at a time; the clone therefore holds copies of the root and of every element, and this list
+     * is left untouched. The copies must not be handed to the structural methods of this list.
+     * 
+     * @return clone
+     */
+    public Object clone(){
+        // never reuse this.root here: the constructor resets the links of the root it is given, which
+        // would detach every element of this list
+        IndexLinkedList clone=new IndexLinkedList(copyOf(this.root));
+        for(IndexItem e=root.next;e!=root;e=e.next)
+            clone.add(copyOf(e));
+        return clone;
+    }
+
+    /**
+     * Copy the persistent fields of an item into a new, unlinked item.
+     */
+    private static IndexItem copyOf(IndexItem source){
+        IndexItem copy=new IndexItem();
+        copy.setOffset(source.getOffset());
+        copy.setPreviousItem(source.getPreviousItem());
+        copy.setNextItem(source.getNextItem());
+        copy.setKeyFile(source.getKeyFile());
+        copy.setKeyOffset(source.getKeyOffset());
+        copy.setValueFile(source.getValueFile());
+        copy.setValueOffset(source.getValueOffset());
+        copy.setActive(source.isActive());
+        return copy;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,121 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.LinkedList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Optimized Store reader
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class IndexManager{
+    private static final Log log=LogFactory.getLog(IndexManager.class);
+    private File file;
+    private RandomAccessFile indexFile;
+    private StoreIndexReader reader;
+    private StoreIndexWriter writer;
+    private LinkedList freeList=new LinkedList();
+    private long length=0;
+
+    IndexManager(File ifile,String mode) throws IOException{
+        file=ifile;
+        indexFile=new RandomAccessFile(ifile,mode);
+        reader=new StoreIndexReader(indexFile);
+        writer=new StoreIndexWriter(indexFile);
+        long offset=0;
+        while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
+            IndexItem index=reader.readItem(offset);
+            if(!index.isActive()){
+                index.reset();
+                freeList.add(index);
+            }
+            offset+=IndexItem.INDEX_SIZE;
+        }
+        length=offset;
+    }
+
+    synchronized boolean isEmpty() throws IOException{
+        return freeList.isEmpty()&&length==0;
+    }
+
+    synchronized IndexItem getIndex(long offset) throws IOException{
+        return reader.readItem(offset);
+    }
+
+    synchronized void freeIndex(IndexItem item) throws IOException{
+        item.reset();
+        item.setActive(false);
+        writer.storeItem(item);
+        freeList.add(item);
+    }
+
+    synchronized void updateIndex(IndexItem index) throws IOException{
+        writer.storeItem(index);
+    }
+
+    synchronized IndexItem createNewIndex() throws IOException{
+        IndexItem result=getNextFreeIndex();
+        if(result==null){
+            // allocate one
+            result=new IndexItem();
+            result.setOffset(length);
+            length+=IndexItem.INDEX_SIZE;
+        }
+        return result;
+    }
+
+    synchronized void close() throws IOException{
+        if(indexFile!=null){
+            indexFile.close();
+            indexFile=null;
+        }
+    }
+
+    synchronized void force() throws IOException{
+        if(indexFile!=null){
+            indexFile.getFD().sync();
+        }
+    }
+
+    synchronized boolean delete() throws IOException{
+        freeList.clear();
+        if(indexFile!=null){
+            indexFile.close();
+            indexFile=null;
+        }
+        return file.delete();
+    }
+
+    private IndexItem getNextFreeIndex() throws IOException{
+        IndexItem result=null;
+        if(!freeList.isEmpty()){
+            result=(IndexItem) freeList.removeLast();
+            result.reset();
+        }
+        return result;
+    }
+
+    long getLength(){
+        return length;
+    }
+
+    void setLength(long value){
+        this.length=value;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,149 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+/**
+* A container of roots for other Containers
+* 
+* @version $Revision: 1.2 $
+*/
+
+class IndexRootContainer {
+    private static final Log log=LogFactory.getLog(IndexRootContainer.class);
+    protected static final Marshaller rootMarshaller = new ObjectMarshaller();
+    protected IndexItem root;
+    protected IndexManager indexManager;
+    protected DataManager dataManager;
+    protected Map map = new ConcurrentHashMap();
+    protected LinkedList list = new LinkedList();
+    
+    
+    /**
+     * Load the chain of entries that hangs off the given root, reading the key of each entry
+     * 
+     * @param root head of the chain; entries are linked from its next-item pointer
+     * @param im manager used to read and write index items
+     * @param dfm manager used to read and write the marshalled keys
+     * @throws IOException if an index item or a key cannot be read
+     */
+    IndexRootContainer(IndexItem root,IndexManager im,DataManager dfm) throws IOException{
+        this.root=root;
+        this.indexManager=im;
+        this.dataManager=dfm;
+        long nextItem=root.getNextItem();
+        while(nextItem!=Item.POSITION_NOT_SET){
+            IndexItem item=indexManager.getIndex(nextItem);
+            DataItem data=item.getKeyDataItem();
+            Object key=dataManager.readItem(rootMarshaller,data);
+            map.put(key,item);
+            list.add(item);
+            nextItem=item.getNextItem();
+            dataManager.addInterestInFile(item.getKeyFile());
+        }
+    }
+    
+    /**
+     * @return the keys of all entries held by this container (the key set of the internal map)
+     */
+    Set getKeys(){
+        return map.keySet();
+    }
+    
+    /**
+     * Store the key and append an entry for it to the end of the chain, together with a freshly allocated
+     * root item that the entry points at. An entry already registered for the key is removed first.
+     * 
+     * @param key the key to register
+     * @return the newly allocated root item for the key
+     * @throws IOException if the key or an index item cannot be written
+     */
+    IndexItem addRoot(Object key) throws IOException{
+        if (map.containsKey(key)){
+            removeRoot(key);
+        }
+        
+        DataItem data = dataManager.storeItem(rootMarshaller, key);
+        IndexItem index = indexManager.createNewIndex();
+        index.setKeyData(data);
+        IndexItem newRoot = indexManager.createNewIndex();
+        indexManager.updateIndex(newRoot);
+        index.setValueOffset(newRoot.getOffset());
+       
+        // append after the current tail, or directly after the root when the chain is empty
+        IndexItem last=list.isEmpty()?root:(IndexItem) list.getLast();
+        index.setPreviousItem(last.getOffset());
+        indexManager.updateIndex(index);
+        last.setNextItem(index.getOffset());
+        indexManager.updateIndex(last);
+        map.put(key, index);
+        list.add(index);
+        return newRoot;
+    }
+    
+    /**
+     * Remove the entry registered for the key, if any: free the root item it points at, unlink the entry
+     * from the chain and free the entry's own index slot.
+     * 
+     * @param key the key to remove
+     * @throws IOException if an index item cannot be read or written
+     */
+    void removeRoot(Object key) throws IOException{
+        IndexItem item = (IndexItem) map.remove(key);
+        if (item != null){
+            dataManager.removeInterestInFile(item.getKeyFile());
+            IndexItem rootIndex = indexManager.getIndex(item.getValueOffset());
+            indexManager.freeIndex(rootIndex);
+            int index=list.indexOf(item);
+            IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+            IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            indexManager.updateIndex(prev);
+            list.remove(item);
+            // free the entry's own slot last (freeIndex resets it); without this the slot would stay
+            // marked active in the index file and could never be handed out again
+            indexManager.freeIndex(item);
+        }
+    }
+    
+    /**
+     * Read the root item that the entry registered for the key points at
+     * 
+     * @param key the key to look up
+     * @return the root item, read from the index
+     * @throws IOException if no entry is registered for the key or the read fails
+     */
+    IndexItem getRoot(Object key) throws IOException{
+        IndexItem index =  (IndexItem) map.get(key);
+        if (index != null){
+            return indexManager.getIndex(index.getValueOffset());
+        }
+        throw new IOException("Cannot find root for key " + key);
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java Thu Apr 20 07:15:30 2006
@@ -13,104 +13,15 @@
  */
 package org.apache.activemq.kaha.impl;
 
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import org.apache.activemq.kaha.Marshaller;
 /**
  * A a wrapper for a data in the store
  * 
  * @version $Revision: 1.2 $
  */
-public class Item{
+public interface Item{
     static final long POSITION_NOT_SET=-1;
     static final short MAGIC=31317;
     static final int ACTIVE=22;
     static final int FREE=33;
-    static final int HEAD_SIZE=8; // magic + active + len
     static final int LOCATION_SIZE=24;
-    private long offset=POSITION_NOT_SET;
-    private int size;
-    private boolean active;
-
-    Item(){}
-
-    void writeHeader(DataOutput dataOut) throws IOException{
-        dataOut.writeShort(MAGIC);
-        dataOut.writeByte(active?ACTIVE:FREE);
-        dataOut.writeInt(size);
-        dataOut.writeByte(0);//padding
-    }
-
-    void readHeader(DataInput dataIn) throws IOException{
-        int magic=dataIn.readShort();
-        if(magic==MAGIC){
-            active=(dataIn.readByte()==ACTIVE);
-            size=dataIn.readInt();
-        }else if (magic == 0){
-            size = -999; //end of data
-        }else{
-            throw new BadMagicException("Unexpected Magic value: "+magic);
-        }
-    }
-
-    void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
-        marshaller.writePayload(object,dataOut);
-    }
-
-    Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
-        return marshaller.readPayload(dataIn);
-    }
-
-    void readLocation(DataInput dataIn) throws IOException{}
-
-    void writeLocation(DataOutput dataOut) throws IOException{}
-
-    /**
-     * @return Returns the size.
-     */
-    int getSize(){
-        return size;
-    }
-
-    /**
-     * @param size
-     *            The size to set.
-     */
-    void setSize(int size){
-        this.size=size;
-    }
-
-    void setOffset(long pos){
-        offset=pos;
-    }
-
-    long getOffset(){
-        return offset;
-    }
-
-    /**
-     * @return Returns the active.
-     */
-    boolean isActive(){
-        return active;
-    }
-
-    /**
-     * @param active
-     *            The active to set.
-     */
-    void setActive(boolean active){
-        this.active=active;
-    }
-
-    /**
-     * @return a pretty print
-     */
-    public String toString(){
-        String result="offset = "+offset+" ,active = "+active+" , size = "+size;
-        return result;
-    }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,205 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAcessFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.activemq.kaha.Store;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+/**
+ * Optimized Store writer
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class KahaStore implements Store{
+    DataManager rootData;
+    DataManager containersData;
+    IndexManager indexManager;
+    private IndexRootContainer mapsContainer;
+    private IndexRootContainer listsContainer;
+    private Map lists=new ConcurrentHashMap();
+    private Map maps=new ConcurrentHashMap();
+    private boolean closed=false;
+    private String name;
+    private String mode;
+    private boolean initialized;
+
+    public KahaStore(String name,String mode) throws IOException{
+        this.name=name;
+        this.mode=mode;
+        initialize();
+    }
+
+    public synchronized void close() throws IOException{
+        if(!closed){
+            closed=true;
+            if(initialized){
+                indexManager.close();
+                rootData.close();
+                containersData.close();
+            }
+        }
+    }
+
+    public synchronized void force() throws IOException{
+        if(initialized){
+            indexManager.force();
+            rootData.force();
+            containersData.force();
+        }
+    }
+
+    public synchronized void clear() throws IOException{
+        initialize();
+        for(Iterator i=maps.values().iterator();i.hasNext();){
+            BaseContainerImpl container=(BaseContainerImpl) i.next();
+            container.clear();
+        }
+        for(Iterator i=lists.values().iterator();i.hasNext();){
+            BaseContainerImpl container=(BaseContainerImpl) i.next();
+            container.clear();
+        }
+        lists.clear();
+        maps.clear();
+    }
+
+    public synchronized boolean delete() throws IOException{
+        initialize();
+        clear();
+        boolean result=indexManager.delete();
+        result&=rootData.delete();
+        result&=containersData.delete();
+        initialized=false;
+        return result;
+    }
+
+    public boolean doesMapContainerExist(Object id) throws IOException{
+        initialize();
+        return maps.containsKey(id);
+    }
+
+    public MapContainer getMapContainer(Object id) throws IOException{
+        initialize();
+        MapContainer result=(MapContainer) maps.get(id);
+        if(result==null){
+            IndexItem root=mapsContainer.addRoot(id);
+            result=new MapContainerImpl(id,root,indexManager,containersData);
+            maps.put(id,result);
+        }
+        return result;
+    }
+
+    public void deleteMapContainer(Object id) throws IOException{
+        initialize();
+        MapContainer container=(MapContainer) maps.remove(id);
+        if(container!=null){
+            container.clear();
+            mapsContainer.removeRoot(id);
+        }
+    }
+
+    public Set getMapContainerIds() throws IOException{
+        initialize();
+        return maps.keySet();
+    }
+
+    public boolean doesListContainerExist(Object id) throws IOException{
+        initialize();
+        return lists.containsKey(id);
+    }
+
+    public ListContainer getListContainer(Object id) throws IOException{
+        initialize();
+        ListContainer result=(ListContainer) lists.get(id);
+        if(result==null){
+            IndexItem root=listsContainer.addRoot(id);
+            result=new ListContainerImpl(id,root,indexManager,containersData);
+            lists.put(id,result);
+        }
+        return result;
+    }
+
+    public void deleteListContainer(Object id) throws IOException{
+        initialize();
+        ListContainer container=(ListContainer) lists.remove(id);
+        if(container!=null){
+            container.clear();
+            listsContainer.removeRoot(id);
+        }
+    }
+
+    public Set getListContainerIds() throws IOException{
+        initialize();
+        return lists.keySet();
+    }
+
+    protected void checkClosed(){
+        if(closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+
+    protected synchronized void initialize() throws IOException{
+        if(!initialized){
+            initialized=true;
+            File dir=new File(name);
+            dir.mkdirs();
+            File ifile=new File(dir,"kaha.idx");
+            indexManager=new IndexManager(ifile,mode);
+            rootData=new DataManager(dir,"roots-data");
+            containersData=new DataManager(dir,"containers-data");
+            IndexItem mapRoot=new IndexItem();
+            IndexItem listRoot=new IndexItem();
+            if(indexManager.isEmpty()){
+                mapRoot.setOffset(0);
+                indexManager.updateIndex(mapRoot);
+                listRoot.setOffset(IndexItem.INDEX_SIZE);
+                indexManager.updateIndex(listRoot);
+                indexManager.setLength(IndexItem.INDEX_SIZE*2);
+            }else{
+                mapRoot=indexManager.getIndex(0);
+                listRoot=indexManager.getIndex(IndexItem.INDEX_SIZE);
+            }
+            mapsContainer=new IndexRootContainer(mapRoot,indexManager,rootData);
+            listsContainer=new IndexRootContainer(listRoot,indexManager,rootData);
+            rootData.consolidateDataFiles();
+            for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
+                Object key=i.next();
+                IndexItem root=mapsContainer.getRoot(key);
+                BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,containersData);
+                container.expressDataInterest();
+                maps.put(key,container);
+            }
+            for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
+                Object key=i.next();
+                IndexItem root=listsContainer.getRoot(key);
+                BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,containersData);
+                container.expressDataInterest();
+                lists.put(key,container);
+            }
+            containersData.consolidateDataFiles();
+        }
+    }
+}