You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/20 16:15:38 UTC
svn commit: r395597 [2/3] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/kaha/ test/java/org/a...
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -17,7 +17,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.activemq.kaha.ListContainer;
@@ -31,20 +30,13 @@
*
* @version $Revision: 1.2 $
*/
-public class ListContainerImpl implements ListContainer{
- private static final Log log=LogFactory.getLog(MapContainerImpl.class);
- protected StoreImpl store;
- protected LocatableItem root;
- protected Object id;
- protected LinkedList list=new LinkedList();
- protected boolean loaded=false;
+final class ListContainerImpl extends BaseContainerImpl implements ListContainer{
+ private static final Log log=LogFactory.getLog(ListContainerImpl.class);
protected Marshaller marshaller=new ObjectMarshaller();
- protected boolean closed = false;
- protected ListContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
- this.id=id;
- this.store=rfs;
- this.root=root;
+ protected ListContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager)
+ throws IOException{
+ super(id,root,indexManager,dataManager);
}
/*
@@ -55,21 +47,20 @@
public void load(){
checkClosed();
if(!loaded){
- loaded=true;
- long start=root.getNextItem();
- if(start!=Item.POSITION_NOT_SET){
- try{
- long nextItem=start;
- while(nextItem!=Item.POSITION_NOT_SET){
- LocatableItem item=new LocatableItem();
- item.setOffset(nextItem);
- store.readLocation(item);
- list.add(item);
- nextItem=item.getNextItem();
+ synchronized(mutex){
+ if(!loaded){
+ loaded=true;
+ try{
+ long nextItem=root.getNextItem();
+ while(nextItem!=Item.POSITION_NOT_SET){
+ IndexItem item=indexManager.getIndex(nextItem);
+ list.add(item);
+ nextItem=item.getNextItem();
+ }
+ }catch(IOException e){
+ log.error("Failed to load container "+getId(),e);
+ throw new RuntimeStoreException(e);
}
- }catch(IOException e){
- log.error("Failed to load container "+getId(),e);
- throw new RuntimeStoreException(e);
}
}
}
@@ -83,25 +74,10 @@
public void unload(){
checkClosed();
if(loaded){
- loaded = false;
+ loaded=false;
list.clear();
}
}
-
- public void close(){
- unload();
- closed = true;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.kaha.ListContainer#isLoaded()
- */
- public boolean isLoaded(){
- checkClosed();
- return loaded;
- }
/*
* (non-Javadoc)
@@ -113,33 +89,23 @@
this.marshaller=marshaller;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.kaha.ListContainer#getId()
- */
- public Object getId(){
- checkClosed();
- return id;
- }
-
public boolean equals(Object obj){
- checkLoaded();
- checkClosed();
- boolean result = false;
- if (obj != null && obj instanceof List){
- List other = (List) obj;
- synchronized(list){
- result = other.size() == size();
- if (result){
- for (int i =0; i < list.size(); i++){
- Object o1 = other.get(i);
- Object o2 = get(i);
- result = o1 == o2 || (o1 != null && o2 != null && o1.equals(o2));
- if (!result) break;
+ load();
+ boolean result=false;
+ if(obj!=null&&obj instanceof List){
+ List other=(List) obj;
+ synchronized(mutex){
+ result=other.size()==size();
+ if(result){
+ for(int i=0;i<list.size();i++){
+ Object o1=other.get(i);
+ Object o2=get(i);
+ result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
+ if(!result)
+ break;
+ }
}
}
- }
}
return result;
}
@@ -150,8 +116,7 @@
* @see org.apache.activemq.kaha.ListContainer#size()
*/
public int size(){
- checkClosed();
- checkLoaded();
+ load();
return list.size();
}
@@ -161,10 +126,9 @@
* @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
*/
public void addFirst(Object o){
- checkClosed();
- checkLoaded();
- LocatableItem item=writeFirst(o);
- synchronized(list){
+ load();
+ IndexItem item=writeFirst(o);
+ synchronized(mutex){
list.addFirst(item);
}
}
@@ -175,10 +139,9 @@
* @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
*/
public void addLast(Object o){
- checkClosed();
- checkLoaded();
- LocatableItem item=writeLast(o);
- synchronized(list){
+ load();
+ IndexItem item=writeLast(o);
+ synchronized(mutex){
list.addLast(item);
}
}
@@ -189,16 +152,15 @@
* @see org.apache.activemq.kaha.ListContainer#removeFirst()
*/
public Object removeFirst(){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- synchronized(list){
- LocatableItem item=(LocatableItem) list.getFirst();
+ synchronized(mutex){
+ IndexItem item=(IndexItem) list.getFirst();
if(item!=null){
result=getValue(item);
int index=list.indexOf(item);
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+ IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+ IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
list.removeFirst();
delete(item,prev,next);
item=null;
@@ -213,16 +175,15 @@
* @see org.apache.activemq.kaha.ListContainer#removeLast()
*/
public Object removeLast(){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- synchronized(list){
- LocatableItem item=(LocatableItem) list.getLast();
+ synchronized(mutex){
+ IndexItem item=(IndexItem) list.getLast();
if(item!=null){
result=getValue(item);
int index=list.indexOf(item);
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=null;
+ IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+ IndexItem next=null;
list.removeLast();
delete(item,prev,next);
item=null;
@@ -237,8 +198,7 @@
* @see java.util.List#isEmpty()
*/
public boolean isEmpty(){
- checkClosed();
- checkLoaded();
+ load();
return list.isEmpty();
}
@@ -248,18 +208,18 @@
* @see java.util.List#contains(java.lang.Object)
*/
public boolean contains(Object o){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
if(o!=null){
- synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value=getValue(item);
+ synchronized(mutex){
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=true;
break;
}
+ next=list.getNextEntry(next);
}
}
}
@@ -272,8 +232,7 @@
* @see java.util.List#iterator()
*/
public Iterator iterator(){
- checkClosed();
- checkLoaded();
+ load();
return listIterator();
}
@@ -283,14 +242,14 @@
* @see java.util.List#toArray()
*/
public Object[] toArray(){
- checkClosed();
- checkLoaded();
+ load();
List tmp=new ArrayList(list.size());
- synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value=getValue(item);
+ synchronized(mutex){
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
tmp.add(value);
+ next=list.getNextEntry(next);
}
}
return tmp.toArray();
@@ -302,14 +261,14 @@
* @see java.util.List#toArray(T[])
*/
public Object[] toArray(Object[] a){
- checkClosed();
- checkLoaded();
+ load();
List tmp=new ArrayList(list.size());
- synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value=getValue(item);
+ synchronized(mutex){
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
tmp.add(value);
+ next=list.getNextEntry(next);
}
}
return tmp.toArray(a);
@@ -321,8 +280,7 @@
* @see java.util.List#add(E)
*/
public boolean add(Object o){
- checkClosed();
- checkLoaded();
+ load();
addLast(o);
return true;
}
@@ -333,28 +291,28 @@
* @see java.util.List#remove(java.lang.Object)
*/
public boolean remove(Object o){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
- synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
- Object value = getValue(item);
- if (value != null && value.equals(o)){
- remove(item);
+ synchronized(mutex){
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ if(value!=null&&value.equals(o)){
+ remove(next);
+ result=true;
break;
}
+ next=list.getNextEntry(next);
}
-
}
return result;
}
- protected void remove(LocatableItem item){
- synchronized(list){
+ protected void remove(IndexItem item){
+ synchronized(mutex){
int index=list.indexOf(item);
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+ IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
+ IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
list.remove(index);
delete(item,prev,next);
}
@@ -366,10 +324,9 @@
* @see java.util.List#containsAll(java.util.Collection)
*/
public boolean containsAll(Collection c){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
- synchronized(list){
+ synchronized(mutex){
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
if(!(result=contains(obj))){
@@ -387,14 +344,12 @@
* @see java.util.List#addAll(java.util.Collection)
*/
public boolean addAll(Collection c){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
for(Iterator i=c.iterator();i.hasNext();){
add(i.next());
- result=true;
}
- return result;
+ return true;
}
/*
@@ -403,8 +358,7 @@
* @see java.util.List#addAll(int, java.util.Collection)
*/
public boolean addAll(int index,Collection c){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
ListIterator e1=listIterator(index);
Iterator e2=c.iterator();
@@ -421,8 +375,7 @@
* @see java.util.List#removeAll(java.util.Collection)
*/
public boolean removeAll(Collection c){
- checkClosed();
- checkLoaded();
+ load();
boolean result=true;
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
@@ -437,19 +390,18 @@
* @see java.util.List#retainAll(java.util.Collection)
*/
public boolean retainAll(Collection c){
- checkClosed();
- checkLoaded();
+ load();
List tmpList=new ArrayList();
- synchronized(list){
- for(Iterator i = list.iterator(); i.hasNext();){
- LocatableItem item = (LocatableItem) i.next();
- Object o = getValue(item);
-
- if(!c.contains(o)){
- tmpList.add(o);
+ synchronized(mutex){
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object o=getValue(next);
+ if(!c.contains(o)){
+ tmpList.add(o);
+ }
+ next=list.getNextEntry(next);
}
}
- }
for(Iterator i=tmpList.iterator();i.hasNext();){
remove(i.next());
}
@@ -463,30 +415,9 @@
*/
public void clear(){
checkClosed();
- synchronized(list){
+ synchronized(mutex){
list.clear();
- try {
- long start=root.getNextItem();
- if(start!=Item.POSITION_NOT_SET){
- long nextItem=start;
- while(nextItem!=Item.POSITION_NOT_SET){
- LocatableItem item=new LocatableItem();
- item.setOffset(nextItem);
- list.add(item);
- nextItem=item.getNextItem();
- }
- }
- root.setNextItem(Item.POSITION_NOT_SET);
- store.updateItem(root);
- for(int i=0;i<list.size();i++){
- LocatableItem item=(LocatableItem) list.get(i);
- store.removeItem(item);
- }
- list.clear();
- }catch(IOException e){
- log.error("Failed to clear ListContainer "+getId(),e);
- throw new RuntimeStoreException(e);
- }
+ doClear();
}
}
@@ -496,10 +427,9 @@
* @see java.util.List#get(int)
*/
public Object get(int index){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- LocatableItem item=(LocatableItem) list.get(index);
+ IndexItem item=(IndexItem) list.get(index);
if(item!=null){
result=getValue(item);
}
@@ -512,13 +442,12 @@
* @see java.util.List#set(int, E)
*/
public Object set(int index,Object element){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- synchronized(list){
- LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
- LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
- LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+ synchronized(mutex){
+ IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
+ IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
+ IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
result=getValue(replace);
list.remove(index);
delete(replace,prev,next);
@@ -526,12 +455,12 @@
}
return result;
}
-
- protected LocatableItem internalSet(int index,Object element){
- synchronized(list){
- LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
- LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
- LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
+
+ protected IndexItem internalSet(int index,Object element){
+ synchronized(mutex){
+ IndexItem replace=list.isEmpty()?null:(IndexItem) list.get(index);
+ IndexItem prev=(list.isEmpty()||(index-1)<0)?null:(IndexItem) list.get(index-1);
+ IndexItem next=(list.isEmpty()||(index+1)>=size())?null:(IndexItem) list.get(index+1);
list.remove(index);
delete(replace,prev,next);
return internalAdd(index,element);
@@ -544,26 +473,25 @@
* @see java.util.List#add(int, E)
*/
public void add(int index,Object element){
- checkClosed();
- checkLoaded();
- synchronized(list){
- LocatableItem item=insert(index,element);
+ load();
+ synchronized(mutex){
+ IndexItem item=insert(index,element);
list.add(index,item);
}
}
-
- protected LocatableItem internalAdd(int index,Object element){
- synchronized(list){
- LocatableItem item=insert(index,element);
+
+ protected IndexItem internalAdd(int index,Object element){
+ synchronized(mutex){
+ IndexItem item=insert(index,element);
list.add(index,item);
return item;
}
}
-
- protected LocatableItem internalGet(int index){
- synchronized(list){
- if (index >= 0 && index < list.size()){
- return (LocatableItem) list.get(index);
+
+ protected IndexItem internalGet(int index){
+ synchronized(mutex){
+ if(index>=0&&index<list.size()){
+ return list.get(index);
}
}
return null;
@@ -575,17 +503,17 @@
* @see org.apache.activemq.kaha.ListContainer#doRemove(int)
*/
public boolean doRemove(int index){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
- synchronized(list){
- LocatableItem item=(LocatableItem) list.get(index);
+ synchronized(mutex){
+ IndexItem item=list.get(index);
if(item!=null){
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+ result=true;
+ IndexItem prev=list.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=list.getNextEntry(prev);
list.remove(index);
delete(item,prev,next);
- result=true;
}
}
return result;
@@ -597,15 +525,15 @@
* @see java.util.List#remove(int)
*/
public Object remove(int index){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- synchronized(list){
- LocatableItem item=(LocatableItem) list.get(index);
+ synchronized(mutex){
+ IndexItem item=list.get(index);
if(item!=null){
result=getValue(item);
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
+ IndexItem prev=list.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=list.getNextEntry(prev);
list.remove(index);
delete(item,prev,next);
}
@@ -619,19 +547,20 @@
* @see java.util.List#indexOf(java.lang.Object)
*/
public int indexOf(Object o){
- checkClosed();
- checkLoaded();
+ load();
int result=-1;
if(o!=null){
- synchronized(list){
+ synchronized(mutex){
int count=0;
- for(Iterator i=list.iterator();i.hasNext();count++){
- LocatableItem item=(LocatableItem) i.next();
- Object value=getValue(item);
+ IndexItem next=list.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
+ count++;
+ next=list.getNextEntry(next);
}
}
}
@@ -644,19 +573,20 @@
* @see java.util.List#lastIndexOf(java.lang.Object)
*/
public int lastIndexOf(Object o){
- checkClosed();
- checkLoaded();
+ load();
int result=-1;
if(o!=null){
- synchronized(list){
+ synchronized(mutex){
int count=list.size()-1;
- for(ListIterator i=list.listIterator();i.hasPrevious();count--){
- LocatableItem item=(LocatableItem) i.previous();
- Object value=getValue(item);
+ IndexItem next=list.getLast();
+ while(next!=null){
+ Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
+ count--;
+ next=list.getPrevEntry(next);
}
}
}
@@ -669,10 +599,8 @@
* @see java.util.List#listIterator()
*/
public ListIterator listIterator(){
- checkClosed();
- checkLoaded();
- ListIterator iter = ((List) list.clone()).listIterator();
- return new ContainerListIterator(this,iter);
+ load();
+ return new ContainerListIterator(this,list,list.getRoot());
}
/*
@@ -681,11 +609,15 @@
* @see java.util.List#listIterator(int)
*/
public ListIterator listIterator(int index){
- checkClosed();
- checkLoaded();
- List result = (List) list.clone();
- ListIterator iter = result.listIterator(index);
- return new ContainerListIterator(this,iter);
+ load();
+ IndexItem start=list.get(index);
+ if(start!=null){
+ start=list.getPrevEntry(start);
+ }
+ if(start==null){
+ start=root;
+ }
+ return new ContainerListIterator(this,list,start);
}
/*
@@ -694,129 +626,119 @@
* @see java.util.List#subList(int, int)
*/
public List subList(int fromIndex,int toIndex){
- checkClosed();
- checkLoaded();
- List tmp = list.subList(fromIndex, toIndex);
- LinkedList result = new LinkedList();
- for (Iterator i = tmp.iterator(); i.hasNext();){
- LocatableItem item = (LocatableItem) i.next();
- result.add(getValue(item));
+ load();
+ List result=new ArrayList();
+ int count=fromIndex;
+ IndexItem next=list.get(fromIndex);
+ while(next!=null&&count++<toIndex){
+ result.add(getValue(next));
+ next=list.getNextEntry(next);
}
return result;
}
- protected LocatableItem writeLast(Object value){
- long pos=Item.POSITION_NOT_SET;
- LocatableItem item=null;
+ protected IndexItem writeLast(Object value){
+ IndexItem index=null;
try{
- LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
- last=last==null?root:last;
- long prev=last.getOffset();
- long next=Item.POSITION_NOT_SET;
- item=new LocatableItem(prev,next,pos);
- next=store.storeItem(marshaller,value,item);
- if(last!=null){
- last.setNextItem(next);
- store.updateItem(last);
+ if(value!=null){
+ DataItem data=dataManager.storeItem(marshaller,value);
+ index=indexManager.createNewIndex();
+ index.setValueData(data);
+ IndexItem prev=list.getLast();
+ prev=prev!=null?prev:root;
+ IndexItem next=list.getNextEntry(prev);
+ prev.setNextItem(index.getOffset());
+ index.setPreviousItem(prev.getOffset());
+ indexManager.updateIndex(prev);
+ if(next!=null){
+ next.setPreviousItem(index.getOffset());
+ index.setNextItem(next.getOffset());
+ indexManager.updateIndex(next);
+ }
+ indexManager.updateIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
throw new RuntimeStoreException(e);
}
- return item;
+ return index;
}
- protected LocatableItem writeFirst(Object value){
- long pos=Item.POSITION_NOT_SET;
- LocatableItem item=null;
+ protected IndexItem writeFirst(Object value){
+ IndexItem index=null;
try{
- LocatableItem next=list.isEmpty()?null:(LocatableItem) list.getFirst();
- LocatableItem last=root;
- long prevPos=last.getOffset();
- long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
- item=new LocatableItem(prevPos,nextPos,pos);
- nextPos=store.storeItem(marshaller,value,item);
- if(last!=null){
- last.setNextItem(nextPos);
- store.updateItem(last);
- }
- if(next!=null){
- next.setPreviousItem(nextPos);
- store.updateItem(next);
+ if(value!=null){
+ DataItem data=dataManager.storeItem(marshaller,value);
+ index=indexManager.createNewIndex();
+ index.setValueData(data);
+ IndexItem prev=root;
+ IndexItem next=list.getNextEntry(prev);
+ prev.setNextItem(index.getOffset());
+ index.setPreviousItem(prev.getOffset());
+ indexManager.updateIndex(prev);
+ if(next!=null){
+ next.setPreviousItem(index.getOffset());
+ index.setNextItem(next.getOffset());
+ indexManager.updateIndex(next);
+ }
+ indexManager.updateIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
throw new RuntimeStoreException(e);
}
- return item;
+ return index;
}
- protected LocatableItem insert(int insertPos,Object value){
+ protected IndexItem insert(int insertPos,Object value){
long pos=Item.POSITION_NOT_SET;
- LocatableItem item=null;
+ IndexItem index=null;
try{
- int lastPos=insertPos-1;
- LocatableItem prev=(list.isEmpty() || (insertPos-1) < 0)?null:(LocatableItem) list.get(lastPos);
- LocatableItem next=(list.isEmpty() || (insertPos+1) >= size())?null:(LocatableItem) list.get(insertPos+1);
- prev=prev==null?root:prev;
- long prevPos=prev.getOffset();
- long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
- item=new LocatableItem(prevPos,nextPos,pos);
- nextPos=store.storeItem(marshaller,value,item);
- if(prev!=null){
- prev.setNextItem(nextPos);
- store.updateItem(prev);
- }
- if(next!=null){
- next.setPreviousItem(nextPos);
- store.updateItem(next);
+ if(value!=null){
+ DataItem data=dataManager.storeItem(marshaller,value);
+ index=indexManager.createNewIndex();
+ index.setValueData(data);
+ IndexItem prev=null;
+ IndexItem next=null;
+ if(insertPos<=0){
+ prev=root;
+ next=list.getNextEntry(root);
+ }else if(insertPos>=list.size()){
+ prev=list.getLast();
+ next=null;
+ }else{
+ prev=list.get(insertPos);
+ prev=prev!=null?prev:root;
+ next=list.getNextEntry(prev);
+ }
+ prev.setNextItem(index.getOffset());
+ index.setPreviousItem(prev.getOffset());
+ indexManager.updateIndex(prev);
+ if(next!=null){
+ next.setPreviousItem(index.getOffset());
+ index.setNextItem(next.getOffset());
+ indexManager.updateIndex(next);
+ }
+ indexManager.updateIndex(index);
}
}catch(IOException e){
log.error("Failed to insert "+value,e);
throw new RuntimeStoreException(e);
}
- return item;
+ return index;
}
- protected Object getValue(LocatableItem item){
+ protected Object getValue(IndexItem item){
Object result=null;
if(item!=null){
try{
- result=store.readItem(marshaller,item);
+ DataItem data=item.getValueDataItem();
+ result=dataManager.readItem(marshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
throw new RuntimeStoreException(e);
}
}
return result;
- }
-
- protected void delete(LocatableItem item,LocatableItem prev,LocatableItem next){
- try{
- prev=prev==null?root:prev;
- if(next!=null){
- prev.setNextItem(next.getOffset());
- next.setPreviousItem(prev.getOffset());
- store.updateItem(next);
- }else{
- prev.setNextItem(Item.POSITION_NOT_SET);
- }
- store.updateItem(prev);
- store.removeItem(item);
- }catch(IOException e){
- log.error("Failed to delete "+item,e);
- throw new RuntimeStoreException(e);
- }
- }
-
- protected final void checkClosed(){
- if (closed){
- throw new RuntimeStoreException("The store is closed");
- }
- }
- protected final void checkLoaded(){
- if (!loaded){
- throw new RuntimeStoreException("The container is not loaded");
- }
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java Thu Apr 20 07:15:30 2006
@@ -32,24 +32,15 @@
*
* @version $Revision: 1.2 $
*/
-public class MapContainerImpl implements MapContainer{
+final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
- protected StoreImpl store;
- protected LocatableItem root;
- protected Object id;
protected Map map=new HashMap();
protected Map valueToKeyMap=new HashMap();
- protected LinkedList list=new LinkedList();
- protected boolean loaded=false;
protected Marshaller keyMarshaller=new ObjectMarshaller();
protected Marshaller valueMarshaller=new ObjectMarshaller();
- protected final Object mutex=new Object();
- protected boolean closed=false;
- protected MapContainerImpl(Object id,StoreImpl si,LocatableItem root) throws IOException{
- this.id=id;
- this.store=si;
- this.root=root;
+ protected MapContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
+ super(id,root,indexManager,dataManager);
}
/*
@@ -60,25 +51,24 @@
public void load(){
checkClosed();
if(!loaded){
- loaded=true;
synchronized(mutex){
- try{
- long start=root.getNextItem();
- if(start!=Item.POSITION_NOT_SET){
- long nextItem=start;
+ if(!loaded){
+ loaded=true;
+ try{
+ long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
- LocatableItem item=new LocatableItem();
- item.setOffset(nextItem);
- Object key=store.readItem(keyMarshaller,item);
+ IndexItem item=indexManager.getIndex(nextItem);
+ DataItem data=item.getKeyDataItem();
+ Object key=dataManager.readItem(keyMarshaller,data);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
nextItem=item.getNextItem();
}
+ }catch(IOException e){
+ log.error("Failed to load container "+getId(),e);
+ throw new RuntimeStoreException(e);
}
- }catch(IOException e){
- log.error("Failed to load container "+getId(),e);
- throw new RuntimeStoreException(e);
}
}
}
@@ -101,11 +91,6 @@
}
}
- public void close(){
- unload();
- closed=true;
- }
-
public void setKeyMarshaller(Marshaller keyMarshaller){
checkClosed();
this.keyMarshaller=keyMarshaller;
@@ -119,31 +104,10 @@
/*
* (non-Javadoc)
*
- * @see org.apache.activemq.kaha.MapContainer#isLoaded()
- */
- public boolean isLoaded(){
- checkClosed();
- return loaded;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.activemq.kaha.MapContainer#getId()
- */
- public Object getId(){
- checkClosed();
- return id;
- }
-
- /*
- * (non-Javadoc)
- *
* @see org.apache.activemq.kaha.MapContainer#size()
*/
public int size(){
- checkClosed();
- checkLoaded();
+ load();
return map.size();
}
@@ -153,8 +117,7 @@
* @see org.apache.activemq.kaha.MapContainer#isEmpty()
*/
public boolean isEmpty(){
- checkClosed();
- checkLoaded();
+ load();
return map.isEmpty();
}
@@ -164,8 +127,7 @@
* @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
*/
public boolean containsKey(Object key){
- checkClosed();
- checkLoaded();
+ load();
synchronized(mutex){
return map.containsKey(key);
}
@@ -177,12 +139,11 @@
* @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
*/
public Object get(Object key){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
- LocatableItem item=null;
+ IndexItem item=null;
synchronized(mutex){
- item=(LocatableItem) map.get(key);
+ item=(IndexItem) map.get(key);
}
if(item!=null){
result=getValue(item);
@@ -196,18 +157,18 @@
* @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
*/
public boolean containsValue(Object o){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
if(o!=null){
synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
+ IndexItem item=list.getFirst();
+ while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
+ item=list.getNextEntry(item);
}
}
}
@@ -220,8 +181,7 @@
* @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
*/
public void putAll(Map t){
- checkClosed();
- checkLoaded();
+ load();
if(t!=null){
synchronized(mutex){
for(Iterator i=t.entrySet().iterator();i.hasNext();){
@@ -238,8 +198,7 @@
* @see org.apache.activemq.kaha.MapContainer#keySet()
*/
public Set keySet(){
- checkClosed();
- checkLoaded();
+ load();
return new ContainerKeySet(this);
}
@@ -249,8 +208,7 @@
* @see org.apache.activemq.kaha.MapContainer#values()
*/
public Collection values(){
- checkClosed();
- checkLoaded();
+ load();
return new ContainerValueCollection(this);
}
@@ -260,8 +218,7 @@
* @see org.apache.activemq.kaha.MapContainer#entrySet()
*/
public Set entrySet(){
- checkClosed();
- checkLoaded();
+ load();
return new ContainerEntrySet(this);
}
@@ -271,14 +228,13 @@
* @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
*/
public Object put(Object key,Object value){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
synchronized(mutex){
if(map.containsKey(key)){
result=remove(key);
}
- LocatableItem item=write(key,value);
+ IndexItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
@@ -292,36 +248,31 @@
* @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
*/
public Object remove(Object key){
- checkClosed();
- checkLoaded();
+ load();
Object result=null;
synchronized(mutex){
- LocatableItem item=(LocatableItem) map.get(key);
+ IndexItem item=(IndexItem) map.get(key);
if(item!=null){
map.remove(key);
valueToKeyMap.remove(item);
result=getValue(item);
- int index=list.indexOf(item);
- LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
- LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
- list.remove(index);
- {
- delete(item,prev,next);
- }
- item=null;
+ IndexItem prev=list.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=list.getNextEntry(item);
+ list.remove(item);
+ delete(item,prev,next);
}
}
return result;
}
public boolean removeValue(Object o){
- checkClosed();
- checkLoaded();
+ load();
boolean result=false;
if(o!=null){
- synchronized(list){
- for(Iterator i=list.iterator();i.hasNext();){
- LocatableItem item=(LocatableItem) i.next();
+ synchronized(mutex){
+ IndexItem item=list.getFirst();
+ while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
@@ -332,13 +283,14 @@
}
break;
}
+ item=list.getNextEntry(item);
}
}
}
return result;
}
- protected void remove(LocatableItem item){
+ protected void remove(IndexItem item){
Object key=valueToKeyMap.get(item);
if(key!=null){
remove(key);
@@ -358,34 +310,7 @@
map.clear();
valueToKeyMap.clear();
list.clear();// going to re-use this
- try{
- long start=root.getNextItem();
- if(start!=Item.POSITION_NOT_SET){
- long nextItem=start;
- while(nextItem!=Item.POSITION_NOT_SET){
- LocatableItem item=new LocatableItem();
- item.setOffset(nextItem);
- list.add(item);
- nextItem=item.getNextItem();
- }
- }
- root.setNextItem(Item.POSITION_NOT_SET);
- store.updateItem(root);
- for(int i=0;i<list.size();i++){
- LocatableItem item=(LocatableItem) list.get(i);
- if(item.getReferenceItem()!=Item.POSITION_NOT_SET){
- Item value=new Item();
- value.setOffset(item.getReferenceItem());
- store.removeItem(value);
- }
-
- store.removeItem(item);
- }
- list.clear();
- }catch(IOException e){
- log.error("Failed to clear MapContainer "+getId(),e);
- throw new RuntimeStoreException(e);
- }
+ doClear();
}
}
}
@@ -394,17 +319,16 @@
return new HashSet(map.keySet());
}
- protected LinkedList getItemList(){
+ protected IndexLinkedList getItemList(){
return list;
}
- protected Object getValue(LocatableItem item){
+ protected Object getValue(IndexItem item){
Object result=null;
- if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
- Item rec=new Item();
- rec.setOffset(item.getReferenceItem());
+ if(item!=null){
try{
- result=store.readItem(valueMarshaller,rec);
+ DataItem data=item.getValueDataItem();
+ result=dataManager.readItem(valueMarshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
throw new RuntimeStoreException(e);
@@ -413,64 +337,29 @@
return result;
}
- protected LocatableItem write(Object key,Object value){
- long pos=Item.POSITION_NOT_SET;
- LocatableItem item=null;
+ protected IndexItem write(Object key,Object value){
+ IndexItem index=null;
try{
+ if(key!=null){
+ index=indexManager.createNewIndex();
+ DataItem data=dataManager.storeItem(keyMarshaller,key);
+ index.setKeyData(data);
+ }
if(value!=null){
- Item valueItem=new Item();
- pos=store.storeItem(valueMarshaller,value,valueItem);
+ DataItem data=dataManager.storeItem(valueMarshaller,value);
+ index.setValueData(data);
}
- LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
+ IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
last=last==null?root:last;
long prev=last.getOffset();
- long next=Item.POSITION_NOT_SET;
- item=new LocatableItem(prev,next,pos);
- next=store.storeItem(keyMarshaller,key,item);
- if(last!=null){
- last.setNextItem(next);
- store.updateItem(last);
- }
+ index.setPreviousItem(prev);
+ indexManager.updateIndex(index);
+ last.setNextItem(index.getOffset());
+ indexManager.updateIndex(last);
}catch(IOException e){
- e.printStackTrace();
log.error("Failed to write "+key+" , "+value,e);
throw new RuntimeStoreException(e);
}
- return item;
- }
-
- protected void delete(LocatableItem key,LocatableItem prev,LocatableItem next){
- try{
- prev=prev==null?root:prev;
- if(next!=null){
- prev.setNextItem(next.getOffset());
- next.setPreviousItem(prev.getOffset());
- store.updateItem(next);
- }else{
- prev.setNextItem(Item.POSITION_NOT_SET);
- }
- store.updateItem(prev);
- if(key.getReferenceItem()!=Item.POSITION_NOT_SET){
- Item value=new Item();
- value.setOffset(key.getReferenceItem());
- store.removeItem(value);
- }
- store.removeItem(key);
- }catch(IOException e){
- log.error("Failed to delete "+key,e);
- throw new RuntimeStoreException(e);
- }
- }
-
- protected final void checkClosed(){
- if(closed){
- throw new RuntimeStoreException("The store is closed");
- }
- }
-
- protected final void checkLoaded(){
- if(!loaded){
- throw new RuntimeStoreException("The container is not loaded");
- }
+ return index;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayInputStream.java Thu Apr 20 07:15:30 2006
@@ -11,51 +11,42 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-
package org.apache.activemq.kaha.impl;
-import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UTFDataFormatException;
/**
* Optimized ByteArrayInputStream that can be used more than once
*
* @version $Revision: 1.1.1.1 $
*/
-public class StoreByteArrayInputStream extends ByteArrayInputStream {
+final class StoreByteArrayInputStream extends InputStream implements DataInput{
+ private byte[] buf;
+ private int pos;
+
/**
* Creates a <code>WireByteArrayInputStream</code>.
*
* @param buf the input buffer.
*/
- public StoreByteArrayInputStream(byte buf[]) {
- super(buf);
+ public StoreByteArrayInputStream(byte buf[]){
+ this.buf=buf;
+ this.pos=0;
}
/**
- * Creates <code>WireByteArrayInputStream</code> that uses <code>buf</code> as its buffer array.
- *
- * @param buf the input buffer.
- * @param offset the offset in the buffer of the first byte to read.
- * @param length the maximum number of bytes to read from the buffer.
- */
- public StoreByteArrayInputStream(byte buf[], int offset, int length) {
- super(buf, offset, length);
- }
-
-
- /**
* Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
*/
- public StoreByteArrayInputStream() {
- super(new byte[0]);
+ public StoreByteArrayInputStream(){
+ this(new byte[0]);
}
-
- /**
- * @return the current position in the stream
- */
- public int position(){
+
+ public int size(){
return pos;
}
-
+
/**
* @return the underlying data array
*/
@@ -66,33 +57,21 @@
/**
* reset the <code>WireByteArrayInputStream</code> to use an new byte array
*
- * @param newBuff buffer to use
- * @param offset the offset in the buffer of the first byte to read.
- * @param length the maximum number of bytes to read from the buffer.
- */
- public void restart(byte[] newBuff, int offset, int length) {
- buf = newBuff;
- pos = offset;
- count = Math.min(offset + length, newBuff.length);
- mark = offset;
- }
-
- /**
- * reset the <code>WireByteArrayInputStream</code> to use an new byte array
- *
* @param newBuff
*/
- public void restart(byte[] newBuff) {
- restart(newBuff, 0, newBuff.length);
+ public void restart(byte[] newBuff){
+ buf=newBuff;
+ pos=0;
}
-
+
/**
* re-start the input stream - reusing the current buffer
+ *
* @param size
*/
public void restart(int size){
- if (buf == null || buf.length < size){
- buf = new byte[size];
+ if(buf==null||buf.length<size){
+ buf=new byte[size];
}
restart(buf);
}
@@ -106,8 +85,8 @@
*
* @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
*/
- public int read() {
- return (pos < count) ? (buf[pos++] & 0xff) : -1;
+ public int read(){
+ return (pos<buf.length)?(buf[pos++]&0xff):-1;
}
/**
@@ -117,33 +96,173 @@
* @param off the start offset of the data.
* @param len the maximum number of bytes read.
* @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
- * end of the stream has been reached.
+ * end of the stream has been reached.
*/
- public int read(byte b[], int off, int len) {
- if (b == null) {
+ public int read(byte b[],int off,int len){
+ if(b==null){
throw new NullPointerException();
}
- else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- }
- if (pos >= count) {
+ if(pos>=buf.length){
return -1;
}
- if (pos + len > count) {
- len = count - pos;
+ if(pos+len>buf.length){
+ len=buf.length-pos;
}
- if (len <= 0) {
+ if(len<=0){
return 0;
}
- System.arraycopy(buf, pos, b, off, len);
- pos += len;
+ System.arraycopy(buf,pos,b,off,len);
+ pos+=len;
return len;
}
/**
* @return the number of bytes that can be read from the input stream without blocking.
*/
- public int available() {
- return count - pos;
+ public int available(){
+ return buf.length-pos;
+ }
+
+ public void readFully(byte[] b){
+ read(b,0,b.length);
+ }
+
+ public void readFully(byte[] b,int off,int len){
+ read(b,off,len);
+ }
+
+ public int skipBytes(int n){
+ if(pos+n>buf.length){
+ n=buf.length-pos;
+ }
+ if(n<0){
+ return 0;
+ }
+ pos+=n;
+ return n;
+ }
+
+ public boolean readBoolean(){
+ return read()!=0;
+ }
+
+ public byte readByte(){
+ return (byte) read();
+ }
+
+ public int readUnsignedByte(){
+ return read();
+ }
+
+ public short readShort(){
+ int ch1=read();
+ int ch2=read();
+ return (short) ((ch1<<8)+(ch2<<0));
+ }
+
+ public int readUnsignedShort(){
+ int ch1=read();
+ int ch2=read();
+ return ((ch1<<8)+(ch2<<0));
+ }
+
+ public char readChar(){
+ int ch1=read();
+ int ch2=read();
+ return (char) ((ch1<<8)+(ch2<<0));
+ }
+
+ public int readInt(){
+ int ch1=read();
+ int ch2=read();
+ int ch3=read();
+ int ch4=read();
+ return ((ch1<<24)+(ch2<<16)+(ch3<<8)+(ch4<<0));
+ }
+
+ public long readLong(){
+ return (((long) buf[pos++]<<56)+((long) (buf[pos++]&255)<<48)+((long) (buf[pos++]&255)<<40)
+ +((long) (buf[pos++]&255)<<32)+((long) (buf[pos++]&255)<<24)+((buf[pos++]&255)<<16)
+ +((buf[pos++]&255)<<8)+((buf[pos++]&255)<<0));
+ }
+
+ public float readFloat() throws IOException{
+ return Float.intBitsToFloat(readInt());
+ }
+
+ public double readDouble() throws IOException{
+ return Double.longBitsToDouble(readLong());
+ }
+
+ public String readLine(){
+ int start=pos;
+ while(pos<buf.length){
+ int c=read();
+ if(c=='\n'){
+ break;
+ }
+ if(c=='\r'){
+ c=read();
+ if(c!='\n'&&c!=-1){
+ pos--;
+ }
+ break;
+ }
+ }
+ return new String(buf,start,pos);
+ }
+
+ public String readUTF() throws IOException{
+ int length=readUnsignedShort();
+ char[] characters=new char[length];
+ int c,c2,c3;
+ int count=0;
+ int total=pos+length;
+ while(pos<total){
+ c=(int) buf[pos]&0xff;
+ if(c>127)
+ break;
+ pos++;
+ characters[count++]=(char) c;
+ }
+ while(pos<total){
+ c=(int) buf[pos]&0xff;
+ switch(c>>4){
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ pos++;
+ characters[count++]=(char) c;
+ break;
+ case 12:
+ case 13:
+ pos+=2;
+ if(pos>length)
+ throw new UTFDataFormatException("bad string");
+ c2=(int) buf[pos-1];
+ if((c2&0xC0)!=0x80)
+ throw new UTFDataFormatException("bad string");
+ characters[count++]=(char) (((c&0x1F)<<6)|(c2&0x3F));
+ break;
+ case 14:
+ pos+=3;
+ if(pos>length)
+ throw new UTFDataFormatException("bad string");
+ c2=(int) buf[pos-2];
+ c3=(int) buf[pos-1];
+ if(((c2&0xC0)!=0x80)||((c3&0xC0)!=0x80))
+ throw new UTFDataFormatException("bad string");
+ characters[count++]=(char) (((c&0x0F)<<12)|((c2&0x3F)<<6)|((c3&0x3F)<<0));
+ break;
+ default:
+ throw new UTFDataFormatException("bad string");
+ }
+ }
+ return new String(characters,0,count);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreByteArrayOutputStream.java Thu Apr 20 07:15:30 2006
@@ -11,22 +11,20 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-
package org.apache.activemq.kaha.impl;
-import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UTFDataFormatException;
/**
* Optimized ByteArrayOutputStream
*
* @version $Revision: 1.1.1.1 $
*/
-public class StoreByteArrayOutputStream extends ByteArrayOutputStream {
- /**
- * Creates a new byte array output stream.
- */
- public StoreByteArrayOutputStream() {
- super(16 * 1024);
- }
+final class StoreByteArrayOutputStream extends OutputStream implements DataOutput{
+ private byte buf[];
+ private int pos;
/**
* Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
@@ -34,8 +32,18 @@
* @param size the initial size.
* @exception IllegalArgumentException if size is negative.
*/
- public StoreByteArrayOutputStream(int size) {
- super(size);
+ public StoreByteArrayOutputStream(int size){
+ if(size<0){
+ throw new IllegalArgumentException("Invalid size: "+size);
+ }
+ buf=new byte[size];
+ }
+
+ /**
+ * Creates a new byte array output stream.
+ */
+ public StoreByteArrayOutputStream(){
+ this(16*1024);
}
/**
@@ -43,9 +51,9 @@
*
* @param size
*/
- public void restart(int size) {
- buf = new byte[size];
- count = 0;
+ public void restart(int size){
+ buf=new byte[size];
+ pos=0;
}
/**
@@ -53,15 +61,11 @@
*
* @param b the byte to be written.
*/
- public void write(int b) {
- int newcount = count + 1;
- if (newcount > buf.length) {
- byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- buf[count] = (byte) b;
- count = newcount;
+ public void write(int b){
+ int newcount=pos+1;
+ ensureEnoughBuffer(newcount);
+ buf[pos]=(byte) b;
+ pos=newcount;
}
/**
@@ -72,47 +76,155 @@
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
- public void write(byte b[], int off, int len) {
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- }
- else if (len == 0) {
+ public void write(byte b[],int off,int len){
+ if(len==0){
return;
}
- int newcount = count + len;
- if (newcount > buf.length) {
- byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- System.arraycopy(b, off, buf, count, len);
- count = newcount;
+ int newcount=pos+len;
+ ensureEnoughBuffer(newcount);
+ System.arraycopy(b,off,buf,pos,len);
+ pos=newcount;
}
/**
* @return the underlying byte[] buffer
*/
- public byte[] getData() {
+ public byte[] getData(){
return buf;
}
-
+
/**
* reset the output stream
*/
public void reset(){
- count = 0;
+ pos=0;
}
-
+
/**
* Set the current position for writing
+ *
* @param offset
*/
public void position(int offset){
- if (offset > buf.length) {
- byte newbuf[] = new byte[Math.max(buf.length << 1, offset)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
+ ensureEnoughBuffer(offset);
+ pos=offset;
+ }
+
+ public int size(){
+ return pos;
+ }
+
+ public void writeBoolean(boolean v){
+ ensureEnoughBuffer(1);
+ buf[pos++]=(byte) (v?1:0);
+ }
+
+ public void writeByte(int v){
+ ensureEnoughBuffer(1);
+ buf[pos++]=(byte) (v>>>0);
+ }
+
+ public void writeShort(int v){
+ ensureEnoughBuffer(2);
+ buf[pos++]=(byte) (v>>>8);
+ buf[pos++]=(byte) (v>>>0);
+ }
+
+ public void writeChar(int v){
+ ensureEnoughBuffer(2);
+ buf[pos++]=(byte) (v>>>8);
+ buf[pos++]=(byte) (v>>>0);
+ }
+
+ public void writeInt(int v){
+ ensureEnoughBuffer(4);
+ buf[pos++]=(byte) (v>>>24);
+ buf[pos++]=(byte) (v>>>16);
+ buf[pos++]=(byte) (v>>>8);
+ buf[pos++]=(byte) (v>>>0);
+ }
+
+ public void writeLong(long v){
+ ensureEnoughBuffer(8);
+ buf[pos++]=(byte) (v>>>56);
+ buf[pos++]=(byte) (v>>>48);
+ buf[pos++]=(byte) (v>>>40);
+ buf[pos++]=(byte) (v>>>32);
+ buf[pos++]=(byte) (v>>>24);
+ buf[pos++]=(byte) (v>>>16);
+ buf[pos++]=(byte) (v>>>8);
+ buf[pos++]=(byte) (v>>>0);
+ }
+
+ public void writeFloat(float v) throws IOException{
+ writeInt(Float.floatToIntBits(v));
+ }
+
+ public void writeDouble(double v) throws IOException{
+ writeLong(Double.doubleToLongBits(v));
+ }
+
+ public void writeBytes(String s){
+ int length=s.length();
+ for(int i=0;i<length;i++){
+ write((byte) s.charAt(i));
+ }
+ }
+
+ public void writeChars(String s){
+ int length=s.length();
+ for(int i=0;i<length;i++){
+ int c=s.charAt(i);
+ write((c>>>8)&0xFF);
+ write((c>>>0)&0xFF);
+ }
+ }
+
+ public void writeUTF(String str) throws IOException{
+ int strlen=str.length();
+ int encodedsize=0;
+ int c;
+ for(int i=0;i<strlen;i++){
+ c=str.charAt(i);
+ if((c>=0x0001)&&(c<=0x007F)){
+ encodedsize++;
+ }else if(c>0x07FF){
+ encodedsize+=3;
+ }else{
+ encodedsize+=2;
+ }
+ }
+ if(encodedsize>65535)
+ throw new UTFDataFormatException("encoded string too long: "+encodedsize+" bytes");
+ ensureEnoughBuffer(encodedsize+2);
+ writeShort(encodedsize);
+ int i=0;
+ for(i=0;i<strlen;i++){
+ c=str.charAt(i);
+ if(!((c>=0x0001)&&(c<=0x007F)))
+ break;
+ buf[pos++]=(byte) c;
+ }
+ for(;i<strlen;i++){
+ c=str.charAt(i);
+ if((c>=0x0001)&&(c<=0x007F)){
+ buf[pos++]=(byte) c;
+ }else if(c>0x07FF){
+ buf[pos++]=(byte) (0xE0|((c>>12)&0x0F));
+ buf[pos++]=(byte) (0x80|((c>>6)&0x3F));
+ buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
+ }else{
+ buf[pos++]=(byte) (0xC0|((c>>6)&0x1F));
+ buf[pos++]=(byte) (0x80|((c>>0)&0x3F));
+ }
+ }
+ }
+
+ private void ensureEnoughBuffer(int newcount){
+ if(newcount>buf.length){
+ byte newbuf[]=new byte[Math.max(buf.length<<1,newcount)];
+ System.arraycopy(buf,0,newbuf,0,pos);
+ buf=newbuf;
}
- count = offset;
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataReader.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,50 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import org.apache.activemq.kaha.Marshaller;
+/**
+ * Optimized Store reader
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class StoreDataReader{
+ private DataManager dataManager;
+ private StoreByteArrayInputStream dataIn;
+ private byte[] header=new byte[DataItem.HEAD_SIZE];
+
+ /**
+ * Construct a Store reader
+ *
+ * @param file
+ */
+ StoreDataReader(DataManager fileManager){
+ this.dataManager=fileManager;
+ this.dataIn=new StoreByteArrayInputStream();
+ }
+
+ protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+ RandomAccessFile file=dataManager.getDataFile(item);
+ file.seek(item.getOffset());
+ file.readFully(header);
+ dataIn.restart(header);
+ item.readHeader(dataIn);
+ byte[] data=new byte[item.getSize()];
+ file.readFully(data);
+ dataIn.restart(data);
+ return marshaller.readPayload(dataIn);
+ }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreDataWriter.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,59 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAcessFile
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import org.apache.activemq.kaha.Marshaller;
+/**
+ * Optimized Store writer
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class StoreDataWriter{
+ private StoreByteArrayOutputStream dataOut;
+ private DataManager dataManager;
+
+ /**
+ * Construct a Store writer
+ *
+ * @param file
+ */
+ StoreDataWriter(DataManager fileManager){
+ this.dataManager=fileManager;
+ this.dataOut=new StoreByteArrayOutputStream();
+ }
+
+ DataItem storeItem(Marshaller marshaller,Object payload) throws IOException{
+ dataOut.reset();
+ dataOut.position(DataItem.HEAD_SIZE);
+ marshaller.writePayload(payload,dataOut);
+ int size=dataOut.size();
+ int payloadSize=size-DataItem.HEAD_SIZE;
+ DataItem item=new DataItem();
+ item.setSize(payloadSize);
+ DataFile dataFile=dataManager.findSpaceForData(item);
+ dataOut.reset();
+ item.writeHeader(dataOut);
+ dataFile.getRandomAccessFile().seek(item.getOffset());
+ dataFile.getRandomAccessFile().write(dataOut.getData(),0,size);
+ dataFile.incrementLength(size);
+ dataManager.addInterestInFile(dataFile);
+ return item;
+ }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexReader.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,47 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * Optimized Store reader
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreIndexReader{
+ protected RandomAccessFile file;
+ protected StoreByteArrayInputStream dataIn;
+ protected byte[] buffer=new byte[IndexItem.INDEX_SIZE];
+
+ /**
+ * Construct a Store reader
+ *
+ * @param file
+ */
+ StoreIndexReader(RandomAccessFile file){
+ this.file=file;
+ this.dataIn=new StoreByteArrayInputStream();
+ }
+
+ protected IndexItem readItem(long offset) throws IOException{
+ file.seek(offset);
+ file.readFully(buffer);
+ dataIn.restart(buffer);
+ IndexItem result=new IndexItem();
+ result.setOffset(offset);
+ result.read(dataIn);
+ return result;
+ }
+}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java?rev=395597&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java Thu Apr 20 07:15:30 2006
@@ -0,0 +1,48 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+/**
+ * Optimized writes to a RandomAcessFile
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+/**
+ * Optimized Store writer
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class StoreIndexWriter{
+ protected StoreByteArrayOutputStream dataOut;
+ protected RandomAccessFile file;
+
+ /**
+ * Construct a Store index writer
+ *
+ * @param file
+ */
+ StoreIndexWriter(RandomAccessFile file){
+ this.file=file;
+ this.dataOut=new StoreByteArrayOutputStream();
+ }
+
+ void storeItem(IndexItem index) throws IOException{
+ dataOut.reset();
+ index.write(dataOut);
+ file.seek(index.getOffset());
+ file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
+ }
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/AtomicIntegerMarshaller.java Thu Apr 20 07:15:30 2006
@@ -18,8 +18,8 @@
package org.apache.activemq.store.kahadaptor;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@@ -32,13 +32,13 @@
public class AtomicIntegerMarshaller implements Marshaller{
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
AtomicInteger ai = (AtomicInteger) object;
dataOut.writeInt(ai.get());
}
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
int value = dataIn.readInt();
return new AtomicInteger(value);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/CommandMarshaller.java Thu Apr 20 07:15:30 2006
@@ -17,10 +17,9 @@
package org.apache.activemq.store.kahadaptor;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
-
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
@@ -38,7 +37,7 @@
}
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
Packet packet = wireFormat.marshal(object);
byte[] data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
@@ -46,7 +45,7 @@
}
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java Thu Apr 20 07:15:30 2006
@@ -18,7 +18,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import org.apache.activeio.command.WireFormat;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -33,6 +32,8 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
@@ -40,6 +41,7 @@
* @version $Revision: 1.4 $
*/
public class KahaPersistentAdaptor implements PersistenceAdapter{
+ private static final Log log=LogFactory.getLog(KahaPersistentAdaptor.class);
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore;
ConcurrentHashMap topics=new ConcurrentHashMap();
@@ -58,13 +60,18 @@
}
public Set getDestinations(){
+
Set rc=new HashSet();
+ try {
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
}
}
+ }catch(IOException e){
+ log.error("Failed to get destinations " ,e);
+ }
return rc;
}
@@ -89,7 +96,6 @@
MapContainer ackContainer=store.getMapContainer(destination.toString()+"-Acks");
ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
- ackContainer.load();
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination, rc);
if(transactionStore!=null){
@@ -137,10 +143,7 @@
public void deleteAllMessages() throws IOException{
if(store!=null){
- store.clear();
- }
- if(transactionStore!=null){
- transactionStore.delete();
+ store.delete();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Thu Apr 20 07:15:30 2006
@@ -23,6 +23,7 @@
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -71,7 +72,8 @@
String id=messageId.toString();
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
if(container!=null){
- container.remove(id);
+ //container.remove(id);
+ container.removeFirst();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
@@ -96,7 +98,11 @@
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
- subscriberContainer.put(key,info);
+ // if already exists - won't add it again as it causes data files
+ // to hang around
+ if(!subscriberContainer.containsKey(key)){
+ subscriberContainer.put(key,info);
+ }
addSubscriberAckContainer(key);
}
@@ -122,15 +128,19 @@
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
- for(Iterator i=list.iterator();i.hasNext();){
- Object msg=messageContainer.get(i.next());
- if(msg!=null){
- if(msg.getClass()==String.class){
- listener.recoverMessageReference((String) msg);
- }else{
- listener.recoverMessage((Message) msg);
+ if(list!=null){
+ for(Iterator i=list.iterator();i.hasNext();){
+ Object msg=messageContainer.get(i.next());
+ if(msg!=null){
+ if(msg.getClass()==String.class){
+ listener.recoverMessageReference((String) msg);
+ }else{
+ listener.recoverMessage((Message) msg);
+ }
}
+ listener.finished();
}
+ }else{
listener.finished();
}
}
@@ -154,8 +164,8 @@
protected void addSubscriberAckContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key);
- container.setMarshaller(new StringMarshaller());
- container.load();
+ Marshaller marshaller=new StringMarshaller();
+ container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TransactionMarshaller.java Thu Apr 20 07:15:30 2006
@@ -17,12 +17,11 @@
package org.apache.activemq.store.kahadaptor;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-
import org.apache.activeio.command.WireFormat;
import org.apache.activeio.packet.ByteArrayPacket;
import org.apache.activeio.packet.Packet;
@@ -41,7 +40,7 @@
}
- public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
+ public void writePayload(Object object,DataOutput dataOut) throws IOException{
KahaTransaction kt = (KahaTransaction) object;
List list = kt.getList();
dataOut.writeInt(list.size());
@@ -62,7 +61,7 @@
}
- public Object readPayload(DataInputStream dataIn) throws IOException{
+ public Object readPayload(DataInput dataIn) throws IOException{
KahaTransaction result = new KahaTransaction();
List list = new ArrayList();
result.setList(list);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/LoadTest.java Thu Apr 20 07:15:30 2006
@@ -16,7 +16,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import junit.framework.TestCase;
-import org.apache.activemq.kaha.impl.StoreImpl;
+import org.apache.activemq.kaha.impl.KahaStore;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
/**
* Store test
@@ -27,7 +27,7 @@
static final int COUNT=10000;
static final int NUM_LOADERS=2;
protected String name="load.db";
- protected StoreImpl store;
+ protected KahaStore store;
/*
* Test method for 'org.apache.activemq.kaha.Store.close()'
@@ -40,11 +40,10 @@
loader.start();
}
stop.await();
- store.dumpFreeSpace(new PrintWriter(System.out));
}
- protected StoreImpl getStore() throws IOException{
- return (StoreImpl) StoreFactory.open(name,"rw");
+ protected KahaStore getStore() throws IOException{
+ return (KahaStore) StoreFactory.open(name,"rw");
}
protected void setUp() throws Exception{
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java?rev=395597&r1=395596&r2=395597&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java Thu Apr 20 07:15:30 2006
@@ -21,7 +21,6 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller;
-import org.apache.activemq.kaha.impl.StoreImpl;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import junit.framework.TestCase;
/**