You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/20 16:15:38 UTC
svn commit: r395597 [1/3] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/kaha/ test/java/org/a...
Author: rajdavies
Date: Thu Apr 20 07:15:30 2006
New Revision: 395597
URL: http://svn.apache.org/viewcvs?rev=395597&view=rev
Log:
tuning around kaha persistence
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java
- copied, changed from r393376, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/IndexLinkedListTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/MemoryAllocationTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/TemporaryTopicMemoryAllocationTest.java
Removed:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/FreeSpaceManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/FreeSpaceTree.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RootContainer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreReader.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreWriter.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/SimpleTopicTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/BytesMarshaller.java Thu Apr 20 07:15:30 2006
@@ -13,8 +13,8 @@
*/
package org.apache.activemq.kaha;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
/**
* Implementation of a Marshaller for byte arrays
@@ -29,7 +29,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
byte[] data=(byte[]) object;
dataOut.writeInt(data.length);
dataOut.write(data);
@@ -42,7 +42,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ListContainer.java Thu Apr 20 07:15:30 2006
@@ -99,6 +99,6 @@
* @return true if successful
*/
public boolean doRemove(int position);
-
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Marshaller.java Thu Apr 20 07:15:30 2006
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.kaha;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
/**
@@ -34,7 +34,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object, DataOutputStream dataOut) throws IOException;
+ public void writePayload(Object object, DataOutput dataOut) throws IOException;
/**
@@ -43,7 +43,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInputStream dataIn) throws IOException;
+ public Object readPayload(DataInput dataIn) throws IOException;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/ObjectMarshaller.java Thu Apr 20 07:15:30 2006
@@ -15,8 +15,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -33,7 +33,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
objectOut.writeObject(object);
@@ -50,7 +50,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
int size = dataIn.readInt();
byte[] data = new byte[size];
dataIn.readFully(data);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Thu Apr 20 07:15:30 2006
@@ -41,8 +41,9 @@
* Checks if a MapContainer exists
* @param id
* @return new MapContainer
+ * @throws IOException
*/
- public boolean doesMapContainerExist(Object id);
+ public boolean doesMapContainerExist(Object id) throws IOException;
/**
* Get a MapContainer with the given id - the MapContainer is created if needed
@@ -62,15 +63,17 @@
/**
* Get a Set of call MapContainer Ids
* @return the set of ids
+ * @throws IOException
*/
- public Set getMapContainerIds();
+ public Set getMapContainerIds() throws IOException;
/**
* Checks if a ListContainer exists
* @param id
* @return new MapContainer
+ * @throws IOException
*/
- public boolean doesListContainerExist(Object id);
+ public boolean doesListContainerExist(Object id) throws IOException;
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
@@ -90,8 +93,9 @@
/**
* Get a Set of call ListContainer Ids
* @return the set of ids
+ * @throws IOException
*/
- public Set getListContainerIds();
+ public Set getListContainerIds() throws IOException;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StoreFactory.java Thu Apr 20 07:15:30 2006
@@ -15,7 +15,8 @@
import java.io.File;
import java.io.IOException;
-import org.apache.activemq.kaha.impl.StoreImpl;
+import org.apache.activemq.kaha.impl.KahaStore;
+
/**
* Factory for creating stores
*
@@ -32,16 +33,17 @@
* @throws IOException
*/
public static Store open(String name,String mode) throws IOException{
- return new StoreImpl(name,mode);
+ return new KahaStore(name,mode);
}
/**
* Delete a database
* @param name of the database
* @return true if successful
+ * @throws IOException
*/
- public static boolean delete(String name){
- File file = new File(name);
- return file.delete();
+ public static boolean delete(String name) throws IOException{
+ KahaStore store = new KahaStore(name,"rw");
+ return store.delete();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/StringMarshaller.java Thu Apr 20 07:15:30 2006
@@ -13,8 +13,8 @@
*/
package org.apache.activemq.kaha;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
/**
* Implementation of a Marshaller for Strings
@@ -29,7 +29,7 @@
* @param dataOut
* @throws IOException
*/
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
dataOut.writeUTF(object.toString());
}
@@ -40,7 +40,7 @@
* @return unmarshalled object
* @throws IOException
*/
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
return dataIn.readUTF();
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,182 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Common base class for store containers: holds the root index item, the in-memory index list and the index and data
+ * managers, and implements the shared close, clear and unlink operations.
+ *
+ * @version $Revision: 1.2 $
+ */
+public abstract class BaseContainerImpl{
+    private static final Log log=LogFactory.getLog(BaseContainerImpl.class);
+    protected IndexItem root;
+    protected IndexLinkedList list;
+    protected IndexManager indexManager;
+    protected DataManager dataManager;
+    protected Object id;
+    protected boolean loaded=false;
+    protected boolean closed=false;
+    protected final Object mutex=new Object();
+
+    protected BaseContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
+        this.id=id;
+        this.root=root;
+        this.indexManager=indexManager;
+        this.dataManager=dataManager;
+        this.list=new IndexLinkedList(root);
+    }
+
+    public abstract void unload();
+
+    public abstract void load();
+
+    public abstract int size();
+
+    public abstract void clear();
+
+    protected abstract Object getValue(IndexItem currentItem);
+
+    protected abstract void remove(IndexItem currentItem);
+
+    protected final IndexLinkedList getInternalList(){
+        return list;
+    }
+
+    /**
+     * Unloads the container and marks it closed; accessors guarded by checkClosed() throw afterwards.
+     */
+    public final void close(){
+        unload();
+        closed=true;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.kaha.ListContainer#isLoaded()
+     */
+    public final boolean isLoaded(){
+        checkClosed();
+        return loaded;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.kaha.ListContainer#getId()
+     */
+    public final Object getId(){
+        checkClosed();
+        return id;
+    }
+
+    /**
+     * Walks the persisted index chain from the root and registers interest in the key and value data file of every
+     * entry.
+     *
+     * @throws IOException if an index entry cannot be read
+     */
+    protected final void expressDataInterest() throws IOException{
+        long nextItem=root.getNextItem();
+        while(nextItem!=Item.POSITION_NOT_SET){
+            IndexItem item=indexManager.getIndex(nextItem);
+            item.setOffset(nextItem);
+            dataManager.addInterestInFile(item.getKeyFile());
+            dataManager.addInterestInFile(item.getValueFile());
+            nextItem=item.getNextItem();
+        }
+    }
+
+    /**
+     * Detaches every index entry from the root, releases the interest held in each entry's key and value data file
+     * and frees the entries. Marks the container as loaded.
+     *
+     * @throws RuntimeStoreException if the container is closed or the index cannot be read or updated
+     */
+    protected final void doClear(){
+        checkClosed();
+        synchronized(mutex){
+            loaded=true;
+            List entries=new ArrayList();
+            try{
+                long nextItem=root.getNextItem();
+                while(nextItem!=Item.POSITION_NOT_SET){
+                    // read the stored entry, as expressDataInterest() does: a freshly constructed IndexItem has not
+                    // been read from the store, so it cannot supply the link to the following entry or the
+                    // key/value file references that must be released
+                    IndexItem item=indexManager.getIndex(nextItem);
+                    item.setOffset(nextItem);
+                    entries.add(item);
+                    nextItem=item.getNextItem();
+                }
+                root.setNextItem(Item.POSITION_NOT_SET);
+                indexManager.updateIndex(root);
+                for(int i=0;i<entries.size();i++){
+                    IndexItem item=(IndexItem) entries.get(i);
+                    dataManager.removeInterestInFile(item.getKeyFile());
+                    dataManager.removeInterestInFile(item.getValueFile());
+                    indexManager.freeIndex(item);
+                }
+                entries.clear();
+            }catch(IOException e){
+                log.error("Failed to clear Container "+getId(),e);
+                throw new RuntimeStoreException(e);
+            }
+        }
+    }
+
+    /**
+     * Unlinks an index entry from the persisted chain, releases interest in its data files and frees it.
+     *
+     * @param key the entry to remove
+     * @param prev the entry before key; null is treated as the root
+     * @param next the entry after key; null or the root means key is the last entry
+     * @throws RuntimeStoreException if the index cannot be updated
+     */
+    protected final void delete(IndexItem key,IndexItem prev,IndexItem next){
+        try{
+            dataManager.removeInterestInFile(key.getKeyFile());
+            dataManager.removeInterestInFile(key.getValueFile());
+            prev=prev==null?root:prev;
+            next=next!=root?next:null;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            indexManager.updateIndex(prev);
+            indexManager.freeIndex(key);
+        }catch(IOException e){
+            log.error("Failed to delete "+key,e);
+            throw new RuntimeStoreException(e);
+        }
+    }
+
+    /**
+     * @throws RuntimeStoreException if close() has been called
+     */
+    protected final void checkClosed(){
+        if(closed){
+            throw new RuntimeStoreException("The store is closed");
+        }
+    }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerListIterator.java Thu Apr 20 07:15:30 2006
@@ -1,54 +1,42 @@
/**
- *
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ListIterator;
-/**
- * @author rajdavies
- *
- */
-public class ContainerListIterator implements ListIterator{
- private ListContainerImpl container;
- private ListIterator iterator;
- private LocatableItem current;
- protected ContainerListIterator(ListContainerImpl container,ListIterator iterator){
- this.container=container;
- this.iterator=iterator;
- this.current = container.internalGet(0);
- }
+/**
+* @version $Revision: 1.2 $
+*/
+public class ContainerListIterator extends ContainerValueCollectionIterator implements ListIterator{
+
+
- /*
- * (non-Javadoc)
- *
- * @see java.util.ListIterator#hasNext()
- */
- public boolean hasNext(){
- return iterator.hasNext();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.util.ListIterator#next()
- */
- public Object next(){
- Object result=null;
- current=(LocatableItem) iterator.next();
- if(current!=null){
- result=container.getValue(current);
- }
- return result;
+ protected ContainerListIterator(ListContainerImpl container,IndexLinkedList list,IndexItem start){
+ super(container,list,start);
}
+
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#hasPrevious()
*/
public boolean hasPrevious(){
- return iterator.hasPrevious();
+ return list.getPrevEntry(currentItem) != null;
}
/*
@@ -57,12 +45,8 @@
* @see java.util.ListIterator#previous()
*/
public Object previous(){
- Object result=null;
- current=(LocatableItem) iterator.previous();
- if(current!=null){
- result=container.getValue(current);
- }
- return result;
+ currentItem = list.getPrevEntry(currentItem);
+ return currentItem != null ? container.getValue(currentItem) : null;
}
/*
@@ -71,7 +55,16 @@
* @see java.util.ListIterator#nextIndex()
*/
public int nextIndex(){
- return iterator.nextIndex();
+ int result = -1;
+ if (currentItem != null){
+ IndexItem next = list.getNextEntry(currentItem);
+ if (next != null){
+ result = container.getInternalList().indexOf(next);
+ }
+ }
+
+
+ return result;
}
/*
@@ -80,29 +73,27 @@
* @see java.util.ListIterator#previousIndex()
*/
public int previousIndex(){
- return iterator.previousIndex();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.util.ListIterator#remove()
- */
- public void remove(){
- iterator.remove();
- if(current!=null){
- container.remove(current);
+ int result = -1;
+ if (currentItem != null){
+ IndexItem prev = list.getPrevEntry(currentItem);
+ if (prev != null){
+ result = container.getInternalList().indexOf(prev);
+ }
}
+
+
+ return result;
}
+
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#set(E)
*/
public void set(Object o){
- LocatableItem item=container.internalSet(previousIndex()+1,o);
- iterator.set(item);
+ IndexItem item=((ListContainerImpl) container).internalSet(previousIndex()+1,o);
+ currentItem=item;
}
/*
@@ -111,7 +102,8 @@
* @see java.util.ListIterator#add(E)
*/
public void add(Object o){
- LocatableItem item=container.internalAdd(previousIndex()+1,o);
- iterator.set(item);
+ // ListIterator.add() inserts a new element; internalSet() is what set() uses to replace an existing one
+ IndexItem item=((ListContainerImpl) container).internalAdd(previousIndex()+1,o);
+ currentItem=item;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollection.java Thu Apr 20 07:15:30 2006
@@ -42,37 +42,41 @@
public Iterator iterator(){
- LinkedList list=container.getItemList();
- list = (LinkedList) list.clone();
- return new ContainerValueCollectionIterator(container,list.iterator());
+ IndexLinkedList list=container.getItemList();
+ return new ContainerValueCollectionIterator(container,list,list.getRoot());
}
public Object[] toArray(){
Object[] result = null;
- List list = container.getItemList();
+ IndexLinkedList list = container.getItemList();
synchronized(list){
result = new Object[list.size()];
+ IndexItem item = list.getFirst();
int count = 0;
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value=container.getValue(item);
+ while (item != null){
+ Object value=container.getValue(item);
result[count++] = value;
+
+ item = list.getNextEntry(item);
}
+
+
}
return result;
}
public Object[] toArray(Object[] result){
- List list=container.getItemList();
+ IndexLinkedList list=container.getItemList();
synchronized(list){
if(result.length<=list.size()){
- int count=0;
- result=(Object[]) java.lang.reflect.Array.newInstance(result.getClass().getComponentType(),list.size());
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value=container.getValue(item);
- result[count++]=value;
+ IndexItem item = list.getFirst();
+ int count = 0;
+ while (item != null){
+ Object value=container.getValue(item);
+ result[count++] = value;
+
+ item = list.getNextEntry(item);
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerValueCollectionIterator.java Thu Apr 20 07:15:30 2006
@@ -25,20 +25,21 @@
* @version $Revision: 1.2 $
*/
public class ContainerValueCollectionIterator implements Iterator{
- private MapContainerImpl container;
- private Iterator iter;
- private LocatableItem currentItem;
- ContainerValueCollectionIterator(MapContainerImpl container,Iterator iter){
+ protected BaseContainerImpl container;
+ protected IndexLinkedList list;
+ protected IndexItem currentItem;
+ ContainerValueCollectionIterator(BaseContainerImpl container,IndexLinkedList list,IndexItem start){
this.container = container;
- this.iter = iter;
+ this.list = list;
+ this.currentItem = start;
}
public boolean hasNext(){
- return iter.hasNext();
+ return currentItem != null && list.getNextEntry(currentItem) != null;
}
public Object next(){
- currentItem = (LocatableItem) iter.next();
+ currentItem = list.getNextEntry(currentItem);
return container.getValue(currentItem);
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataFile.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,125 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * A numbered data file of the store: lazily opens a RandomAccessFile on it, tracks a logical length and keeps a
+ * reference count.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFile{
+    private File file;
+    private Integer number;
+    private int referenceCount;
+    private RandomAccessFile randomAccessFile;
+    long length=0;
+
+    DataFile(File file,int number){
+        this.file=file;
+        this.number=new Integer(number);
+        length=file.exists()?file.length():0;
+    }
+
+    Integer getNumber(){
+        return number;
+    }
+
+    /**
+     * @return the read/write handle on the file, opened on first use and again after purge() or close()
+     * @throws FileNotFoundException if the file cannot be opened
+     */
+    synchronized RandomAccessFile getRandomAccessFile() throws FileNotFoundException{
+        if(randomAccessFile==null){
+            randomAccessFile=new RandomAccessFile(file,"rw");
+        }
+        return randomAccessFile;
+    }
+
+    /**
+     * @return the tracked length, initialised from the file size at construction and advanced by incrementLength()
+     */
+    synchronized long getLength(){
+        return length;
+    }
+
+    synchronized void incrementLength(int size){
+        length+=size;
+    }
+
+    /**
+     * Closes the open handle, if any, and forgets it so that the next getRandomAccessFile() reopens the file.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void purge() throws IOException{
+        if(randomAccessFile!=null){
+            randomAccessFile.close();
+            randomAccessFile=null;
+        }
+    }
+
+    /**
+     * Closes the handle and removes the file from disk.
+     *
+     * @return true if the file was deleted
+     * @throws IOException if closing the handle fails
+     */
+    synchronized boolean delete() throws IOException{
+        purge();
+        return file.delete();
+    }
+
+    /**
+     * Syncs the open handle's file descriptor; does nothing when the file is not open.
+     *
+     * @throws IOException if the sync fails
+     */
+    synchronized void force() throws IOException{
+        if(randomAccessFile!=null){
+            randomAccessFile.getFD().sync();
+        }
+    }
+
+    /**
+     * Closes the open handle, if any. The reference is dropped as well, so a closed handle is never handed out by
+     * getRandomAccessFile() or synced by force() afterwards.
+     *
+     * @throws IOException if closing the handle fails
+     */
+    synchronized void close() throws IOException{
+        purge();
+    }
+
+    synchronized int increment(){
+        return ++referenceCount;
+    }
+
+    synchronized int decrement(){
+        return --referenceCount;
+    }
+
+    synchronized boolean isUnused(){
+        return referenceCount<=0;
+    }
+
+    public String toString(){
+        String result = file.getName() + " number = " + number + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,110 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import org.apache.activemq.kaha.Marshaller;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+/**
+ * Describes one piece of data held in the store: the number of the data file it lives in, its offset within that
+ * file and its size.
+ *
+ * @version $Revision: 1.2 $
+ */
+final class DataItem implements Item{
+    /** Bytes written by {@link #writeHeader(DataOutput)}: a short magic followed by an int size. */
+    static final int HEAD_SIZE=6; // magic + len
+    private int file=(int) POSITION_NOT_SET;
+    private long offset=POSITION_NOT_SET;
+    private int size;
+
+    DataItem(){}
+
+    /**
+     * @return true once a file number has been assigned to this item
+     */
+    boolean isValid(){
+        return !(file == POSITION_NOT_SET);
+    }
+
+    /**
+     * Writes the header: the magic marker followed by the payload size.
+     *
+     * @param dataOut destination for the header
+     * @throws IOException if the write fails
+     */
+    void writeHeader(DataOutput dataOut) throws IOException{
+        dataOut.writeShort(MAGIC);
+        dataOut.writeInt(this.size);
+    }
+
+    /**
+     * Reads the header and stores the size it carries.
+     *
+     * @param dataIn source positioned at the start of a header
+     * @throws IOException if the read fails, or (as BadMagicException) if the magic marker does not match
+     */
+    void readHeader(DataInput dataIn) throws IOException{
+        int magic=dataIn.readShort();
+        if(magic!=MAGIC){
+            throw new BadMagicException("Unexpected Magic value: "+magic);
+        }
+        this.size=dataIn.readInt();
+    }
+
+    /**
+     * Hands the object to the marshaller to be written to the stream.
+     *
+     * @param marshaller the marshaller doing the work
+     * @param object the payload
+     * @param dataOut destination stream
+     * @throws IOException if the marshaller fails to write
+     */
+    void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
+        marshaller.writePayload(object,dataOut);
+    }
+
+    /**
+     * Asks the marshaller to read a payload from the stream.
+     *
+     * @param marshaller the marshaller doing the work
+     * @param dataIn source stream
+     * @return whatever the marshaller read
+     * @throws IOException if the marshaller fails to read
+     */
+    Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
+        Object payload=marshaller.readPayload(dataIn);
+        return payload;
+    }
+
+    /**
+     * @return the payload size
+     */
+    int getSize(){
+        return this.size;
+    }
+
+    /**
+     * @param size the payload size
+     */
+    void setSize(int size){
+        this.size=size;
+    }
+
+    /**
+     * @return the offset within the data file
+     */
+    long getOffset(){
+        return this.offset;
+    }
+
+    /**
+     * @param offset the offset within the data file
+     */
+    void setOffset(long offset){
+        this.offset=offset;
+    }
+
+    /**
+     * @return the number of the data file
+     */
+    int getFile(){
+        return this.file;
+    }
+
+    /**
+     * @param file the number of the data file
+     */
+    void setFile(int file){
+        this.file=file;
+    }
+
+    /**
+     * @return offset, file and size as text
+     */
+    public String toString(){
+        return "offset = "+offset+", file = " + file + ", size = "+size;
+    }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataItem.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,188 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Manages the numbered {@link DataFile}s of a store.
+ * <p>
+ * Files live in one directory and are named <code>prefix + number</code>. Writes go to the highest-numbered
+ * ("current") file, and a new file is started once an item would push the current one past
+ * {@link #MAX_FILE_LENGTH}. Each file carries a reference count that is adjusted through the
+ * addInterestInFile/removeInterestInFile methods; files whose count drops to zero or below are deleted, except for
+ * the current write file.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataManager{
+    private static final Log log=LogFactory.getLog(DataManager.class);
+    /** Length in bytes (16MB) past which findSpaceForData starts a new data file. */
+    protected static long MAX_FILE_LENGTH=1024*1024*16;
+    private final File dir;
+    private final String prefix;
+    private StoreDataReader reader;
+    private StoreDataWriter writer;
+    /** File currently receiving writes; null until a file has been found on disk or created. */
+    private DataFile currentWriteFile;
+    /** Maps a file number (Integer) to its DataFile. */
+    Map fileMap=new HashMap();
+
+    /**
+     * Creates a manager and registers every existing file in the directory whose name starts with the prefix. The
+     * highest-numbered file found becomes the current write file.
+     *
+     * @param dir directory holding the data files
+     * @param pf file name prefix; the remainder of each file name is parsed as the file number
+     */
+    DataManager(File dir,String pf){
+        this.dir=dir;
+        this.prefix=pf;
+        this.reader=new StoreDataReader(this);
+        this.writer=new StoreDataWriter(this);
+        // build up list of current dataFiles
+        File[] files=dir.listFiles(new FilenameFilter(){
+            public boolean accept(File parent,String name){
+                // compare against the managed directory (the original compared the parameter with itself)
+                return parent.equals(DataManager.this.dir)&&name.startsWith(prefix);
+            }
+        });
+        if(files!=null){
+            for(int i=0;i<files.length;i++){
+                File file=files[i];
+                String name=file.getName();
+                String numStr=name.substring(prefix.length(),name.length());
+                // NOTE(review): a file that starts with the prefix but has a non-numeric remainder makes
+                // parseInt throw NumberFormatException out of this constructor
+                int num=Integer.parseInt(numStr);
+                DataFile dataFile=new DataFile(file,num);
+                fileMap.put(dataFile.getNumber(),dataFile);
+                if(currentWriteFile==null||currentWriteFile.getNumber().intValue()<num){
+                    currentWriteFile=dataFile;
+                }
+            }
+        }
+    }
+
+    /**
+     * Chooses the file an item will be written to and records the location on the item.
+     * <p>
+     * A new file (numbered one above the current file, or 1 if there is none) is started when there is no current
+     * file or the item would take the current file past {@link #MAX_FILE_LENGTH}. A current file that is being
+     * replaced and has no references is removed first.
+     *
+     * @param item the item about to be written; its offset and file number are set
+     * @return the file to write the item to
+     * @throws IOException if removing an unused file fails
+     */
+    DataFile findSpaceForData(DataItem item) throws IOException{
+        if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){
+            int nextNum=currentWriteFile!=null?currentWriteFile.getNumber().intValue()+1:1;
+            if(currentWriteFile!=null&&currentWriteFile.isUnused()){
+                removeDataFile(currentWriteFile);
+            }
+            currentWriteFile=createAndAddDataFile(nextNum);
+        }
+        item.setOffset(currentWriteFile.getLength());
+        item.setFile(currentWriteFile.getNumber().intValue());
+        return currentWriteFile;
+    }
+
+    /**
+     * Looks up the open file handle for the file an item refers to.
+     *
+     * @param item the item whose file number is used for the lookup
+     * @return the file's read/write handle
+     * @throws IOException if no file with that number is registered, or the file cannot be opened
+     */
+    RandomAccessFile getDataFile(DataItem item) throws IOException{
+        Integer key=new Integer(item.getFile());
+        DataFile dataFile=(DataFile) fileMap.get(key);
+        if(dataFile!=null){
+            return dataFile.getRandomAccessFile();
+        }
+        throw new IOException("Could not locate data file "+prefix+item.getFile());
+    }
+
+    /**
+     * Reads the payload an item points at; delegates to the StoreDataReader.
+     *
+     * @param marshaller used to decode the payload
+     * @param item location of the payload
+     * @return the object returned by the reader
+     * @throws IOException if the read fails
+     */
+    synchronized Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+        return reader.readItem(marshaller,item);
+    }
+
+    /**
+     * Stores a payload; delegates to the StoreDataWriter.
+     *
+     * @param marshaller used to encode the payload
+     * @param payload the object to store
+     * @return the item returned by the writer
+     * @throws IOException if the write fails
+     */
+    synchronized DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
+        return writer.storeItem(marshaller,payload);
+    }
+
+    /**
+     * Syncs every registered file to disk, closes its handle and forgets all files.
+     *
+     * @throws IOException if syncing or closing a file fails
+     */
+    synchronized void close() throws IOException{
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            dataFile.force();
+            // the original only synced here, leaving every open RandomAccessFile unclosed
+            dataFile.close();
+        }
+        fileMap.clear();
+    }
+
+    /**
+     * Syncs every registered file to disk.
+     *
+     * @throws IOException if syncing a file fails
+     */
+    synchronized void force() throws IOException{
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            dataFile.force();
+        }
+    }
+
+    /**
+     * Deletes every registered file and forgets all files.
+     *
+     * @return true only if every file was deleted
+     * @throws IOException if closing a file before deletion fails
+     */
+    synchronized boolean delete() throws IOException{
+        boolean result=true;
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            result&=dataFile.delete();
+        }
+        fileMap.clear();
+        return result;
+    }
+
+    /**
+     * Adds a reference to the file with the given number, registering the file first if it is not known yet.
+     * Negative numbers are ignored.
+     *
+     * @param file the file number
+     * @throws IOException declared for callers; nothing in this method currently throws it
+     */
+    synchronized void addInterestInFile(int file) throws IOException{
+        if(file>=0){
+            Integer key=new Integer(file);
+            DataFile dataFile=(DataFile) fileMap.get(key);
+            if(dataFile==null){
+                dataFile=createAndAddDataFile(file);
+            }
+            addInterestInFile(dataFile);
+        }
+    }
+
+    /**
+     * Adds a reference to the given file; null is ignored.
+     *
+     * @param dataFile the file to reference
+     */
+    void addInterestInFile(DataFile dataFile){
+        if(dataFile!=null){
+            dataFile.increment();
+        }
+    }
+
+    /**
+     * Drops a reference to the file with the given number. Negative and unknown numbers are ignored.
+     *
+     * @param file the file number
+     * @throws IOException if removing a no-longer-referenced file fails
+     */
+    synchronized void removeInterestInFile(int file) throws IOException{
+        if(file>=0){
+            Integer key=new Integer(file);
+            DataFile dataFile=(DataFile) fileMap.get(key);
+            removeInterestInFile(dataFile);
+        }
+    }
+
+    /**
+     * Drops a reference to the given file and removes the file once its count reaches zero or below, unless it is
+     * the current write file. Null is ignored.
+     *
+     * @param dataFile the file to dereference
+     * @throws IOException if removing the file fails
+     */
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
+        if(dataFile!=null){
+            if(dataFile.decrement()<=0){
+                if(dataFile!=currentWriteFile){
+                    removeDataFile(dataFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes every unreferenced file other than the current write file.
+     *
+     * @throws IOException if removing a file fails
+     */
+    synchronized void consolidateDataFiles() throws IOException{
+        // collect first: removing while iterating over fileMap.values() would break the iterator
+        List purgeList=new ArrayList();
+        for(Iterator i=fileMap.values().iterator();i.hasNext();){
+            DataFile dataFile=(DataFile) i.next();
+            if(dataFile.isUnused() && dataFile != currentWriteFile){
+                purgeList.add(dataFile);
+            }
+        }
+        for(int i=0;i<purgeList.size();i++){
+            removeDataFile((DataFile) purgeList.get(i));
+        }
+    }
+
+    /**
+     * Creates the DataFile object for <code>prefix + num</code> in the managed directory and registers it.
+     */
+    private DataFile createAndAddDataFile(int num){
+        String fileName=prefix+num;
+        File file=new File(dir,fileName);
+        DataFile result=new DataFile(file,num);
+        fileMap.put(result.getNumber(),result);
+        return result;
+    }
+
+    /**
+     * Unregisters a file, deletes it from disk and logs the outcome.
+     */
+    private void removeDataFile(DataFile dataFile) throws IOException{
+        fileMap.remove(dataFile.getNumber());
+        boolean result=dataFile.delete();
+        log.info("discarding data file "+dataFile+(result?"successful ":"failed"));
+    }
+}
Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java (from r393376, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java&r1=393376&r2=395597&rev=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/LocatableItem.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexItem.java Thu Apr 20 07:15:30 2006
@@ -14,113 +14,219 @@
package org.apache.activemq.kaha.impl;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.Externalizable;
import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import org.apache.activemq.kaha.Marshaller;
-
/**
-* A an Item with a relative postion and location to other Items in the Store
-*
-* @version $Revision: 1.2 $
-*/
-public final class LocatableItem extends Item implements Externalizable{
+ * An Item with a relative position and location to other Items in the Store
+ *
+ * @version $Revision: 1.2 $
+ */
+final class IndexItem implements Item{
+
+ static final int INDEX_SIZE=43;
+ //used by linked list
+ IndexItem next;
+ IndexItem prev;
-
- private static final long serialVersionUID=-6888731361600185708L;
+ private long offset=POSITION_NOT_SET;
private long previousItem=POSITION_NOT_SET;
private long nextItem=POSITION_NOT_SET;
- private long referenceItem=POSITION_NOT_SET;
-
+ private long keyOffset=POSITION_NOT_SET;
+ private int keyFile=(int) POSITION_NOT_SET;
+ private long valueOffset=POSITION_NOT_SET;
+ private int valueFile=(int) POSITION_NOT_SET;
+ private boolean active=true;
+
- public LocatableItem(){}
+ /**
+ * Default Constructor
+ */
+ IndexItem(){}
- public LocatableItem(long prev,long next,long objOffset) throws IOException{
- this.previousItem=prev;
- this.nextItem=next;
- this.referenceItem=objOffset;
+ void reset(){
+ previousItem=POSITION_NOT_SET;
+ nextItem=POSITION_NOT_SET;
+ keyOffset=POSITION_NOT_SET;
+ keyFile=(int) POSITION_NOT_SET;
+ valueOffset=POSITION_NOT_SET;
+ valueFile=(int) POSITION_NOT_SET;
+ active=true;
+ }
+
+ DataItem getKeyDataItem(){
+ DataItem result=new DataItem();
+ result.setOffset(keyOffset);
+ result.setFile(keyFile);
+ return result;
}
-
- public void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
- dataOut.writeLong(previousItem);
- dataOut.writeLong(nextItem);
- dataOut.writeLong(referenceItem);
- super.writePayload(marshaller,object,dataOut);
+ DataItem getValueDataItem(){
+ DataItem result=new DataItem();
+ result.setOffset(valueOffset);
+ result.setFile(valueFile);
+ return result;
}
- public Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
- previousItem=dataIn.readLong();
- nextItem=dataIn.readLong();
- referenceItem=dataIn.readLong();
- return super.readPayload(marshaller, dataIn);
+ void setValueData(DataItem item){
+ valueOffset=item.getOffset();
+ valueFile=item.getFile();
}
-
- void readLocation(DataInput dataIn) throws IOException{
- previousItem=dataIn.readLong();
- nextItem=dataIn.readLong();
- referenceItem=dataIn.readLong();
+
+ void setKeyData(DataItem item){
+ keyOffset=item.getOffset();
+ keyFile=item.getFile();
}
- public void writeLocation(DataOutput dataOut) throws IOException{
+ /**
+ * @param dataOut
+ * @throws IOException
+ */
+ void write(DataOutput dataOut) throws IOException{
+ dataOut.writeShort(MAGIC);
+ dataOut.writeBoolean(active);
dataOut.writeLong(previousItem);
dataOut.writeLong(nextItem);
+ dataOut.writeInt(keyFile);
+ dataOut.writeLong(keyOffset);
+ dataOut.writeInt(valueFile);
+ dataOut.writeLong(valueOffset);
}
- public void setPreviousItem(long newPrevEntry){
+ /**
+ * @param dataIn
+ * @throws IOException
+ */
+ void read(DataInput dataIn) throws IOException{
+ if(dataIn.readShort()!=MAGIC){
+ throw new BadMagicException();
+ }
+ active=dataIn.readBoolean();
+ previousItem=dataIn.readLong();
+ nextItem=dataIn.readLong();
+ keyFile=dataIn.readInt();
+ keyOffset=dataIn.readLong();
+ valueFile=dataIn.readInt();
+ valueOffset=dataIn.readLong();
+ }
+
+ /**
+ * @param newPrevEntry
+ */
+ void setPreviousItem(long newPrevEntry){
previousItem=newPrevEntry;
}
- public long getPreviousItem(){
+ /**
+ * @return prev item
+ */
+ long getPreviousItem(){
return previousItem;
}
- public void setNextItem(long newNextEntry){
+ /**
+ * @param newNextEntry
+ */
+ void setNextItem(long newNextEntry){
nextItem=newNextEntry;
}
- public long getNextItem(){
+ /**
+ * @return next item
+ */
+ long getNextItem(){
return nextItem;
}
- public void setReferenceItem(long newObjectOffset){
- referenceItem=newObjectOffset;
+ /**
+ * @param newObjectOffset
+ */
+ void setKeyOffset(long newObjectOffset){
+ keyOffset=newObjectOffset;
}
- public long getReferenceItem(){
- return referenceItem;
+ /**
+ * @return key offset
+ */
+ long getKeyOffset(){
+ return keyOffset;
}
- /* (non-Javadoc)
- * @see org.apache.activemq.kaha.impl.Item#toString()
+ /**
+ * @return Returns the keyFile.
*/
- public String toString(){
- String result=super.toString();
- result+=" , referenceItem = "+referenceItem+", previousItem = "+previousItem+" , nextItem = "+nextItem;
- return result;
+ int getKeyFile(){
+ return keyFile;
}
- /* (non-Javadoc)
- * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+ /**
+ * @param keyFile The keyFile to set.
*/
- public void writeExternal(ObjectOutput out) throws IOException{
- out.writeLong(previousItem);
- out.writeLong(nextItem);
- out.writeLong(referenceItem);
-
- }
-
- /* (non-Javadoc)
- * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
- */
- public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
- previousItem = in.readLong();
- nextItem = in.readLong();
- referenceItem = in.readLong();
-
+ void setKeyFile(int keyFile){
+ this.keyFile=keyFile;
+ }
+
+ /**
+ * @return Returns the valueFile.
+ */
+ int getValueFile(){
+ return valueFile;
+ }
+
+ /**
+ * @param valueFile The valueFile to set.
+ */
+ void setValueFile(int valueFile){
+ this.valueFile=valueFile;
+ }
+
+ /**
+ * @return Returns the valueOffset.
+ */
+ long getValueOffset(){
+ return valueOffset;
+ }
+
+ /**
+ * @param valueOffset The valueOffset to set.
+ */
+ void setValueOffset(long valueOffset){
+ this.valueOffset=valueOffset;
+ }
+
+ /**
+ * @return Returns the active.
+ */
+ boolean isActive(){
+ return active;
+ }
+
+ /**
+ * @param active The active to set.
+ */
+ void setActive(boolean active){
+ this.active=active;
+ }
+
+ /**
+ * @return Returns the offset.
+ */
+ long getOffset(){
+ return offset;
+ }
+
+ /**
+ * @param offset The offset to set.
+ */
+ void setOffset(long offset){
+ this.offset=offset;
+ }
+
+ /**
+ * @return pretty print of 'this'
+ */
+ public String toString(){
+ String result="offset="+offset+" , keyFile = "+keyFile+" , keyOffset = "+keyOffset+", valueOffset = "
+ +valueOffset+" , previousItem = "+previousItem+" , nextItem = "+nextItem;
+ return result;
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,276 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+/**
+ * A circular, doubly-linked list of {@link IndexItem}s.
+ * <p>
+ * The links are the <code>next</code>/<code>prev</code> fields of the items themselves. The <code>root</code> item
+ * passed to the constructor is the sentinel that closes the ring; it is not counted in {@link #size()} and is not
+ * returned by the element accessors. Because the links live in the items, an item can be linked into only one
+ * list at a time. The class does no synchronization of its own.
+ *
+ * @version $Revision: 1.2 $
+ */
+final class IndexLinkedList implements Cloneable{
+    private transient IndexItem root;
+    private transient int size=0;
+
+    /**
+     * Constructs an empty list around the given sentinel. The sentinel's links are reset to point at itself.
+     *
+     * @param header the item to use as the list's root/sentinel
+     */
+    IndexLinkedList(IndexItem header){
+        this.root = header;
+        this.root.next=root.prev=root;
+    }
+
+    /**
+     * @return the sentinel item this list was built around
+     */
+    IndexItem getRoot(){
+        return root;
+    }
+
+    /**
+     * @return the first element, or null if the list is empty
+     */
+    IndexItem getFirst(){
+        if(size==0)
+            return null;
+        return root.next;
+    }
+
+    /**
+     * @return the last element, or null if the list is empty
+     */
+    IndexItem getLast(){
+        if(size==0)
+            return null;
+        return root.prev;
+    }
+
+    /**
+     * Removes and returns the first element.
+     *
+     * @return the removed element, or null if the list is empty
+     */
+    IndexItem removeFirst(){
+        if(size==0){
+            return null;
+        }
+        IndexItem result=root.next;
+        remove(root.next);
+        return result;
+    }
+
+    /**
+     * Removes and returns the last element.
+     *
+     * @return the removed element (an IndexItem), or null if the list is empty
+     */
+    Object removeLast(){
+        if(size==0)
+            return null;
+        IndexItem result=root.prev;
+        remove(root.prev);
+        return result;
+    }
+
+    /**
+     * Inserts the item at the beginning of the list.
+     *
+     * @param item the item to insert
+     */
+    void addFirst(IndexItem item){
+        addBefore(item,root.next);
+    }
+
+    /**
+     * Appends the item to the end of the list; same effect as {@link #add(IndexItem)}.
+     *
+     * @param item the item to append
+     */
+    void addLast(IndexItem item){
+        addBefore(item,root);
+    }
+
+    /**
+     * @return the number of elements in the list
+     */
+    int size(){
+        return size;
+    }
+
+    /**
+     * @return true if there are no elements in the list
+     */
+    boolean isEmpty(){
+        return size==0;
+    }
+
+    /**
+     * Appends the item to the end of the list.
+     *
+     * @param item the item to append
+     * @return always true
+     */
+    boolean add(IndexItem item){
+        addBefore(item,root);
+        return true;
+    }
+
+    /**
+     * Removes all elements by pointing the sentinel back at itself. The removed items' own links are left as
+     * they were.
+     */
+    void clear(){
+        root.next=root.prev=root;
+        size=0;
+    }
+
+    // Positional Access Operations
+    /**
+     * Returns the element at the given position.
+     *
+     * @param index position of the element
+     * @return the element at that position
+     * @throws IndexOutOfBoundsException if index is negative or not less than size()
+     */
+    IndexItem get(int index){
+        return entry(index);
+    }
+
+    /**
+     * Inserts the element at the given position, moving the element currently there (and all after it) one
+     * position towards the end.
+     *
+     * @param index position to insert at; size() appends
+     * @param element the item to insert
+     * @throws IndexOutOfBoundsException if index is negative or greater than size()
+     */
+    void add(int index,IndexItem element){
+        addBefore(element,(index==size?root:entry(index)));
+    }
+
+    /**
+     * Removes the element at the given position.
+     *
+     * @param index position of the element to remove
+     * @return the removed element (an IndexItem)
+     * @throws IndexOutOfBoundsException if index is negative or not less than size()
+     */
+    Object remove(int index){
+        IndexItem e=entry(index);
+        remove(e);
+        return e;
+    }
+
+    /**
+     * Walks to the element at the given position, starting from whichever end of the ring is closer.
+     */
+    private IndexItem entry(int index){
+        if(index<0||index>=size)
+            throw new IndexOutOfBoundsException("Index: "+index+", Size: "+size);
+        IndexItem e=root;
+        if(index<size/2){
+            // index+1 steps forward from the sentinel
+            for(int i=0;i<=index;i++)
+                e=e.next;
+        }else{
+            // size-index steps backward from the sentinel
+            for(int i=size;i>index;i--)
+                e=e.prev;
+        }
+        return e;
+    }
+
+    // Search Operations
+    /**
+     * Finds the position of an item. Items are compared by identity (<code>==</code>), not equals().
+     *
+     * @param o the item to look for
+     * @return the position of the item, or -1 if it is not in the list
+     */
+    int indexOf(IndexItem o){
+        int index=0;
+        for(IndexItem e=root.next;e!=root;e=e.next){
+            if(o==e){
+                return index;
+            }
+            index++;
+        }
+        return -1;
+    }
+
+    /**
+     * @param entry an item linked into this list
+     * @return the item after entry, or null if entry is the last element
+     */
+    IndexItem getNextEntry(IndexItem entry){
+        return entry.next != root ? entry.next : null;
+    }
+
+    /**
+     * @param entry an item linked into this list
+     * @return the item before entry, or null if entry is the first element
+     */
+    IndexItem getPrevEntry(IndexItem entry){
+        return entry.prev != root ? entry.prev : null;
+    }
+
+    /**
+     * Links an item into the list immediately before another item.
+     *
+     * @param insert the item to link in
+     * @param e the item (or the sentinel) that insert will precede
+     */
+    void addBefore(IndexItem insert,IndexItem e){
+        insert.next=e;
+        insert.prev=e.prev;
+        insert.prev.next=insert;
+        insert.next.prev=insert;
+        size++;
+    }
+
+    /**
+     * Unlinks an item from the list. Passing the sentinel is a no-op. The removed item's own links are left
+     * pointing at its former neighbours.
+     *
+     * @param e the item to unlink
+     */
+    void remove(IndexItem e){
+        if(e==root)
+            return;
+        e.prev.next=e.next;
+        e.next.prev=e.prev;
+        size--;
+    }
+
+    /**
+     * Returns an independent copy of this list.
+     * <p>
+     * The links are stored in the items, so the copy cannot share items with this list: it is built from copies
+     * of the sentinel and of every element, in the same order. The previous implementation handed the live
+     * sentinel to the constructor, which reset its links, emptying this list and yielding an empty clone.
+     *
+     * @return a new list holding copies of this list's items
+     */
+    public Object clone(){
+        IndexLinkedList copy=new IndexLinkedList(copyOf(root));
+        for(IndexItem e=root.next;e!=root;e=e.next)
+            copy.add(copyOf(e));
+        return copy;
+    }
+
+    /**
+     * Creates an unlinked IndexItem carrying the same offset, neighbour offsets, key/value locations and active
+     * flag as the source.
+     */
+    private static IndexItem copyOf(IndexItem source){
+        IndexItem result=new IndexItem();
+        result.setOffset(source.getOffset());
+        result.setPreviousItem(source.getPreviousItem());
+        result.setNextItem(source.getNextItem());
+        result.setKeyFile(source.getKeyFile());
+        result.setKeyOffset(source.getKeyOffset());
+        result.setValueFile(source.getValueFile());
+        result.setValueOffset(source.getValueOffset());
+        result.setActive(source.isActive());
+        return result;
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexLinkedList.java
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,121 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.LinkedList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Manages the fixed-size {@link IndexItem} records kept in a single index file: reading and writing records,
+ * handing out slots for new records and recycling the slots of freed ones.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class IndexManager{
+    private static final Log log=LogFactory.getLog(IndexManager.class);
+    private File file;
+    private RandomAccessFile indexFile;
+    private StoreIndexReader reader;
+    private StoreIndexWriter writer;
+    /** Records that are not active and may be handed out again. */
+    private LinkedList freeList=new LinkedList();
+    /** Offset at which the next brand-new record will be placed. */
+    private long length=0;
+
+    /**
+     * Opens the index file and scans it record by record, collecting every inactive record on the free list.
+     *
+     * @param ifile the index file
+     * @param mode the RandomAccessFile mode to open the file with
+     * @throws IOException if the file cannot be opened or read
+     */
+    IndexManager(File ifile,String mode) throws IOException{
+        file=ifile;
+        indexFile=new RandomAccessFile(ifile,mode);
+        reader=new StoreIndexReader(indexFile);
+        writer=new StoreIndexWriter(indexFile);
+        long offset=0;
+        // only whole records are visited; a trailing partial record is ignored
+        for(;(offset+IndexItem.INDEX_SIZE)<=indexFile.length();offset+=IndexItem.INDEX_SIZE){
+            IndexItem index=reader.readItem(offset);
+            if(index.isActive()){
+                continue;
+            }
+            index.reset();
+            freeList.add(index);
+        }
+        length=offset;
+    }
+
+    /**
+     * @return true when no record has been allocated and the free list is empty
+     * @throws IOException declared for callers; nothing in this method currently throws it
+     */
+    synchronized boolean isEmpty() throws IOException{
+        if(!freeList.isEmpty()){
+            return false;
+        }
+        return length==0;
+    }
+
+    /**
+     * Reads the record at the given offset.
+     *
+     * @param offset position of the record in the index file
+     * @return the item returned by the reader
+     * @throws IOException if the read fails
+     */
+    synchronized IndexItem getIndex(long offset) throws IOException{
+        IndexItem item=reader.readItem(offset);
+        return item;
+    }
+
+    /**
+     * Resets a record, marks it inactive, writes it out and puts it on the free list.
+     *
+     * @param item the record to release
+     * @throws IOException if the write fails
+     */
+    synchronized void freeIndex(IndexItem item) throws IOException{
+        item.reset();
+        item.setActive(false);
+        writer.storeItem(item);
+        freeList.add(item);
+    }
+
+    /**
+     * Writes a record out.
+     *
+     * @param index the record to write
+     * @throws IOException if the write fails
+     */
+    synchronized void updateIndex(IndexItem index) throws IOException{
+        writer.storeItem(index);
+    }
+
+    /**
+     * Supplies a record for a new entry: a recycled one from the free list when available, otherwise a fresh one
+     * placed at the current length, which then grows by one record. Nothing is written to the file here.
+     *
+     * @return the record to use
+     * @throws IOException declared for callers; nothing in this method currently throws it
+     */
+    synchronized IndexItem createNewIndex() throws IOException{
+        IndexItem recycled=getNextFreeIndex();
+        if(recycled!=null){
+            return recycled;
+        }
+        // allocate one
+        IndexItem fresh=new IndexItem();
+        fresh.setOffset(length);
+        length+=IndexItem.INDEX_SIZE;
+        return fresh;
+    }
+
+    /**
+     * Closes the index file if it is open.
+     *
+     * @throws IOException if closing fails
+     */
+    synchronized void close() throws IOException{
+        if(indexFile==null){
+            return;
+        }
+        indexFile.close();
+        indexFile=null;
+    }
+
+    /**
+     * Syncs the index file's descriptor if the file is open.
+     *
+     * @throws IOException if the sync fails
+     */
+    synchronized void force() throws IOException{
+        if(indexFile==null){
+            return;
+        }
+        indexFile.getFD().sync();
+    }
+
+    /**
+     * Empties the free list, closes the index file and deletes it from disk.
+     *
+     * @return true if the file was deleted
+     * @throws IOException if closing fails
+     */
+    synchronized boolean delete() throws IOException{
+        freeList.clear();
+        close();
+        return file.delete();
+    }
+
+    /**
+     * Takes the most recently freed record off the free list and resets it; returns null when the list is empty.
+     */
+    private IndexItem getNextFreeIndex() throws IOException{
+        if(freeList.isEmpty()){
+            return null;
+        }
+        IndexItem result=(IndexItem) freeList.removeLast();
+        result.reset();
+        return result;
+    }
+
+    /**
+     * @return the offset at which the next new record will be placed
+     */
+    long getLength(){
+        return this.length;
+    }
+
+    /**
+     * @param value the offset at which the next new record will be placed
+     */
+    void setLength(long value){
+        this.length=value;
+    }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,120 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.ObjectMarshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A container of roots for other Containers.
+ * <p>
+ * Each entry is an {@link IndexItem} whose key data is the marshalled key and whose value offset is the offset of
+ * the container's own root record. The entries form a chain starting at <code>root</code>, linked by their
+ * previous/next offsets; <code>map</code> and <code>list</code> mirror that chain in memory.
+ *
+ * @version $Revision: 1.2 $
+ */
+
+class IndexRootContainer {
+    private static final Log log=LogFactory.getLog(IndexRootContainer.class);
+    protected static final Marshaller rootMarshaller = new ObjectMarshaller();
+    protected IndexItem root;
+    protected IndexManager indexManager;
+    protected DataManager dataManager;
+    /** key to entry */
+    protected Map map = new ConcurrentHashMap();
+    /** entries in chain order */
+    protected LinkedList list = new LinkedList();
+
+    /**
+     * Loads the existing entries by following the chain from the root, reading each key and registering interest
+     * in the data file that holds it.
+     *
+     * @param root the record the chain of entries starts from
+     * @param im manager for the index records
+     * @param dfm manager for the key data
+     * @throws IOException if reading an index record or a key fails
+     */
+    IndexRootContainer(IndexItem root,IndexManager im,DataManager dfm) throws IOException{
+        this.root=root;
+        this.indexManager=im;
+        this.dataManager=dfm;
+        long nextItem=root.getNextItem();
+        while(nextItem!=Item.POSITION_NOT_SET){
+            IndexItem item=indexManager.getIndex(nextItem);
+            DataItem data=item.getKeyDataItem();
+            Object key=dataManager.readItem(rootMarshaller,data);
+            map.put(key,item);
+            list.add(item);
+            nextItem=item.getNextItem();
+            dataManager.addInterestInFile(item.getKeyFile());
+        }
+    }
+
+    /**
+     * @return the key set of the underlying map (a live view, not a copy)
+     */
+    Set getKeys(){
+        return map.keySet();
+    }
+
+    /**
+     * Adds a root for the key, replacing any existing root for the same key. The key is stored, a new entry is
+     * appended to the chain and a new root record for the container is written.
+     *
+     * @param key the key identifying the container
+     * @return the new root record for the container
+     * @throws IOException if storing the key or updating the index fails
+     */
+    IndexItem addRoot(Object key) throws IOException{
+        if (map.containsKey(key)){
+            removeRoot(key);
+        }
+
+        DataItem data = dataManager.storeItem(rootMarshaller, key);
+        IndexItem index = indexManager.createNewIndex();
+        index.setKeyData(data);
+        IndexItem newRoot = indexManager.createNewIndex();
+        indexManager.updateIndex(newRoot);
+        index.setValueOffset(newRoot.getOffset());
+
+        // append after the last entry, or directly after the root when there are no entries yet
+        IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
+        last=last==null?root:last;
+        long prev=last.getOffset();
+        index.setPreviousItem(prev);
+        indexManager.updateIndex(index);
+        last.setNextItem(index.getOffset());
+        indexManager.updateIndex(last);
+        map.put(key, index);
+        list.add(index);
+        return newRoot;
+    }
+
+    /**
+     * Removes the root for the key, if there is one: drops interest in the key's data file, frees the container's
+     * root record, unlinks the entry from the chain and frees the entry's own record.
+     *
+     * @param key the key identifying the container
+     * @throws IOException if updating the index fails
+     */
+    void removeRoot(Object key) throws IOException{
+        IndexItem item = (IndexItem) map.remove(key);
+        if (item != null){
+            dataManager.removeInterestInFile(item.getKeyFile());
+            IndexItem rootIndex = indexManager.getIndex(item.getValueOffset());
+            indexManager.freeIndex(rootIndex);
+            int index=list.indexOf(item);
+            IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+            prev=prev==null?root:prev;
+            IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
+            if(next!=null){
+                prev.setNextItem(next.getOffset());
+                next.setPreviousItem(prev.getOffset());
+                indexManager.updateIndex(next);
+            }else{
+                prev.setNextItem(Item.POSITION_NOT_SET);
+            }
+            indexManager.updateIndex(prev);
+            list.remove(item);
+            // release the entry's own record too; otherwise it stays marked active in the index file and its
+            // slot is never reused
+            indexManager.freeIndex(item);
+        }
+    }
+
+    /**
+     * Reads the container root record for the key.
+     *
+     * @param key the key identifying the container
+     * @return the container's root record
+     * @throws IOException if there is no root for the key, or the read fails
+     */
+    IndexItem getRoot(Object key) throws IOException{
+        IndexItem index = (IndexItem) map.get(key);
+        if (index != null){
+            return indexManager.getIndex(index.getValueOffset());
+        }
+        throw new IOException("Cannot find root for key " + key);
+    }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/Item.java Thu Apr 20 07:15:30 2006
@@ -13,104 +13,15 @@
*/
package org.apache.activemq.kaha.impl;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import org.apache.activemq.kaha.Marshaller;
/**
* A a wrapper for a data in the store
*
* @version $Revision: 1.2 $
*/
-public class Item{
+public interface Item{
static final long POSITION_NOT_SET=-1;
static final short MAGIC=31317;
static final int ACTIVE=22;
static final int FREE=33;
- static final int HEAD_SIZE=8; // magic + active + len
static final int LOCATION_SIZE=24;
- private long offset=POSITION_NOT_SET;
- private int size;
- private boolean active;
-
- Item(){}
-
- void writeHeader(DataOutput dataOut) throws IOException{
- dataOut.writeShort(MAGIC);
- dataOut.writeByte(active?ACTIVE:FREE);
- dataOut.writeInt(size);
- dataOut.writeByte(0);//padding
- }
-
- void readHeader(DataInput dataIn) throws IOException{
- int magic=dataIn.readShort();
- if(magic==MAGIC){
- active=(dataIn.readByte()==ACTIVE);
- size=dataIn.readInt();
- }else if (magic == 0){
- size = -999; //end of data
- }else{
- throw new BadMagicException("Unexpected Magic value: "+magic);
- }
- }
-
- void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
- marshaller.writePayload(object,dataOut);
- }
-
- Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
- return marshaller.readPayload(dataIn);
- }
-
- void readLocation(DataInput dataIn) throws IOException{}
-
- void writeLocation(DataOutput dataOut) throws IOException{}
-
- /**
- * @return Returns the size.
- */
- int getSize(){
- return size;
- }
-
- /**
- * @param size
- * The size to set.
- */
- void setSize(int size){
- this.size=size;
- }
-
- void setOffset(long pos){
- offset=pos;
- }
-
- long getOffset(){
- return offset;
- }
-
- /**
- * @return Returns the active.
- */
- boolean isActive(){
- return active;
- }
-
- /**
- * @param active
- * The active to set.
- */
- void setActive(boolean active){
- this.active=active;
- }
-
- /**
- * @return a pretty print
- */
- public String toString(){
- String result="offset = "+offset+" ,active = "+active+" , size = "+size;
- return result;
- }
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,205 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAccessFile
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.RuntimeStoreException;
+import org.apache.activemq.kaha.Store;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+/**
+ * Optimized Store writer
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+public class KahaStore implements Store{
+ DataManager rootData;
+ DataManager containersData;
+ IndexManager indexManager;
+ private IndexRootContainer mapsContainer;
+ private IndexRootContainer listsContainer;
+ private Map lists=new ConcurrentHashMap();
+ private Map maps=new ConcurrentHashMap();
+ private boolean closed=false;
+ private String name;
+ private String mode;
+ private boolean initialized;
+
+ public KahaStore(String name,String mode) throws IOException{
+ this.name=name;
+ this.mode=mode;
+ initialize();
+ }
+
+ public synchronized void close() throws IOException{
+ if(!closed){
+ closed=true;
+ if(initialized){
+ indexManager.close();
+ rootData.close();
+ containersData.close();
+ }
+ }
+ }
+
+ public synchronized void force() throws IOException{
+ if(initialized){
+ indexManager.force();
+ rootData.force();
+ containersData.force();
+ }
+ }
+
+ public synchronized void clear() throws IOException{
+ initialize();
+ for(Iterator i=maps.values().iterator();i.hasNext();){
+ BaseContainerImpl container=(BaseContainerImpl) i.next();
+ container.clear();
+ }
+ for(Iterator i=lists.values().iterator();i.hasNext();){
+ BaseContainerImpl container=(BaseContainerImpl) i.next();
+ container.clear();
+ }
+ lists.clear();
+ maps.clear();
+ }
+
+ public synchronized boolean delete() throws IOException{
+ initialize();
+ clear();
+ boolean result=indexManager.delete();
+ result&=rootData.delete();
+ result&=containersData.delete();
+ initialized=false;
+ return result;
+ }
+
+ public boolean doesMapContainerExist(Object id) throws IOException{
+ initialize();
+ return maps.containsKey(id);
+ }
+
+ public MapContainer getMapContainer(Object id) throws IOException{
+ initialize();
+ MapContainer result=(MapContainer) maps.get(id);
+ if(result==null){
+ IndexItem root=mapsContainer.addRoot(id);
+ result=new MapContainerImpl(id,root,indexManager,containersData);
+ maps.put(id,result);
+ }
+ return result;
+ }
+
+ public void deleteMapContainer(Object id) throws IOException{
+ initialize();
+ MapContainer container=(MapContainer) maps.remove(id);
+ if(container!=null){
+ container.clear();
+ mapsContainer.removeRoot(id);
+ }
+ }
+
+ public Set getMapContainerIds() throws IOException{
+ initialize();
+ return maps.keySet();
+ }
+
+ public boolean doesListContainerExist(Object id) throws IOException{
+ initialize();
+ return lists.containsKey(id);
+ }
+
+ public ListContainer getListContainer(Object id) throws IOException{
+ initialize();
+ ListContainer result=(ListContainer) lists.get(id);
+ if(result==null){
+ IndexItem root=listsContainer.addRoot(id);
+ result=new ListContainerImpl(id,root,indexManager,containersData);
+ lists.put(id,result);
+ }
+ return result;
+ }
+
+ public void deleteListContainer(Object id) throws IOException{
+ initialize();
+ ListContainer container=(ListContainer) lists.remove(id);
+ if(container!=null){
+ container.clear();
+ listsContainer.removeRoot(id);
+ }
+ }
+
+ public Set getListContainerIds() throws IOException{
+ initialize();
+ return lists.keySet();
+ }
+
+ protected void checkClosed(){
+ if(closed){
+ throw new RuntimeStoreException("The store is closed");
+ }
+ }
+
+ protected synchronized void initialize() throws IOException{
+ if(!initialized){
+ initialized=true;
+ File dir=new File(name);
+ dir.mkdirs();
+ File ifile=new File(dir,"kaha.idx");
+ indexManager=new IndexManager(ifile,mode);
+ rootData=new DataManager(dir,"roots-data");
+ containersData=new DataManager(dir,"containers-data");
+ IndexItem mapRoot=new IndexItem();
+ IndexItem listRoot=new IndexItem();
+ if(indexManager.isEmpty()){
+ mapRoot.setOffset(0);
+ indexManager.updateIndex(mapRoot);
+ listRoot.setOffset(IndexItem.INDEX_SIZE);
+ indexManager.updateIndex(listRoot);
+ indexManager.setLength(IndexItem.INDEX_SIZE*2);
+ }else{
+ mapRoot=indexManager.getIndex(0);
+ listRoot=indexManager.getIndex(IndexItem.INDEX_SIZE);
+ }
+ mapsContainer=new IndexRootContainer(mapRoot,indexManager,rootData);
+ listsContainer=new IndexRootContainer(listRoot,indexManager,rootData);
+ rootData.consolidateDataFiles();
+ for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
+ Object key=i.next();
+ IndexItem root=mapsContainer.getRoot(key);
+ BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,containersData);
+ container.expressDataInterest();
+ maps.put(key,container);
+ }
+ for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
+ Object key=i.next();
+ IndexItem root=listsContainer.getRoot(key);
+ BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,containersData);
+ container.expressDataInterest();
+ lists.put(key,container);
+ }
+ containersData.consolidateDataFiles();
+ }
+ }
+}