You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/20 16:15:38 UTC

svn commit: r395597 [2/3] - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/store/kahadaptor/ test/java/org/apache/activemq/kaha/ test/java/org/a...

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -17,7 +17,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import org.apache.activemq.kaha.ListContainer;
@@ -31,20 +30,13 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class ListContainerImpl implements ListContainer{
-    private static final Log log=LogFactory.getLog(MapContainerImpl.class);
-    protected StoreImpl store;
-    protected LocatableItem root;
-    protected Object id;
-    protected LinkedList list=new LinkedList();
-    protected boolean loaded=false;
+final class ListContainerImpl extends BaseContainerImpl implements ListContainer{
+    private static final Log log=LogFactory.getLog(ListContainerImpl.class);
     protected Marshaller marshaller=new ObjectMarshaller();
-    protected boolean closed = false;
 
-    protected ListContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
-        this.id=id;
-        this.store=rfs;
-        this.root=root;
+    protected ListContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager)
+                    throws IOException{
+        super(id,root,indexManager,dataManager);
     }
 
     /*
@@ -55,21 +47,20 @@
     public void load(){
         checkClosed();
         if(!loaded){
-            loaded=true;
-            long start=root.getNextItem();
-            if(start!=Item.POSITION_NOT_SET){
-                try{
-                    long nextItem=start;
-                    while(nextItem!=Item.POSITION_NOT_SET){
-                        LocatableItem item=new LocatableItem();
-                        item.setOffset(nextItem);
-                        store.readLocation(item);
-                        list.add(item);
-                        nextItem=item.getNextItem();
+            synchronized(mutex){
+                if(!loaded){
+                    loaded=true;
+                    try{
+                        long nextItem=root.getNextItem();
+                        while(nextItem!=Item.POSITION_NOT_SET){
+                            IndexItem item=indexManager.getIndex(nextItem);
+                            list.add(item);
+                            nextItem=item.getNextItem();
+                        }
+                    }catch(IOException e){
+                        log.error("Failed to load container "+getId(),e);
+                        throw new RuntimeStoreException(e);
                     }
-                }catch(IOException e){
-                    log.error("Failed to load container "+getId(),e);
-                    throw new RuntimeStoreException(e);
                 }
             }
         }
@@ -83,25 +74,10 @@
     public void unload(){
         checkClosed();
         if(loaded){
-            loaded = false;
+            loaded=false;
             list.clear();
         }
     }
-    
-    public void close(){
-        unload();
-        closed = true;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.kaha.ListContainer#isLoaded()
-     */
-    public boolean isLoaded(){
-        checkClosed();
-        return loaded;
-    }
 
     /*
      * (non-Javadoc)
@@ -113,33 +89,23 @@
         this.marshaller=marshaller;
     }
 
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.kaha.ListContainer#getId()
-     */
-    public Object getId(){
-        checkClosed();
-        return id;
-    }
-    
     public boolean equals(Object obj){
-        checkLoaded();
-        checkClosed();
-        boolean result = false;
-        if (obj != null && obj instanceof List){
-            List other = (List) obj;
-            synchronized(list){
-            result = other.size() == size();
-            if (result){
-                for (int i =0; i < list.size(); i++){
-                    Object o1 = other.get(i);
-                    Object o2 = get(i);
-                    result = o1 == o2 || (o1 != null && o2 != null && o1.equals(o2));
-                    if (!result) break;
+        load();
+        boolean result=false;
+        if(obj!=null&&obj instanceof List){
+            List other=(List) obj;
+            synchronized(mutex){
+                result=other.size()==size();
+                if(result){
+                    for(int i=0;i<list.size();i++){
+                        Object o1=other.get(i);
+                        Object o2=get(i);
+                        result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
+                        if(!result)
+                            break;
+                    }
                 }
             }
-            }
         }
         return result;
     }
@@ -150,8 +116,7 @@
      * @see org.apache.activemq.kaha.ListContainer#size()
      */
     public int size(){
-        checkClosed();
-        checkLoaded();
+        load();
         return list.size();
     }
 
@@ -161,10 +126,9 @@
      * @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
      */
     public void addFirst(Object o){
-        checkClosed();
-        checkLoaded();
-        LocatableItem item=writeFirst(o);
-        synchronized(list){
+        load();
+        IndexItem item=writeFirst(o);
+        synchronized(mutex){
             list.addFirst(item);
         }
     }
@@ -175,10 +139,9 @@
      * @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
      */
     public void addLast(Object o){
-        checkClosed();
-        checkLoaded();
-        LocatableItem item=writeLast(o);
-        synchronized(list){
+        load();
+        IndexItem item=writeLast(o);
+        synchronized(mutex){
             list.addLast(item);
         }
     }
@@ -189,16 +152,15 @@
      * @see org.apache.activemq.kaha.ListContainer#removeFirst()
      */
     public Object removeFirst(){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        synchronized(list){
-            LocatableItem item=(LocatableItem) list.getFirst();
+        synchronized(mutex){
+            IndexItem item=(IndexItem) list.getFirst();
             if(item!=null){
                 result=getValue(item);
                 int index=list.indexOf(item);
-                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+                IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
                 list.removeFirst();
                 delete(item,prev,next);
                 item=null;
@@ -213,16 +175,15 @@
      * @see org.apache.activemq.kaha.ListContainer#removeLast()
      */
     public Object removeLast(){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        synchronized(list){
-            LocatableItem item=(LocatableItem) list.getLast();
+        synchronized(mutex){
+            IndexItem item=(IndexItem) list.getLast();
             if(item!=null){
                 result=getValue(item);
                 int index=list.indexOf(item);
-                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-                LocatableItem next=null;
+                IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+                IndexItem next=null;
                 list.removeLast();
                 delete(item,prev,next);
                 item=null;
@@ -237,8 +198,7 @@
      * @see java.util.List#isEmpty()
      */
     public boolean isEmpty(){
-        checkClosed();
-        checkLoaded();
+        load();
         return list.isEmpty();
     }
 
@@ -248,18 +208,18 @@
      * @see java.util.List#contains(java.lang.Object)
      */
     public boolean contains(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
         if(o!=null){
-            synchronized(list){
-                for(Iterator i=list.iterator();i.hasNext();){
-                    LocatableItem item=(LocatableItem) i.next();
-                    Object value=getValue(item);
+            synchronized(mutex){
+                IndexItem next=list.getFirst();
+                while(next!=null){
+                    Object value=getValue(next);
                     if(value!=null&&value.equals(o)){
                         result=true;
                         break;
                     }
+                    next=list.getNextEntry(next);
                 }
             }
         }
@@ -272,8 +232,7 @@
      * @see java.util.List#iterator()
      */
     public Iterator iterator(){
-        checkClosed();
-        checkLoaded();
+        load();
         return listIterator();
     }
 
@@ -283,14 +242,14 @@
      * @see java.util.List#toArray()
      */
     public Object[] toArray(){
-        checkClosed();
-        checkLoaded();
+        load();
         List tmp=new ArrayList(list.size());
-        synchronized(list){
-            for(Iterator i=list.iterator();i.hasNext();){
-                LocatableItem item=(LocatableItem) i.next();
-                Object value=getValue(item);
+        synchronized(mutex){
+            IndexItem next=list.getFirst();
+            while(next!=null){
+                Object value=getValue(next);
                 tmp.add(value);
+                next=list.getNextEntry(next);
             }
         }
         return tmp.toArray();
@@ -302,14 +261,14 @@
      * @see java.util.List#toArray(T[])
      */
     public Object[] toArray(Object[] a){
-        checkClosed();
-        checkLoaded();
+        load();
         List tmp=new ArrayList(list.size());
-        synchronized(list){
-            for(Iterator i=list.iterator();i.hasNext();){
-                LocatableItem item=(LocatableItem) i.next();
-                Object value=getValue(item);
+        synchronized(mutex){
+            IndexItem next=list.getFirst();
+            while(next!=null){
+                Object value=getValue(next);
                 tmp.add(value);
+                next=list.getNextEntry(next);
             }
         }
         return tmp.toArray(a);
@@ -321,8 +280,7 @@
      * @see java.util.List#add(E)
      */
     public boolean add(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         addLast(o);
         return true;
     }
@@ -333,28 +291,28 @@
      * @see java.util.List#remove(java.lang.Object)
      */
     public boolean remove(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
-        synchronized(list){
-            for(Iterator i=list.iterator();i.hasNext();){
-                LocatableItem item=(LocatableItem) i.next();
-                Object value = getValue(item);
-                if (value != null && value.equals(o)){
-                    remove(item);
+        synchronized(mutex){
+            IndexItem next=list.getFirst();
+            while(next!=null){
+                Object value=getValue(next);
+                if(value!=null&&value.equals(o)){
+                    remove(next);
+                    result=true;
                     break;
                 }
+                next=list.getNextEntry(next);
             }
-            
         }
         return result;
     }
 
-    protected void remove(LocatableItem item){
-        synchronized(list){
+    protected void remove(IndexItem item){
+        synchronized(mutex){
             int index=list.indexOf(item);
-            LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-            LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+            IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+            IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
             list.remove(index);
             delete(item,prev,next);
         }
@@ -366,10 +324,9 @@
      * @see java.util.List#containsAll(java.util.Collection)
      */
     public boolean containsAll(Collection c){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
-        synchronized(list){
+        synchronized(mutex){
             for(Iterator i=c.iterator();i.hasNext();){
                 Object obj=i.next();
                 if(!(result=contains(obj))){
@@ -387,14 +344,12 @@
      * @see java.util.List#addAll(java.util.Collection)
      */
     public boolean addAll(Collection c){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
         for(Iterator i=c.iterator();i.hasNext();){
             add(i.next());
-            result=true;
         }
-        return result;
+        return true;
     }
 
     /*
@@ -403,8 +358,7 @@
      * @see java.util.List#addAll(int, java.util.Collection)
      */
     public boolean addAll(int index,Collection c){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
         ListIterator e1=listIterator(index);
         Iterator e2=c.iterator();
@@ -421,8 +375,7 @@
      * @see java.util.List#removeAll(java.util.Collection)
      */
     public boolean removeAll(Collection c){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=true;
         for(Iterator i=c.iterator();i.hasNext();){
             Object obj=i.next();
@@ -437,19 +390,18 @@
      * @see java.util.List#retainAll(java.util.Collection)
      */
     public boolean retainAll(Collection c){
-        checkClosed();
-        checkLoaded();
+        load();
         List tmpList=new ArrayList();
-        synchronized(list){
-        for(Iterator i = list.iterator(); i.hasNext();){
-            LocatableItem item = (LocatableItem) i.next();
-            Object o = getValue(item);
-            
-            if(!c.contains(o)){
-                tmpList.add(o);
+        synchronized(mutex){
+            IndexItem next=list.getFirst();
+            while(next!=null){
+                Object o=getValue(next);
+                if(!c.contains(o)){
+                    tmpList.add(o);
+                }
+                next=list.getNextEntry(next);
             }
         }
-        }
         for(Iterator i=tmpList.iterator();i.hasNext();){
             remove(i.next());
         }
@@ -463,30 +415,9 @@
      */
     public void clear(){
         checkClosed();
-        synchronized(list){
+        synchronized(mutex){
             list.clear();
-            try {
-            long start=root.getNextItem();
-            if(start!=Item.POSITION_NOT_SET){
-                long nextItem=start;
-                while(nextItem!=Item.POSITION_NOT_SET){
-                    LocatableItem item=new LocatableItem();
-                    item.setOffset(nextItem);
-                    list.add(item);
-                    nextItem=item.getNextItem();
-                }
-            }
-            root.setNextItem(Item.POSITION_NOT_SET);
-            store.updateItem(root);
-            for(int i=0;i<list.size();i++){
-                LocatableItem item=(LocatableItem) list.get(i);
-                store.removeItem(item);
-            }
-            list.clear();
-            }catch(IOException e){
-                log.error("Failed to clear ListContainer "+getId(),e);
-                throw new RuntimeStoreException(e);
-            }
+            doClear();
         }
     }
 
@@ -496,10 +427,9 @@
      * @see java.util.List#get(int)
      */
     public Object get(int index){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        LocatableItem item=(LocatableItem) list.get(index);
+        IndexItem item=(IndexItem) list.get(index);
         if(item!=null){
             result=getValue(item);
         }
@@ -512,13 +442,12 @@
      * @see java.util.List#set(int, E)
      */
     public Object set(int index,Object element){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        synchronized(list){
-            LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
-            LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
-            LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+        synchronized(mutex){
+            IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
+            IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
+            IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
             result=getValue(replace);
             list.remove(index);
             delete(replace,prev,next);
@@ -526,12 +455,12 @@
         }
         return result;
     }
-    
-    protected LocatableItem internalSet(int index,Object element){
-        synchronized(list){
-            LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
-            LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
-            LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+
+    protected IndexItem internalSet(int index,Object element){
+        synchronized(mutex){
+            IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
+            IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
+            IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
             list.remove(index);
             delete(replace,prev,next);
             return internalAdd(index,element);
@@ -544,26 +473,25 @@
      * @see java.util.List#add(int, E)
      */
     public void add(int index,Object element){
-        checkClosed();
-        checkLoaded();
-        synchronized(list){
-            LocatableItem item=insert(index,element);
+        load();
+        synchronized(mutex){
+            IndexItem item=insert(index,element);
             list.add(index,item);
         }
     }
-    
-    protected LocatableItem internalAdd(int index,Object element){
-        synchronized(list){
-            LocatableItem item=insert(index,element);
+
+    protected IndexItem internalAdd(int index,Object element){
+        synchronized(mutex){
+            IndexItem item=insert(index,element);
             list.add(index,item);
             return item;
         }
     }
-    
-    protected LocatableItem internalGet(int index){
-        synchronized(list){
-            if (index >= 0 && index < list.size()){
-                return (LocatableItem) list.get(index);
+
+    protected IndexItem internalGet(int index){
+        synchronized(mutex){
+            if(index>=0&&index<list.size()){
+                return list.get(index);
             }
         }
         return null;
@@ -575,17 +503,17 @@
      * @see org.apache.activemq.kaha.ListContainer#doRemove(int)
      */
     public boolean doRemove(int index){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
-        synchronized(list){
-            LocatableItem item=(LocatableItem) list.get(index);
+        synchronized(mutex){
+            IndexItem item=list.get(index);
             if(item!=null){
-                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                result=true;
+                IndexItem prev=list.getPrevEntry(item);
+                prev=prev!=null?prev:root;
+                IndexItem next=list.getNextEntry(prev);
                 list.remove(index);
                 delete(item,prev,next);
-                result=true;
             }
         }
         return result;
@@ -597,15 +525,15 @@
      * @see java.util.List#remove(int)
      */
     public Object remove(int index){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        synchronized(list){
-            LocatableItem item=(LocatableItem) list.get(index);
+        synchronized(mutex){
+            IndexItem item=list.get(index);
             if(item!=null){
                 result=getValue(item);
-                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+                IndexItem prev=list.getPrevEntry(item);
+                prev=prev!=null?prev:root;
+                IndexItem next=list.getNextEntry(prev);
                 list.remove(index);
                 delete(item,prev,next);
             }
@@ -619,19 +547,20 @@
      * @see java.util.List#indexOf(java.lang.Object)
      */
     public int indexOf(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         int result=-1;
         if(o!=null){
-            synchronized(list){
+            synchronized(mutex){
                 int count=0;
-                for(Iterator i=list.iterator();i.hasNext();count++){
-                    LocatableItem item=(LocatableItem) i.next();
-                    Object value=getValue(item);
+                IndexItem next=list.getFirst();
+                while(next!=null){
+                    Object value=getValue(next);
                     if(value!=null&&value.equals(o)){
                         result=count;
                         break;
                     }
+                    count++;
+                    next=list.getNextEntry(next);
                 }
             }
         }
@@ -644,19 +573,20 @@
      * @see java.util.List#lastIndexOf(java.lang.Object)
      */
     public int lastIndexOf(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         int result=-1;
         if(o!=null){
-            synchronized(list){
+            synchronized(mutex){
                 int count=list.size()-1;
-                for(ListIterator i=list.listIterator();i.hasPrevious();count--){
-                    LocatableItem item=(LocatableItem) i.previous();
-                    Object value=getValue(item);
+                IndexItem next=list.getLast();
+                while(next!=null){
+                    Object value=getValue(next);
                     if(value!=null&&value.equals(o)){
                         result=count;
                         break;
                     }
+                    count--;
+                    next=list.getPrevEntry(next);
                 }
             }
         }
@@ -669,10 +599,8 @@
      * @see java.util.List#listIterator()
      */
     public ListIterator listIterator(){
-        checkClosed();
-        checkLoaded();
-        ListIterator iter = ((List) list.clone()).listIterator();
-        return new ContainerListIterator(this,iter);
+        load();
+        return new ContainerListIterator(this,list,list.getRoot());
     }
 
     /*
@@ -681,11 +609,15 @@
      * @see java.util.List#listIterator(int)
      */
     public ListIterator listIterator(int index){
-        checkClosed();
-        checkLoaded();
-        List result = (List) list.clone();
-        ListIterator iter = result.listIterator(index);
-        return new ContainerListIterator(this,iter);
+        load();
+        IndexItem start=list.get(index);
+        if(start!=null){
+            start=list.getPrevEntry(start);
+        }
+        if(start==null){
+            start=root;
+        }
+        return new ContainerListIterator(this,list,start);
     }
 
     /*
@@ -694,129 +626,119 @@
      * @see java.util.List#subList(int, int)
      */
     public List subList(int fromIndex,int toIndex){
-        checkClosed();
-        checkLoaded();
-        List tmp = list.subList(fromIndex, toIndex);
-        LinkedList result = new LinkedList();
-        for (Iterator i = tmp.iterator(); i.hasNext();){
-            LocatableItem item = (LocatableItem) i.next();
-            result.add(getValue(item));
+        load();
+        List result=new ArrayList();
+        int count=fromIndex;
+        IndexItem next=list.get(fromIndex);
+        while(next!=null&&count++<toIndex){
+            result.add(getValue(next));
+            next=list.getNextEntry(next);
         }
         return result;
     }
 
-    protected LocatableItem writeLast(Object value){
-        long pos=Item.POSITION_NOT_SET;
-        LocatableItem item=null;
+    protected IndexItem writeLast(Object value){
+        IndexItem index=null;
         try{
-            LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
-            last=last==null?root:last;
-            long prev=last.getOffset();
-            long next=Item.POSITION_NOT_SET;
-            item=new LocatableItem(prev,next,pos);
-            next=store.storeItem(marshaller,value,item);
-            if(last!=null){
-                last.setNextItem(next);
-                store.updateItem(last);
+            if(value!=null){
+                DataItem data=dataManager.storeItem(marshaller,value);
+                index=indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev=list.getLast();
+                prev=prev!=null?prev:root;
+                IndexItem next=list.getNextEntry(prev);
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(prev);
+                if(next!=null){
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    indexManager.updateIndex(next);
+                }
+                indexManager.updateIndex(index);
             }
         }catch(IOException e){
             log.error("Failed to write "+value,e);
             throw new RuntimeStoreException(e);
         }
-        return item;
+        return index;
     }
 
-    protected LocatableItem writeFirst(Object value){
-        long pos=Item.POSITION_NOT_SET;
-        LocatableItem item=null;
+    protected IndexItem writeFirst(Object value){
+        IndexItem index=null;
         try{
-            LocatableItem next=list.isEmpty()?null:(LocatableItem) list.getFirst();
-            LocatableItem last=root;
-            long prevPos=last.getOffset();
-            long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
-            item=new LocatableItem(prevPos,nextPos,pos);
-            nextPos=store.storeItem(marshaller,value,item);
-            if(last!=null){
-                last.setNextItem(nextPos);
-                store.updateItem(last);
-            }
-            if(next!=null){
-                next.setPreviousItem(nextPos);
-                store.updateItem(next);
+            if(value!=null){
+                DataItem data=dataManager.storeItem(marshaller,value);
+                index=indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev=root;
+                IndexItem next=list.getNextEntry(prev);
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(prev);
+                if(next!=null){
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    indexManager.updateIndex(next);
+                }
+                indexManager.updateIndex(index);
             }
         }catch(IOException e){
             log.error("Failed to write "+value,e);
             throw new RuntimeStoreException(e);
         }
-        return item;
+        return index;
     }
 
-    protected LocatableItem insert(int insertPos,Object value){
+    protected IndexItem insert(int insertPos,Object value){
         long pos=Item.POSITION_NOT_SET;
-        LocatableItem item=null;
+        IndexItem index=null;
         try{
-            int lastPos=insertPos-1;
-            LocatableItem prev=(list.isEmpty() || (insertPos-1) < 0)?null:(LocatableItem) list.get(lastPos);
-            LocatableItem next=(list.isEmpty() || (insertPos+1) >= size())?null:(LocatableItem) list.get(insertPos+1);
-            prev=prev==null?root:prev;
-            long prevPos=prev.getOffset();
-            long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
-            item=new LocatableItem(prevPos,nextPos,pos);
-            nextPos=store.storeItem(marshaller,value,item);
-            if(prev!=null){
-                prev.setNextItem(nextPos);
-                store.updateItem(prev);
-            }
-            if(next!=null){
-                next.setPreviousItem(nextPos);
-                store.updateItem(next);
+            if(value!=null){
+                DataItem data=dataManager.storeItem(marshaller,value);
+                index=indexManager.createNewIndex();
+                index.setValueData(data);
+                IndexItem prev=null;
+                IndexItem next=null;
+                if(insertPos<=0){
+                    prev=root;
+                    next=list.getNextEntry(root);
+                }else if(insertPos>=list.size()){
+                    prev=list.getLast();
+                    next=null;
+                }else{
+                    prev=list.get(insertPos);
+                    prev=prev!=null?prev:root;
+                    next=list.getNextEntry(prev);
+                }
+                prev.setNextItem(index.getOffset());
+                index.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(prev);
+                if(next!=null){
+                    next.setPreviousItem(index.getOffset());
+                    index.setNextItem(next.getOffset());
+                    indexManager.updateIndex(next);
+                }
+                indexManager.updateIndex(index);
             }
         }catch(IOException e){
             log.error("Failed to insert "+value,e);
             throw new RuntimeStoreException(e);
         }
-        return item;
+        return index;
     }
 
-    protected Object getValue(LocatableItem item){
+    protected Object getValue(IndexItem item){
         Object result=null;
         if(item!=null){
             try{
-                result=store.readItem(marshaller,item);
+                DataItem data=item.getValueDataItem();
+                result=dataManager.readItem(marshaller,data);
             }catch(IOException e){
                 log.error("Failed to get value for "+item,e);
                 throw new RuntimeStoreException(e);
             }
         }
         return result;
-    }
-
-    protected void delete(LocatableItem item,LocatableItem prev,LocatableItem next){
-        try{
-            prev=prev==null?root:prev;
-            if(next!=null){
-                prev.setNextItem(next.getOffset());
-                next.setPreviousItem(prev.getOffset());
-                store.updateItem(next);
-            }else{
-                prev.setNextItem(Item.POSITION_NOT_SET);
-            }
-            store.updateItem(prev);
-            store.removeItem(item);
-        }catch(IOException e){
-            log.error("Failed to delete "+item,e);
-            throw new RuntimeStoreException(e);
-        }
-    }
-    
-    protected final void checkClosed(){
-        if (closed){
-            throw new RuntimeStoreException("The store is closed");
-        }
-    }
-    protected final void checkLoaded(){
-        if (!loaded){
-            throw new RuntimeStoreException("The container is not loaded");
-        }
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -32,24 +32,15 @@
  * 
  * @version $Revision: 1.2 $
  */
-public class MapContainerImpl implements MapContainer{
+final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
     private static final Log log=LogFactory.getLog(MapContainerImpl.class);
-    protected StoreImpl store;
-    protected LocatableItem root;
-    protected Object id;
     protected Map map=new HashMap();
     protected Map valueToKeyMap=new HashMap();
-    protected LinkedList list=new LinkedList();
-    protected boolean loaded=false;
     protected Marshaller keyMarshaller=new ObjectMarshaller();
     protected Marshaller valueMarshaller=new ObjectMarshaller();
-    protected final Object mutex=new Object();
-    protected boolean closed=false;
 
-    protected MapContainerImpl(Object id,StoreImpl si,LocatableItem root) throws IOException{
-        this.id=id;
-        this.store=si;
-        this.root=root;
+    protected MapContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
+        super(id,root,indexManager,dataManager);
     }
 
     /*
@@ -60,25 +51,24 @@
     public void load(){
         checkClosed();
         if(!loaded){
-            loaded=true;
             synchronized(mutex){
-                try{
-                    long start=root.getNextItem();
-                    if(start!=Item.POSITION_NOT_SET){
-                        long nextItem=start;
+                if(!loaded){
+                    loaded=true;
+                    try{
+                        long nextItem=root.getNextItem();
                         while(nextItem!=Item.POSITION_NOT_SET){
-                            LocatableItem item=new LocatableItem();
-                            item.setOffset(nextItem);
-                            Object key=store.readItem(keyMarshaller,item);
+                            IndexItem item=indexManager.getIndex(nextItem);
+                            DataItem data=item.getKeyDataItem();
+                            Object key=dataManager.readItem(keyMarshaller,data);
                             map.put(key,item);
                             valueToKeyMap.put(item,key);
                             list.add(item);
                             nextItem=item.getNextItem();
                         }
+                    }catch(IOException e){
+                        log.error("Failed to load container "+getId(),e);
+                        throw new RuntimeStoreException(e);
                     }
-                }catch(IOException e){
-                    log.error("Failed to load container "+getId(),e);
-                    throw new RuntimeStoreException(e);
                 }
             }
         }
@@ -101,11 +91,6 @@
         }
     }
 
-    public void close(){
-        unload();
-        closed=true;
-    }
-
     public void setKeyMarshaller(Marshaller keyMarshaller){
         checkClosed();
         this.keyMarshaller=keyMarshaller;
@@ -119,31 +104,10 @@
     /*
      * (non-Javadoc)
      * 
-     * @see org.apache.activemq.kaha.MapContainer#isLoaded()
-     */
-    public boolean isLoaded(){
-        checkClosed();
-        return loaded;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.activemq.kaha.MapContainer#getId()
-     */
-    public Object getId(){
-        checkClosed();
-        return id;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
      * @see org.apache.activemq.kaha.MapContainer#size()
      */
     public int size(){
-        checkClosed();
-        checkLoaded();
+        load();
         return map.size();
     }
 
@@ -153,8 +117,7 @@
      * @see org.apache.activemq.kaha.MapContainer#isEmpty()
      */
     public boolean isEmpty(){
-        checkClosed();
-        checkLoaded();
+        load();
         return map.isEmpty();
     }
 
@@ -164,8 +127,7 @@
      * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
      */
     public boolean containsKey(Object key){
-        checkClosed();
-        checkLoaded();
+        load();
         synchronized(mutex){
             return map.containsKey(key);
         }
@@ -177,12 +139,11 @@
      * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
      */
     public Object get(Object key){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
-        LocatableItem item=null;
+        IndexItem item=null;
         synchronized(mutex){
-            item=(LocatableItem) map.get(key);
+            item=(IndexItem) map.get(key);
         }
         if(item!=null){
             result=getValue(item);
@@ -196,18 +157,18 @@
      * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
      */
     public boolean containsValue(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
         if(o!=null){
             synchronized(list){
-                for(Iterator i=list.iterator();i.hasNext();){
-                    LocatableItem item=(LocatableItem) i.next();
+                IndexItem item=list.getFirst();
+                while(item!=null){
                     Object value=getValue(item);
                     if(value!=null&&value.equals(o)){
                         result=true;
                         break;
                     }
+                    item=list.getNextEntry(item);
                 }
             }
         }
@@ -220,8 +181,7 @@
      * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
      */
     public void putAll(Map t){
-        checkClosed();
-        checkLoaded();
+        load();
         if(t!=null){
             synchronized(mutex){
                 for(Iterator i=t.entrySet().iterator();i.hasNext();){
@@ -238,8 +198,7 @@
      * @see org.apache.activemq.kaha.MapContainer#keySet()
      */
     public Set keySet(){
-        checkClosed();
-        checkLoaded();
+        load();
         return new ContainerKeySet(this);
     }
 
@@ -249,8 +208,7 @@
      * @see org.apache.activemq.kaha.MapContainer#values()
      */
     public Collection values(){
-        checkClosed();
-        checkLoaded();
+        load();
         return new ContainerValueCollection(this);
     }
 
@@ -260,8 +218,7 @@
      * @see org.apache.activemq.kaha.MapContainer#entrySet()
      */
     public Set entrySet(){
-        checkClosed();
-        checkLoaded();
+        load();
         return new ContainerEntrySet(this);
     }
 
@@ -271,14 +228,13 @@
      * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
      */
     public Object put(Object key,Object value){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
         synchronized(mutex){
             if(map.containsKey(key)){
                 result=remove(key);
             }
-            LocatableItem item=write(key,value);
+            IndexItem item=write(key,value);
             map.put(key,item);
             valueToKeyMap.put(item,key);
             list.add(item);
@@ -292,36 +248,31 @@
      * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
      */
     public Object remove(Object key){
-        checkClosed();
-        checkLoaded();
+        load();
         Object result=null;
         synchronized(mutex){
-            LocatableItem item=(LocatableItem) map.get(key);
+            IndexItem item=(IndexItem) map.get(key);
             if(item!=null){
                 map.remove(key);
                 valueToKeyMap.remove(item);
                 result=getValue(item);
-                int index=list.indexOf(item);
-                LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
-                LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
-                list.remove(index);
-                {
-                    delete(item,prev,next);
-                }
-                item=null;
+                IndexItem prev=list.getPrevEntry(item);
+                prev=prev!=null?prev:root;
+                IndexItem next=list.getNextEntry(item);
+                list.remove(item);
+                delete(item,prev,next);
             }
         }
         return result;
     }
 
     public boolean removeValue(Object o){
-        checkClosed();
-        checkLoaded();
+        load();
         boolean result=false;
         if(o!=null){
-            synchronized(list){
-                for(Iterator i=list.iterator();i.hasNext();){
-                    LocatableItem item=(LocatableItem) i.next();
+            synchronized(mutex){
+                IndexItem item=list.getFirst();
+                while(item!=null){
                     Object value=getValue(item);
                     if(value!=null&&value.equals(o)){
                         result=true;
@@ -332,13 +283,14 @@
                         }
                         break;
                     }
+                    item=list.getNextEntry(item);
                 }
             }
         }
         return result;
     }
 
-    protected void remove(LocatableItem item){
+    protected void remove(IndexItem item){
         Object key=valueToKeyMap.get(item);
         if(key!=null){
             remove(key);
@@ -358,34 +310,7 @@
                 map.clear();
                 valueToKeyMap.clear();
                 list.clear();// going to re-use this
-                try{
-                    long start=root.getNextItem();
-                    if(start!=Item.POSITION_NOT_SET){
-                        long nextItem=start;
-                        while(nextItem!=Item.POSITION_NOT_SET){
-                            LocatableItem item=new LocatableItem();
-                            item.setOffset(nextItem);
-                            list.add(item);
-                            nextItem=item.getNextItem();
-                        }
-                    }
-                    root.setNextItem(Item.POSITION_NOT_SET);
-                    store.updateItem(root);
-                    for(int i=0;i<list.size();i++){
-                        LocatableItem item=(LocatableItem) list.get(i);
-                        if(item.getReferenceItem()!=Item.POSITION_NOT_SET){
-                            Item value=new Item();
-                            value.setOffset(item.getReferenceItem());
-                            store.removeItem(value);
-                        }
-                       
-                        store.removeItem(item);
-                    }
-                    list.clear();
-                }catch(IOException e){
-                    log.error("Failed to clear MapContainer "+getId(),e);
-                    throw new RuntimeStoreException(e);
-                }
+                doClear();
             }
         }
     }
@@ -394,17 +319,16 @@
         return new HashSet(map.keySet());
     }
 
-    protected LinkedList getItemList(){
+    protected IndexLinkedList getItemList(){
         return list;
     }
 
-    protected Object getValue(LocatableItem item){
+    protected Object getValue(IndexItem item){
         Object result=null;
-        if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
-            Item rec=new Item();
-            rec.setOffset(item.getReferenceItem());
+        if(item!=null){
             try{
-                result=store.readItem(valueMarshaller,rec);
+                DataItem data=item.getValueDataItem();
+                result=dataManager.readItem(valueMarshaller,data);
             }catch(IOException e){
                 log.error("Failed to get value for "+item,e);
                 throw new RuntimeStoreException(e);
@@ -413,64 +337,29 @@
         return result;
     }
 
-    protected LocatableItem write(Object key,Object value){
-        long pos=Item.POSITION_NOT_SET;
-        LocatableItem item=null;
+    protected IndexItem write(Object key,Object value){
+        IndexItem index=null;
         try{
+            if(key!=null){
+                index=indexManager.createNewIndex();
+                DataItem data=dataManager.storeItem(keyMarshaller,key);
+                index.setKeyData(data);
+            }
             if(value!=null){
-                Item valueItem=new Item();
-                pos=store.storeItem(valueMarshaller,value,valueItem);
+                DataItem data=dataManager.storeItem(valueMarshaller,value);
+                index.setValueData(data);
             }
-            LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
+            IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
             last=last==null?root:last;
             long prev=last.getOffset();
-            long next=Item.POSITION_NOT_SET;
-            item=new LocatableItem(prev,next,pos);
-            next=store.storeItem(keyMarshaller,key,item);
-            if(last!=null){
-                last.setNextItem(next);
-                store.updateItem(last);
-            }
+            index.setPreviousItem(prev);
+            indexManager.updateIndex(index);
+            last.setNextItem(index.getOffset());
+            indexManager.updateIndex(last);
         }catch(IOException e){
-            e.printStackTrace();
             log.error("Failed to write "+key+" , "+value,e);
             throw new RuntimeStoreException(e);
         }
-        return item;
-    }
-
-    protected void delete(LocatableItem key,LocatableItem prev,LocatableItem next){
-        try{
-            prev=prev==null?root:prev;
-            if(next!=null){
-                prev.setNextItem(next.getOffset());
-                next.setPreviousItem(prev.getOffset());
-                store.updateItem(next);
-            }else{
-                prev.setNextItem(Item.POSITION_NOT_SET);
-            }
-            store.updateItem(prev);
-            if(key.getReferenceItem()!=Item.POSITION_NOT_SET){
-                Item value=new Item();
-                value.setOffset(key.getReferenceItem());
-                store.removeItem(value);
-            }
-            store.removeItem(key);
-        }catch(IOException e){
-            log.error("Failed to delete "+key,e);
-            throw new RuntimeStoreException(e);
-        }
-    }
-
-    protected final void checkClosed(){
-        if(closed){
-            throw new RuntimeStoreException("The store is closed");
-        }
-    }
-
-    protected final void checkLoaded(){
-        if(!loaded){
-            throw new RuntimeStoreException("The container is not loaded");
-        }
+        return index;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java Thu Apr 20 07:15:30 2006
@@ -11,51 +11,42 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-
 package org.apache.activemq.kaha.impl;
-import java.io.ByteArrayInputStream;
 
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
 /**
  * Optimized ByteArrayInputStream that can be used more than once
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class StoreByteArrayInputStream extends ByteArrayInputStream {
+final class StoreByteArrayInputStream extends InputStream implements DataInput{
+    private byte[] buf;
+    private int pos;
+
     /**
      * Creates a <code>WireByteArrayInputStream</code>.
      * 
      * @param buf the input buffer.
      */
-    public StoreByteArrayInputStream(byte buf[]) {
-        super(buf);
+    public StoreByteArrayInputStream(byte buf[]){
+        this.buf=buf;
+        this.pos=0;
     }
 
     /**
-     * Creates <code>WireByteArrayInputStream</code> that uses <code>buf</code> as its buffer array.
-     * 
-     * @param buf the input buffer.
-     * @param offset the offset in the buffer of the first byte to read.
-     * @param length the maximum number of bytes to read from the buffer.
-     */
-    public StoreByteArrayInputStream(byte buf[], int offset, int length) {
-        super(buf, offset, length);
-    }
-    
-   
-    /**
      * Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
      */
-    public StoreByteArrayInputStream() {
-        super(new byte[0]);
+    public StoreByteArrayInputStream(){
+        this(new byte[0]);
     }
-    
-    /**
-     * @return the current position in the stream
-     */
-    public int position(){
+
+    public int size(){
         return pos;
     }
-    
+
     /**
      * @return the underlying data array
      */
@@ -66,33 +57,21 @@
     /**
      * reset the <code>WireByteArrayInputStream</code> to use an new byte array
      * 
-     * @param newBuff buffer to use
-     * @param offset the offset in the buffer of the first byte to read.
-     * @param length the maximum number of bytes to read from the buffer.
-     */
-    public void restart(byte[] newBuff, int offset, int length) {
-        buf = newBuff;
-        pos = offset;
-        count = Math.min(offset + length, newBuff.length);
-        mark = offset;
-    }
-
-    /**
-     * reset the <code>WireByteArrayInputStream</code> to use an new byte array
-     * 
      * @param newBuff
      */
-    public void restart(byte[] newBuff) {
-        restart(newBuff, 0, newBuff.length);
+    public void restart(byte[] newBuff){
+        buf=newBuff;
+        pos=0;
     }
-    
+
     /**
      * re-start the input stream - reusing the current buffer
+     * 
      * @param size
      */
     public void restart(int size){
-        if (buf == null || buf.length < size){
-            buf = new byte[size];
+        if(buf==null||buf.length<size){
+            buf=new byte[size];
         }
         restart(buf);
     }
@@ -106,8 +85,8 @@
      * 
      * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
      */
-    public int read() {
-        return (pos < count) ? (buf[pos++] & 0xff) : -1;
+    public int read(){
+        return (pos<buf.length)?(buf[pos++]&0xff):-1;
     }
 
     /**
@@ -117,33 +96,173 @@
      * @param off the start offset of the data.
      * @param len the maximum number of bytes read.
      * @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
-     * end of the stream has been reached.
+     *         end of the stream has been reached.
      */
-    public int read(byte b[], int off, int len) {
-        if (b == null) {
+    public int read(byte b[],int off,int len){
+        if(b==null){
             throw new NullPointerException();
         }
-        else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException();
-        }
-        if (pos >= count) {
+        if(pos>=buf.length){
             return -1;
         }
-        if (pos + len > count) {
-            len = count - pos;
+        if(pos+len>buf.length){
+            len=buf.length-pos;
         }
-        if (len <= 0) {
+        if(len<=0){
             return 0;
         }
-        System.arraycopy(buf, pos, b, off, len);
-        pos += len;
+        System.arraycopy(buf,pos,b,off,len);
+        pos+=len;
         return len;
     }
 
     /**
      * @return the number of bytes that can be read from the input stream without blocking.
      */
-    public int available() {
-        return count - pos;
+    public int available(){
+        return buf.length-pos;
+    }
+
+    public void readFully(byte[] b){
+        read(b,0,b.length);
+    }
+
+    public void readFully(byte[] b,int off,int len){
+        read(b,off,len);
+    }
+
+    public int skipBytes(int n){
+        if(pos+n>buf.length){
+            n=buf.length-pos;
+        }
+        if(n<0){
+            return 0;
+        }
+        pos+=n;
+        return n;
+    }
+
+    public boolean readBoolean(){
+        return read()!=0;
+    }
+
+    public byte readByte(){
+        return (byte) read();
+    }
+
+    public int readUnsignedByte(){
+        return read();
+    }
+
+    public short readShort(){
+        int ch1=read();
+        int ch2=read();
+        return (short) ((ch1<<8)+(ch2<<0));
+    }
+
+    public int readUnsignedShort(){
+        int ch1=read();
+        int ch2=read();
+        return ((ch1<<8)+(ch2<<0));
+    }
+
+    public char readChar(){
+        int ch1=read();
+        int ch2=read();
+        return (char) ((ch1<<8)+(ch2<<0));
+    }
+
+    public int readInt(){
+        int ch1=read();
+        int ch2=read();
+        int ch3=read();
+        int ch4=read();
+        return ((ch1<<24)+(ch2<<16)+(ch3<<8)+(ch4<<0));
+    }
+
+    public long readLong(){
+        return (((long) buf[pos++]<<56)+((long) (buf[pos++]&255)<<48)+((long) (buf[pos++]&255)<<40)
+                        +((long) (buf[pos++]&255)<<32)+((long) (buf[pos++]&255)<<24)+((buf[pos++]&255)<<16)
+                        +((buf[pos++]&255)<<8)+((buf[pos++]&255)<<0));
+    }
+
+    public float readFloat() throws IOException{
+        return Float.intBitsToFloat(readInt());
+    }
+
+    public double readDouble() throws IOException{
+        return Double.longBitsToDouble(readLong());
+    }
+
+    public String readLine(){
+        int start=pos;
+        while(pos<buf.length){
+            int c=read();
+            if(c=='\n'){
+                break;
+            }
+            if(c=='\r'){
+                c=read();
+                if(c!='\n'&&c!=-1){
+                    pos--;
+                }
+                break;
+            }
+        }
+        return new String(buf,start,pos);
+    }
+
+    public String readUTF() throws IOException{
+        int length=readUnsignedShort();
+        char[] characters=new char[length];
+        int c,c2,c3;
+        int count=0;
+        int total=pos+length;
+        while(pos<total){
+            c=(int) buf[pos]&0xff;
+            if(c>127)
+                break;
+            pos++;
+            characters[count++]=(char) c;
+        }
+        while(pos<total){
+            c=(int) buf[pos]&0xff;
+            switch(c>>4){
+            case 0:
+            case 1:
+            case 2:
+            case 3:
+            case 4:
+            case 5:
+            case 6:
+            case 7:
+                pos++;
+                characters[count++]=(char) c;
+                break;
+            case 12:
+            case 13:
+                pos+=2;
+                if(pos>length)
+                    throw new UTFDataFormatException("bad string");
+                c2=(int) buf[pos-1];
+                if((c2&0xC0)!=0x80)
+                    throw new UTFDataFormatException("bad string");
+                characters[count++]=(char) (((c&0x1F)<<6)|(c2&0x3F));
+                break;
+            case 14:
+                pos+=3;
+                if(pos>length)
+                    throw new UTFDataFormatException("bad string");
+                c2=(int) buf[pos-2];
+                c3=(int) buf[pos-1];
+                if(((c2&0xC0)!=0x80)||((c3&0xC0)!=0x80))
+                    throw new UTFDataFormatException("bad string");
+                characters[count++]=(char) (((c&0x0F)<<12)|((c2&0x3F)<<6)|((c3&0x3F)<<0));
+                break;
+            default:
+                throw new UTFDataFormatException("bad string");
+            }
+        }
+        return new String(characters,0,count);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java Thu Apr 20 07:15:30 2006
@@ -11,22 +11,20 @@
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
  */
-
 package org.apache.activemq.kaha.impl;
-import java.io.ByteArrayOutputStream;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
 /**
  * Optimized ByteArrayOutputStream
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class StoreByteArrayOutputStream extends ByteArrayOutputStream {
-    /**
-     * Creates a new byte array output stream. 
-     */
-    public StoreByteArrayOutputStream() {
-        super(16 * 1024);
-    }
+final class StoreByteArrayOutputStream extends OutputStream implements DataOutput{
+    private byte buf[];
+    private int pos;
 
     /**
      * Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
@@ -34,8 +32,18 @@
      * @param size the initial size.
      * @exception IllegalArgumentException if size is negative.
      */
-    public StoreByteArrayOutputStream(int size) {
-        super(size);
+    public StoreByteArrayOutputStream(int size){
+        if(size<0){
+            throw new IllegalArgumentException("Invalid size: "+size);
+        }
+        buf=new byte[size];
+    }
+
+    /**
+     * Creates a new byte array output stream.
+     */
+    public StoreByteArrayOutputStream(){
+        this(16*1024);
     }
 
     /**
@@ -43,9 +51,9 @@
      * 
      * @param size
      */
-    public void restart(int size) {
-        buf = new byte[size];
-        count = 0;
+    public void restart(int size){
+        buf=new byte[size];
+        pos=0;
     }
 
     /**
@@ -53,15 +61,11 @@
      * 
      * @param b the byte to be written.
      */
-    public void write(int b) {
-        int newcount = count + 1;
-        if (newcount > buf.length) {
-            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
-            System.arraycopy(buf, 0, newbuf, 0, count);
-            buf = newbuf;
-        }
-        buf[count] = (byte) b;
-        count = newcount;
+    public void write(int b){
+        int newcount=pos+1;
+        ensureEnoughBuffer(newcount);
+        buf[pos]=(byte) b;
+        pos=newcount;
     }
 
     /**
@@ -72,47 +76,155 @@
      * @param off the start offset in the data.
      * @param len the number of bytes to write.
      */
-    public void write(byte b[], int off, int len) {
-        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
-            throw new IndexOutOfBoundsException();
-        }
-        else if (len == 0) {
+    public void write(byte b[],int off,int len){
+        if(len==0){
             return;
         }
-        int newcount = count + len;
-        if (newcount > buf.length) {
-            byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
-            System.arraycopy(buf, 0, newbuf, 0, count);
-            buf = newbuf;
-        }
-        System.arraycopy(b, off, buf, count, len);
-        count = newcount;
+        int newcount=pos+len;
+        ensureEnoughBuffer(newcount);
+        System.arraycopy(b,off,buf,pos,len);
+        pos=newcount;
     }
 
     /**
      * @return the underlying byte[] buffer
      */
-    public byte[] getData() {
+    public byte[] getData(){
         return buf;
     }
-    
+
     /**
      * reset the output stream
      */
     public void reset(){
-        count = 0;
+        pos=0;
     }
-    
+
     /**
      * Set the current position for writing
+     * 
      * @param offset
      */
     public void position(int offset){
-        if (offset > buf.length) {
-            byte newbuf[] = new byte[Math.max(buf.length << 1, offset)];
-            System.arraycopy(buf, 0, newbuf, 0, count);
-            buf = newbuf;
+        ensureEnoughBuffer(offset);
+        pos=offset;
+    }
+    
+    public int size(){
+        return pos;
+    }
+
+    public void writeBoolean(boolean v){
+        ensureEnoughBuffer(1);
+        buf[pos++]=(byte) (v?1:0);
+    }
+
+    public void writeByte(int v){
+        ensureEnoughBuffer(1);
+        buf[pos++]=(byte) (v>>>0);
+    }
+
+    public void writeShort(int v){
+        ensureEnoughBuffer(2);
+        buf[pos++]=(byte) (v>>>8);
+        buf[pos++]=(byte) (v>>>0);
+    }
+
+    public void writeChar(int v){
+        ensureEnoughBuffer(2);
+        buf[pos++]=(byte) (v>>>8);
+        buf[pos++]=(byte) (v>>>0);
+    }
+
+    public void writeInt(int v){
+        ensureEnoughBuffer(4);
+        buf[pos++]=(byte) (v>>>24);
+        buf[pos++]=(byte) (v>>>16);
+        buf[pos++]=(byte) (v>>>8);
+        buf[pos++]=(byte) (v>>>0);
+    }
+
+    public void writeLong(long v){
+        ensureEnoughBuffer(8);
+        buf[pos++]=(byte) (v>>>56);
+        buf[pos++]=(byte) (v>>>48);
+        buf[pos++]=(byte) (v>>>40);
+        buf[pos++]=(byte) (v>>>32);
+        buf[pos++]=(byte) (v>>>24);
+        buf[pos++]=(byte) (v>>>16);
+        buf[pos++]=(byte) (v>>>8);
+        buf[pos++]=(byte) (v>>>0);
+    }
+
+    public void writeFloat(float v) throws IOException{
+        writeInt(Float.floatToIntBits(v));
+    }
+
+    public void writeDouble(double v) throws IOException{
+        writeLong(Double.doubleToLongBits(v));
+    }
+
+    public void writeBytes(String s){
+        int length=s.length();
+        for(int i=0;i<length;i++){
+            write((byte) s.charAt(i));
+        }
+    }
+
+    public void writeChars(String s){
+        int length=s.length();
+        for(int i=0;i<length;i++){
+            int c=s.charAt(i);
+            write((c>>>8)&0xFF);
+            write((c>>>0)&0xFF);
+        }
+    }
+
+    public void writeUTF(String str) throws IOException{
+        int strlen=str.length();
+        int encodedsize=0;
+        int c;
+        for(int i=0;i<strlen;i++){
+            c=str.charAt(i);
+            if((c>=0x0001)&&(c<=0x007F)){
+                encodedsize++;
+            }else if(c>0x07FF){
+                encodedsize+=3;
+            }else{
+                encodedsize+=2;
+            }
+        }
+        if(encodedsize>65535)
+            throw new UTFDataFormatException("encoded string too long: "+encodedsize+" bytes");
+        ensureEnoughBuffer(encodedsize+2);
+        writeShort(encodedsize);
+        int i=0;
+        for(i=0;i<strlen;i++){
+            c=str.charAt(i);
+            if(!((c>=0x0001)&&(c<=0x007F)))
+                break;
+            buf[pos++]=(byte) c;
+        }
+        for(;i<strlen;i++){
+            c=str.charAt(i);
+            if((c>=0x0001)&&(c<=0x007F)){
+                buf[pos++]=(byte) c;
+            }else if(c>0x07FF){
+                buf[pos++]=(byte) (0xE0|((c>>12)&0x0F));
+                buf[pos++]=(byte) (0x80|((c>>6)&0x3F));
+                buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
+            }else{
+                buf[pos++]=(byte) (0xC0|((c>>6)&0x1F));
+                buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
+            }
+        }
+    }
+
+    private void ensureEnoughBuffer(int newcount){
+        if(newcount>buf.length){
+            byte newbuf[]=new byte[Math.max(buf.length<<1,newcount)];
+            System.arraycopy(buf,0,newbuf,0,pos);
+            buf=newbuf;
         }
-        count = offset;
     }
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,50 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+/**
+ * Optimized Store reader
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class StoreDataReader{
+    private DataManager dataManager;
+    private StoreByteArrayInputStream dataIn;
+    private byte[] header=new byte[DataItem.HEAD_SIZE];
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param file
+     */
+    StoreDataReader(DataManager fileManager){
+        this.dataManager=fileManager;
+        this.dataIn=new StoreByteArrayInputStream();
+    }
+
+    protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+        RandomAccessFile file=dataManager.getDataFile(item);
+        file.seek(item.getOffset());
+        file.readFully(header);
+        dataIn.restart(header);
+        item.readHeader(dataIn);
+        byte[] data=new byte[item.getSize()];
+        file.readFully(data);
+        dataIn.restart(data);
+        return marshaller.readPayload(dataIn);
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,59 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAcessFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+/**
+ * Optimized Store writer
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+final class StoreDataWriter{
+    private StoreByteArrayOutputStream dataOut;
+    private DataManager dataManager;
+
+    /**
+     * Construct a Store writer
+     * 
+     * @param file
+     */
+    StoreDataWriter(DataManager fileManager){
+        this.dataManager=fileManager;
+        this.dataOut=new StoreByteArrayOutputStream();
+    }
+
+    DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
+        dataOut.reset();
+        dataOut.position(DataItem.HEAD_SIZE);
+        marshaller.writePayload(payload,dataOut);
+        int size=dataOut.size();
+        int payloadSize=size-DataItem.HEAD_SIZE;
+        DataItem item=new DataItem();
+        item.setSize(payloadSize);
+        DataFile dataFile=dataManager.findSpaceForData(item);
+        dataOut.reset();
+        item.writeHeader(dataOut);
+        dataFile.getRandomAccessFile().seek(item.getOffset());
+        dataFile.getRandomAccessFile().write(dataOut.getData(),0,size);
+        dataFile.incrementLength(size);
+        dataManager.addInterestInFile(dataFile);
+        return item;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,47 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * Optimized Store reader
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreIndexReader{
+    protected RandomAccessFile file;
+    protected StoreByteArrayInputStream dataIn;
+    protected byte[] buffer=new byte[IndexItem.INDEX_SIZE];
+
+    /**
+     * Construct a Store reader
+     * 
+     * @param file
+     */
+    StoreIndexReader(RandomAccessFile file){
+        this.file=file;
+        this.dataIn=new StoreByteArrayInputStream();
+    }
+
+    protected IndexItem readItem(long offset) throws IOException{
+        file.seek(offset);
+        file.readFully(buffer);
+        dataIn.restart(buffer);
+        IndexItem result=new IndexItem();
+        result.setOffset(offset);
+        result.read(dataIn);
+        return result;
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,48 @@
+/**
+ * 
+ * Copyright 2005-2006 The Apache Software Foundation
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAcessFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * Optimized Store writer
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreIndexWriter{
+    protected StoreByteArrayOutputStream dataOut;
+    protected RandomAccessFile file;
+
+    /**
+     * Construct a Store index writer
+     * 
+     * @param file
+     */
+    StoreIndexWriter(RandomAccessFile file){
+        this.file=file;
+        this.dataOut=new StoreByteArrayOutputStream();
+    }
+
+    void storeItem(IndexItem index) throws IOException{
+        dataOut.reset();
+        index.write(dataOut);
+        file.seek(index.getOffset());
+        file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
+    }
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Thu Apr 20 07:15:30 2006
@@ -18,8 +18,8 @@
 
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import org.apache.activemq.kaha.Marshaller;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@@ -32,13 +32,13 @@
 public class AtomicIntegerMarshaller implements Marshaller{
    
 
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
        AtomicInteger ai = (AtomicInteger) object;
        dataOut.writeInt(ai.get());
        
     }
 
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         int value = dataIn.readInt();
         return new AtomicInteger(value);
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java Thu Apr 20 07:15:30 2006
@@ -17,10 +17,9 @@
 
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
-
 import org.apache.activeio.command.WireFormat;
 import org.apache.activeio.packet.ByteArrayPacket;
 import org.apache.activeio.packet.Packet;
@@ -38,7 +37,7 @@
       
     }
     
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
         Packet packet = wireFormat.marshal(object);
         byte[] data = packet.sliceAsBytes();
         dataOut.writeInt(data.length);
@@ -46,7 +45,7 @@
     }
 
    
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         int size=dataIn.readInt();
         byte[] data=new byte[size];
         dataIn.readFully(data);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java Thu Apr 20 07:15:30 2006
@@ -18,7 +18,6 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
-import org.apache.activeio.command.WireFormat;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -33,6 +32,8 @@
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 /**
  * @org.apache.xbean.XBean
@@ -40,6 +41,7 @@
  * @version $Revision: 1.4 $
  */
 public class KahaPersistentAdaptor implements PersistenceAdapter{
+    private static final Log log=LogFactory.getLog(KahaPersistentAdaptor.class);
     static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
     KahaTransactionStore transactionStore;
     ConcurrentHashMap topics=new ConcurrentHashMap();
@@ -58,13 +60,18 @@
     }
 
     public Set getDestinations(){
+        
         Set rc=new HashSet();
+        try {
         for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
             Object obj=i.next();
             if(obj instanceof ActiveMQDestination){
                 rc.add(obj);
             }
         }
+        }catch(IOException e){
+            log.error("Failed to get destinations " ,e);
+        }
         return rc;
     }
 
@@ -89,7 +96,6 @@
             MapContainer ackContainer=store.getMapContainer(destination.toString()+"-Acks");
             ackContainer.setKeyMarshaller(new StringMarshaller());
             ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
-            ackContainer.load();
             rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
             messageStores.put(destination, rc);
             if(transactionStore!=null){
@@ -137,10 +143,7 @@
 
     public void deleteAllMessages() throws IOException{
         if(store!=null){
-            store.clear();
-        }
-        if(transactionStore!=null){
-            transactionStore.delete();
+            store.delete();
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Apr 20 07:15:30 2006
@@ -23,6 +23,7 @@
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.kaha.ListContainer;
 import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StringMarshaller;
 import org.apache.activemq.store.MessageRecoveryListener;
@@ -71,7 +72,8 @@
         String id=messageId.toString();
         ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
         if(container!=null){
-            container.remove(id);
+            //container.remove(id);
+            container.removeFirst();
             AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
             if(count!=null){
                 if(count.decrementAndGet()>0){
@@ -96,7 +98,11 @@
         info.setSelector(selector);
         info.setSubcriptionName(subscriptionName);
         String key=getSubscriptionKey(clientId,subscriptionName);
-        subscriberContainer.put(key,info);
+        // if already exists - won't add it again as it causes data files
+        // to hang around
+        if(!subscriberContainer.containsKey(key)){
+            subscriberContainer.put(key,info);
+        }
         addSubscriberAckContainer(key);
     }
 
@@ -122,15 +128,19 @@
                     throws Exception{
         String key=getSubscriptionKey(clientId,subscriptionName);
         ListContainer list=(ListContainer) subscriberAcks.get(key);
-        for(Iterator i=list.iterator();i.hasNext();){
-            Object msg=messageContainer.get(i.next());
-            if(msg!=null){
-                if(msg.getClass()==String.class){
-                    listener.recoverMessageReference((String) msg);
-                }else{
-                    listener.recoverMessage((Message) msg);
+        if(list!=null){
+            for(Iterator i=list.iterator();i.hasNext();){
+                Object msg=messageContainer.get(i.next());
+                if(msg!=null){
+                    if(msg.getClass()==String.class){
+                        listener.recoverMessageReference((String) msg);
+                    }else{
+                        listener.recoverMessage((Message) msg);
+                    }
                 }
+                listener.finished();
             }
+        }else{
             listener.finished();
         }
     }
@@ -154,8 +164,8 @@
 
     protected void addSubscriberAckContainer(Object key) throws IOException{
         ListContainer container=store.getListContainer(key);
-        container.setMarshaller(new StringMarshaller());
-        container.load();
+        Marshaller marshaller=new StringMarshaller();
+        container.setMarshaller(marshaller);
         subscriberAcks.put(key,container);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java Thu Apr 20 07:15:30 2006
@@ -17,12 +17,11 @@
 
 package org.apache.activemq.store.kahadaptor;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.activeio.command.WireFormat;
 import org.apache.activeio.packet.ByteArrayPacket;
 import org.apache.activeio.packet.Packet;
@@ -41,7 +40,7 @@
       
     }
     
-    public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+    public void writePayload(Object object,DataOutput dataOut) throws IOException{
         KahaTransaction kt = (KahaTransaction) object;
         List list = kt.getList();
         dataOut.writeInt(list.size());
@@ -62,7 +61,7 @@
        }
 
    
-    public Object readPayload(DataInputStream dataIn) throws IOException{
+    public Object readPayload(DataInput dataIn) throws IOException{
         KahaTransaction result = new KahaTransaction();
         List list = new ArrayList();
         result.setList(list);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java Thu Apr 20 07:15:30 2006
@@ -16,7 +16,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import junit.framework.TestCase;
-import org.apache.activemq.kaha.impl.StoreImpl;
+import org.apache.activemq.kaha.impl.KahaStore;
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
 /**
  * Store test
@@ -27,7 +27,7 @@
     static final int COUNT=10000;
     static final int NUM_LOADERS=2;
     protected String name="load.db";
-    protected StoreImpl store;
+    protected KahaStore store;
 
     /*
      * Test method for 'org.apache.activemq.kaha.Store.close()'
@@ -40,11 +40,10 @@
             loader.start();
         }
         stop.await();
-        store.dumpFreeSpace(new PrintWriter(System.out));
     }
 
-    protected StoreImpl getStore() throws IOException{
-        return (StoreImpl) StoreFactory.open(name,"rw");
+    protected KahaStore getStore() throws IOException{
+        return (KahaStore) StoreFactory.open(name,"rw");
     }
 
     protected void setUp() throws Exception{

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java Thu Apr 20 07:15:30 2006
@@ -21,7 +21,6 @@
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StringMarshaller;
-import org.apache.activemq.kaha.impl.StoreImpl;
 import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
 import junit.framework.TestCase;
 /**