You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/05 09:19:37 UTC
svn commit: r453123 [2/3] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/
main/java/org/apache/activemq/kaha/impl/container/
main/java/org/apache/activemq/kaha/impl/data/ ma...
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Thu Oct 5 00:19:35 2006
@@ -1,16 +1,21 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
+
package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
@@ -20,24 +25,26 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.impl.data.DataItem;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* Implementation of a ListContainer
*
* @version $Revision: 1.2 $
*/
public class ListContainerImpl extends BaseContainerImpl implements ListContainer{
+
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
protected Marshaller marshaller=Store.ObjectMarshaller;
protected LinkedList cacheList=new LinkedList();
@@ -45,9 +52,9 @@
protected int maximumCacheSize=100;
protected IndexItem lastCached;
- public ListContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,
- DataManager dataManager) throws IOException{
- super(id,root,rootIndexManager,indexManager,dataManager);
+ public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType)
+ throws IOException{
+ super(id,root,indexManager,dataManager,indexType);
}
/*
@@ -55,25 +62,23 @@
*
* @see org.apache.activemq.kaha.ListContainer#load()
*/
- public void load(){
+ public synchronized void load(){
checkClosed();
if(!loaded){
- synchronized(mutex){
- if(!loaded){
- loaded=true;
+ if(!loaded){
+ loaded=true;
+ try{
init();
- try{
- long nextItem=root.getNextItem();
- while(nextItem!=Item.POSITION_NOT_SET){
- IndexItem item=indexManager.getIndex(nextItem);
- indexList.add(item);
- itemAdded(item,indexList.size()-1,getValue(item));
- nextItem=item.getNextItem();
- }
- }catch(IOException e){
- log.error("Failed to load container "+getId(),e);
- throw new RuntimeStoreException(e);
+ long nextItem=root.getNextItem();
+ while(nextItem!=Item.POSITION_NOT_SET){
+ IndexItem item=indexManager.getIndex(nextItem);
+ indexList.add(item);
+ itemAdded(item,indexList.size()-1,getValue(item));
+ nextItem=item.getNextItem();
}
+ }catch(IOException e){
+ log.error("Failed to load container "+getId(),e);
+ throw new RuntimeStoreException(e);
}
}
}
@@ -84,7 +89,7 @@
*
* @see org.apache.activemq.kaha.ListContainer#unload()
*/
- public void unload(){
+ public synchronized void unload(){
checkClosed();
if(loaded){
loaded=false;
@@ -98,26 +103,24 @@
*
* @see org.apache.activemq.kaha.ListContainer#setKeyMarshaller(org.apache.activemq.kaha.Marshaller)
*/
- public void setMarshaller(Marshaller marshaller){
+ public synchronized void setMarshaller(Marshaller marshaller){
checkClosed();
this.marshaller=marshaller;
}
- public boolean equals(Object obj){
+ public synchronized boolean equals(Object obj){
load();
boolean result=false;
if(obj!=null&&obj instanceof List){
- List other=(List) obj;
- synchronized(mutex){
- result=other.size()==size();
- if(result){
- for(int i=0;i<indexList.size();i++){
- Object o1=other.get(i);
- Object o2=get(i);
- result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
- if(!result){
- break;
- }
+ List other=(List)obj;
+ result=other.size()==size();
+ if(result){
+ for(int i=0;i<indexList.size();i++){
+ Object o1=other.get(i);
+ Object o2=get(i);
+ result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
+ if(!result){
+ break;
}
}
}
@@ -130,7 +133,7 @@
*
* @see org.apache.activemq.kaha.ListContainer#size()
*/
- public int size(){
+ public synchronized int size(){
load();
return indexList.size();
}
@@ -140,13 +143,8 @@
*
* @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
*/
- public void addFirst(Object o){
- load();
- IndexItem item=writeFirst(o);
- synchronized(mutex){
- indexList.addFirst(item);
- itemAdded(item,0,o);
- }
+ public synchronized void addFirst(Object o){
+ internalAddFirst(o);
}
/*
@@ -154,13 +152,8 @@
*
* @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
*/
- public void addLast(Object o){
- load();
- IndexItem item=writeLast(o);
- synchronized(mutex){
- indexList.addLast(item);
- itemAdded(item,indexList.size()-1,o);
- }
+ public synchronized void addLast(Object o){
+ internalAddLast(o);
}
/*
@@ -168,21 +161,18 @@
*
* @see org.apache.activemq.kaha.ListContainer#removeFirst()
*/
- public Object removeFirst(){
+ public synchronized Object removeFirst(){
load();
Object result=null;
- synchronized(mutex){
- IndexItem item=(IndexItem) indexList.getFirst();
- if(item!=null){
- itemRemoved(0);
- result=getValue(item);
- int index=indexList.indexOf(item);
- IndexItem prev=index>0?(IndexItem) indexList.get(index-1):root;
- IndexItem next=index<(indexList.size()-1)?(IndexItem) indexList.get(index+1):null;
- indexList.removeFirst();
- delete(item,prev,next);
- item=null;
- }
+ IndexItem item=(IndexItem)indexList.getFirst();
+ if(item!=null){
+ itemRemoved(0);
+ result=getValue(item);
+ IndexItem prev=root;
+ IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null;
+ indexList.removeFirst();
+ delete(item,prev,next);
+ item=null;
}
return result;
}
@@ -192,19 +182,17 @@
*
* @see org.apache.activemq.kaha.ListContainer#removeLast()
*/
- public Object removeLast(){
+ public synchronized Object removeLast(){
load();
Object result=null;
- synchronized(mutex){
- IndexItem last=indexList.getLast();
- if(last!=null){
- itemRemoved(indexList.size()-1);
- result=getValue(last);
- IndexItem prev=indexList.getPrevEntry(last);
- IndexItem next=null;
- indexList.removeLast();
- delete(last,prev,next);
- }
+ IndexItem last=indexList.getLast();
+ if(last!=null){
+ itemRemoved(indexList.size()-1);
+ result=getValue(last);
+ IndexItem prev=indexList.getPrevEntry(last);
+ IndexItem next=null;
+ indexList.removeLast();
+ delete(last,prev,next);
}
return result;
}
@@ -214,7 +202,7 @@
*
* @see java.util.List#isEmpty()
*/
- public boolean isEmpty(){
+ public synchronized boolean isEmpty(){
load();
return indexList.isEmpty();
}
@@ -224,20 +212,18 @@
*
* @see java.util.List#contains(java.lang.Object)
*/
- public boolean contains(Object o){
+ public synchronized boolean contains(Object o){
load();
boolean result=false;
if(o!=null){
- synchronized(mutex){
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object value=getValue(next);
- if(value!=null&&value.equals(o)){
- result=true;
- break;
- }
- next=indexList.getNextEntry(next);
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ if(value!=null&&value.equals(o)){
+ result=true;
+ break;
}
+ next=indexList.getNextEntry(next);
}
}
return result;
@@ -248,7 +234,7 @@
*
* @see java.util.List#iterator()
*/
- public Iterator iterator(){
+ public synchronized Iterator iterator(){
load();
return listIterator();
}
@@ -258,16 +244,14 @@
*
* @see java.util.List#toArray()
*/
- public Object[] toArray(){
+ public synchronized Object[] toArray(){
load();
List tmp=new ArrayList(indexList.size());
- synchronized(mutex){
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object value=getValue(next);
- tmp.add(value);
- next=indexList.getNextEntry(next);
- }
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ tmp.add(value);
+ next=indexList.getNextEntry(next);
}
return tmp.toArray();
}
@@ -277,16 +261,14 @@
*
* @see java.util.List#toArray(T[])
*/
- public Object[] toArray(Object[] a){
+ public synchronized Object[] toArray(Object[] a){
load();
List tmp=new ArrayList(indexList.size());
- synchronized(mutex){
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object value=getValue(next);
- tmp.add(value);
- next=indexList.getNextEntry(next);
- }
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ tmp.add(value);
+ next=indexList.getNextEntry(next);
}
return tmp.toArray(a);
}
@@ -296,7 +278,7 @@
*
* @see java.util.List#add(E)
*/
- public boolean add(Object o){
+ public synchronized boolean add(Object o){
load();
addLast(o);
return true;
@@ -307,34 +289,30 @@
*
* @see java.util.List#remove(java.lang.Object)
*/
- public boolean remove(Object o){
+ public synchronized boolean remove(Object o){
load();
boolean result=false;
- synchronized(mutex){
- int pos=0;
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object value=getValue(next);
- if(value!=null&&value.equals(o)){
- remove(next);
- itemRemoved(pos);
- result=true;
- break;
- }
- next=indexList.getNextEntry(next);
- pos++;
+ int pos=0;
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ if(value!=null&&value.equals(o)){
+ remove(next);
+ itemRemoved(pos);
+ result=true;
+ break;
}
+ next=indexList.getNextEntry(next);
+ pos++;
}
return result;
}
protected void remove(IndexItem item){
- synchronized(mutex){
- IndexItem prev=indexList.getPrevEntry(item);
- IndexItem next=indexList.getNextEntry(item);
- indexList.remove(item);
- delete(item,prev,next);
- }
+ IndexItem prev=indexList.getPrevEntry(item);
+ IndexItem next=indexList.getNextEntry(item);
+ indexList.remove(item);
+ delete(item,prev,next);
}
/*
@@ -342,16 +320,14 @@
*
* @see java.util.List#containsAll(java.util.Collection)
*/
- public boolean containsAll(Collection c){
+ public synchronized boolean containsAll(Collection c){
load();
boolean result=false;
- synchronized(mutex){
- for(Iterator i=c.iterator();i.hasNext();){
- Object obj=i.next();
- if(!(result=contains(obj))){
- result=false;
- break;
- }
+ for(Iterator i=c.iterator();i.hasNext();){
+ Object obj=i.next();
+ if(!(result=contains(obj))){
+ result=false;
+ break;
}
}
return result;
@@ -362,9 +338,8 @@
*
* @see java.util.List#addAll(java.util.Collection)
*/
- public boolean addAll(Collection c){
+ public synchronized boolean addAll(Collection c){
load();
- boolean result=false;
for(Iterator i=c.iterator();i.hasNext();){
add(i.next());
}
@@ -376,7 +351,7 @@
*
* @see java.util.List#addAll(int, java.util.Collection)
*/
- public boolean addAll(int index,Collection c){
+ public synchronized boolean addAll(int index,Collection c){
load();
boolean result=false;
ListIterator e1=listIterator(index);
@@ -393,7 +368,7 @@
*
* @see java.util.List#removeAll(java.util.Collection)
*/
- public boolean removeAll(Collection c){
+ public synchronized boolean removeAll(Collection c){
load();
boolean result=true;
for(Iterator i=c.iterator();i.hasNext();){
@@ -408,18 +383,16 @@
*
* @see java.util.List#retainAll(java.util.Collection)
*/
- public boolean retainAll(Collection c){
+ public synchronized boolean retainAll(Collection c){
load();
List tmpList=new ArrayList();
- synchronized(mutex){
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object o=getValue(next);
- if(!c.contains(o)){
- tmpList.add(o);
- }
- next=indexList.getNextEntry(next);
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object o=getValue(next);
+ if(!c.contains(o)){
+ tmpList.add(o);
}
+ next=indexList.getNextEntry(next);
}
for(Iterator i=tmpList.iterator();i.hasNext();){
remove(i.next());
@@ -432,13 +405,11 @@
*
* @see java.util.List#clear()
*/
- public void clear(){
+ public synchronized void clear(){
checkClosed();
- synchronized(mutex){
- super.clear();
- doClear();
- clearCache();
- }
+ super.clear();
+ doClear();
+ clearCache();
}
/*
@@ -446,7 +417,7 @@
*
* @see java.util.List#get(int)
*/
- public Object get(int index){
+ public synchronized Object get(int index){
load();
return getCachedItem(index);
}
@@ -459,29 +430,25 @@
public Object set(int index,Object element){
load();
Object result=null;
- synchronized(mutex){
- IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
- IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
- IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
- result=getValue(replace);
- indexList.remove(index);
- delete(replace,prev,next);
- itemRemoved(index);
- add(index,element);
- }
+ IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
+ IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
+ IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
+ result=getValue(replace);
+ indexList.remove(index);
+ delete(replace,prev,next);
+ itemRemoved(index);
+ add(index,element);
return result;
}
protected IndexItem internalSet(int index,Object element){
- synchronized(mutex){
- IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
- IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
- IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
- indexList.remove(index);
- delete(replace,prev,next);
- itemRemoved(index);
- return internalAdd(index,element);
- }
+ IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
+ IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
+ IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
+ indexList.remove(index);
+ delete(replace,prev,next);
+ itemRemoved(index);
+ return internalAdd(index,element);
}
/*
@@ -489,29 +456,43 @@
*
* @see java.util.List#add(int, E)
*/
- public void add(int index,Object element){
+ public synchronized void add(int index,Object element){
load();
- synchronized(mutex){
- IndexItem item=insert(index,element);
- indexList.add(index,item);
- itemAdded(item,index,element);
- }
+ IndexItem item=insert(index,element);
+ indexList.add(index,item);
+ itemAdded(item,index,element);
+ }
+
+ protected StoreEntry internalAddLast(Object o) {
+ load();
+ IndexItem item=writeLast(o);
+ indexList.addLast(item);
+ itemAdded(item,indexList.size()-1,o);
+ return item;
+ }
+
+ protected StoreEntry internalAddFirst(Object o){
+ load();
+ IndexItem item=writeFirst(o);
+ indexList.addFirst(item);
+ itemAdded(item,0,o);
+ return item;
}
protected IndexItem internalAdd(int index,Object element){
- synchronized(mutex){
- IndexItem item=insert(index,element);
- indexList.add(index,item);
- itemAdded(item,index,element);
- return item;
- }
+ load();
+ IndexItem item=insert(index,element);
+ indexList.add(index,item);
+ itemAdded(item,index,element);
+ return item;
}
+
+
- protected IndexItem internalGet(int index){
- synchronized(mutex){
- if(index>=0&&index<indexList.size()){
- return indexList.get(index);
- }
+ protected StoreEntry internalGet(int index){
+ load();
+ if(index>=0&&index<indexList.size()){
+ return indexList.get(index);
}
return null;
}
@@ -521,20 +502,18 @@
*
* @see org.apache.activemq.kaha.ListContainer#doRemove(int)
*/
- public boolean doRemove(int index){
+ public synchronized boolean doRemove(int index){
load();
boolean result=false;
- synchronized(mutex){
- IndexItem item=indexList.get(index);
- if(item!=null){
- result=true;
- IndexItem prev=indexList.getPrevEntry(item);
- prev=prev!=null?prev:root;
- IndexItem next=indexList.getNextEntry(prev);
- indexList.remove(index);
- itemRemoved(index);
- delete(item,prev,next);
- }
+ IndexItem item=indexList.get(index);
+ if(item!=null){
+ result=true;
+ IndexItem prev=indexList.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=indexList.getNextEntry(prev);
+ indexList.remove(index);
+ itemRemoved(index);
+ delete(item,prev,next);
}
return result;
}
@@ -544,20 +523,18 @@
*
* @see java.util.List#remove(int)
*/
- public Object remove(int index){
+ public synchronized Object remove(int index){
load();
Object result=null;
- synchronized(mutex){
- IndexItem item=indexList.get(index);
- if(item!=null){
- itemRemoved(index);
- result=getValue(item);
- IndexItem prev=indexList.getPrevEntry(item);
- prev=prev!=null?prev:root;
- IndexItem next=indexList.getNextEntry(item);
- indexList.remove(index);
- delete(item,prev,next);
- }
+ IndexItem item=indexList.get(index);
+ if(item!=null){
+ itemRemoved(index);
+ result=getValue(item);
+ IndexItem prev=indexList.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=indexList.getNextEntry(item);
+ indexList.remove(index);
+ delete(item,prev,next);
}
return result;
}
@@ -567,22 +544,20 @@
*
* @see java.util.List#indexOf(java.lang.Object)
*/
- public int indexOf(Object o){
+ public synchronized int indexOf(Object o){
load();
int result=-1;
if(o!=null){
- synchronized(mutex){
- int count=0;
- IndexItem next=indexList.getFirst();
- while(next!=null){
- Object value=getValue(next);
- if(value!=null&&value.equals(o)){
- result=count;
- break;
- }
- count++;
- next=indexList.getNextEntry(next);
+ int count=0;
+ IndexItem next=indexList.getFirst();
+ while(next!=null){
+ Object value=getValue(next);
+ if(value!=null&&value.equals(o)){
+ result=count;
+ break;
}
+ count++;
+ next=indexList.getNextEntry(next);
}
}
return result;
@@ -593,22 +568,20 @@
*
* @see java.util.List#lastIndexOf(java.lang.Object)
*/
- public int lastIndexOf(Object o){
+ public synchronized int lastIndexOf(Object o){
load();
int result=-1;
if(o!=null){
- synchronized(mutex){
- int count=indexList.size()-1;
- IndexItem next=indexList.getLast();
- while(next!=null){
- Object value=getValue(next);
- if(value!=null&&value.equals(o)){
- result=count;
- break;
- }
- count--;
- next=indexList.getPrevEntry(next);
+ int count=indexList.size()-1;
+ IndexItem next=indexList.getLast();
+ while(next!=null){
+ Object value=getValue(next);
+ if(value!=null&&value.equals(o)){
+ result=count;
+ break;
}
+ count--;
+ next=indexList.getPrevEntry(next);
}
}
return result;
@@ -619,7 +592,7 @@
*
* @see java.util.List#listIterator()
*/
- public ListIterator listIterator(){
+ public synchronized ListIterator listIterator(){
load();
return new CachedContainerListIterator(this,0);
}
@@ -629,7 +602,7 @@
*
* @see java.util.List#listIterator(int)
*/
- public ListIterator listIterator(int index){
+ public synchronized ListIterator listIterator(int index){
load();
return new CachedContainerListIterator(this,index);
}
@@ -639,7 +612,7 @@
*
* @see java.util.List#subList(int, int)
*/
- public List subList(int fromIndex,int toIndex){
+ public synchronized List subList(int fromIndex,int toIndex){
load();
List result=new ArrayList();
int count=fromIndex;
@@ -650,12 +623,75 @@
}
return result;
}
+
+ /**
+ * add an Object to the list but get a StoreEntry of its position
+ * @param object
+ * @return the entry in the Store
+ */
+ public synchronized StoreEntry placeLast(Object object) {
+ StoreEntry item = internalAddLast(object);
+ return item;
+ }
+
+ /**
+ * insert an Object in first position int the list but get a StoreEntry of its position
+ * @param object
+ * @return the location in the Store
+ */
+ public synchronized StoreEntry placeFirst(Object object) {
+ StoreEntry item = internalAddFirst(object);
+ return item;
+ }
+
+ /**
+ * @param entry
+ * @param object
+ * @see org.apache.activemq.kaha.ListContainer#update(org.apache.activemq.kaha.StoreEntry, java.lang.Object)
+ */
+ public void update(StoreEntry entry,Object object){
+ try{
+ dataManager.updateItem(entry.getValueDataItem(),marshaller, object);
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ /**
+ * Retrieve an Object from the Store by its location
+ * @param entry
+ * @return the Object at that entry
+ */
+ public synchronized Object get(StoreEntry entry) {
+ load();
+ return getValue(entry);
+ }
+
+ /**
+ * remove the Object at the StoreEntry
+ * @param entry
+ * @return true if successful
+ */
+ public synchronized boolean remove(StoreEntry entry) {
+ IndexItem item = (IndexItem)entry;
+ load();
+ boolean result = false;
+ if(item!=null){
+ clearCache();
+ IndexItem prev=indexList.getPrevEntry(item);
+ prev=prev!=null?prev:root;
+ IndexItem next=indexList.getNextEntry(item);
+ delete(item,prev,next);
+ }
+ return result;
+ }
protected IndexItem writeLast(Object value){
IndexItem index=null;
try{
if(value!=null){
- DataItem data=dataManager.storeDataItem(marshaller,value);
+ StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=indexList.getLast();
@@ -663,13 +699,13 @@
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
- updateIndex(prev);
+ updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
- updateIndex(next);
+ updateIndexes(next);
}
- updateIndex(index);
+ storeIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
@@ -682,20 +718,20 @@
IndexItem index=null;
try{
if(value!=null){
- DataItem data=dataManager.storeDataItem(marshaller,value);
+ StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=root;
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
- updateIndex(prev);
+ updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
- updateIndex(next);
+ updateIndexes(next);
}
- updateIndex(index);
+ storeIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
@@ -705,11 +741,10 @@
}
protected IndexItem insert(int insertPos,Object value){
- long pos=Item.POSITION_NOT_SET;
IndexItem index=null;
try{
if(value!=null){
- DataItem data=dataManager.storeDataItem(marshaller,value);
+ StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=null;
@@ -727,13 +762,13 @@
}
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
- updateIndex(prev);
+ updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
- updateIndex(next);
+ updateIndexes(next);
}
- updateIndex(index);
+ storeIndex(index);
}
}catch(IOException e){
log.error("Failed to insert "+value,e);
@@ -742,11 +777,13 @@
return index;
}
- protected Object getValue(IndexItem item){
+ protected Object getValue(StoreEntry item){
Object result=null;
if(item!=null){
try{
- DataItem data=item.getValueDataItem();
+ // ensure it's up to date
+ //item=indexList.getEntry(item);
+ StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(marshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
@@ -759,7 +796,7 @@
/**
* @return a string representation of this collection.
*/
- public String toString(){
+ public synchronized String toString(){
StringBuffer result=new StringBuffer();
result.append("[");
Iterator i=iterator();
@@ -852,7 +889,7 @@
/**
* clear any cached values
*/
- public void clearCache(){
+ public synchronized void clearCache(){
cacheList.clear();
offset=0;
lastCached=null;
@@ -861,58 +898,62 @@
/**
* @return the cacheList
*/
- public LinkedList getCacheList(){
+ public synchronized LinkedList getCacheList(){
return cacheList;
}
/**
- * @param cacheList the cacheList to set
+ * @param cacheList
+ * the cacheList to set
*/
- public void setCacheList(LinkedList cacheList){
+ public synchronized void setCacheList(LinkedList cacheList){
this.cacheList=cacheList;
}
/**
* @return the lastCached
*/
- public IndexItem getLastCached(){
+ public synchronized StoreEntry getLastCached(){
return lastCached;
}
/**
- * @param lastCached the lastCached to set
+ * @param lastCached
+ * the lastCached to set
*/
- public void setLastCached(IndexItem lastCached){
+ public synchronized void setLastCached(IndexItem lastCached){
this.lastCached=lastCached;
}
/**
* @return the maximumCacheSize
*/
- public int getMaximumCacheSize(){
+ public synchronized int getMaximumCacheSize(){
return maximumCacheSize;
}
/**
- * @param maximumCacheSize the maximumCacheSize to set
+ * @param maximumCacheSize
+ * the maximumCacheSize to set
*/
- public void setMaximumCacheSize(int maximumCacheSize){
+ public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
}
/**
* @return the offset
*/
- public int getOffset(){
+ public synchronized int getOffset(){
return offset;
}
/**
- * @param offset the offset to set
+ * @param offset
+ * the offset to set
*/
- public void setOffset(int offset){
+ public synchronized void setOffset(int offset){
this.offset=offset;
}
-
-
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Thu Oct 5 00:19:35 2006
@@ -1,20 +1,21 @@
/**
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
+
package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
@@ -26,10 +27,10 @@
import java.util.Set;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
-import org.apache.activemq.kaha.impl.data.DataItem;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
@@ -37,20 +38,22 @@
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
/**
* Implementation of a MapContainer
*
* @version $Revision: 1.2 $
*/
public final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
+
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
protected Map map=new HashMap();
protected Map valueToKeyMap=new HashMap();
- protected Marshaller keyMarshaller= Store.ObjectMarshaller;
+ protected Marshaller keyMarshaller=Store.ObjectMarshaller;
protected Marshaller valueMarshaller=Store.ObjectMarshaller;
- public MapContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,DataManager dataManager){
- super(id,root,rootIndexManager,indexManager,dataManager);
+ public MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType){
+ super(id,root,indexManager,dataManager,indexType);
}
/*
@@ -58,32 +61,29 @@
*
* @see org.apache.activemq.kaha.MapContainer#load()
*/
- public void load(){
+ public synchronized void load(){
checkClosed();
if(!loaded){
- synchronized(mutex){
- if(!loaded){
+ if(!loaded){
+ loaded=true;
+ try{
init();
- loaded=true;
- try{
- long nextItem=root.getNextItem();
- while(nextItem!=Item.POSITION_NOT_SET){
- IndexItem item=indexManager.getIndex(nextItem);
- DataItem data=item.getKeyDataItem();
- Object key=dataManager.readItem(keyMarshaller,data);
- map.put(key,item);
- valueToKeyMap.put(item,key);
- indexList.add(item);
- nextItem=item.getNextItem();
- }
- }catch(IOException e){
- log.error("Failed to load container "+getId(),e);
- throw new RuntimeStoreException(e);
+ long nextItem=root.getNextItem();
+ while(nextItem!=Item.POSITION_NOT_SET){
+ IndexItem item=indexManager.getIndex(nextItem);
+ StoreLocation data=item.getKeyDataItem();
+ Object key=dataManager.readItem(keyMarshaller,data);
+ map.put(key,item);
+ valueToKeyMap.put(item,key);
+ indexList.add(item);
+ nextItem=item.getNextItem();
}
+ }catch(IOException e){
+ log.error("Failed to load container "+getId(),e);
+ throw new RuntimeStoreException(e);
}
}
}
-
}
/*
@@ -91,24 +91,22 @@
*
* @see org.apache.activemq.kaha.MapContainer#unload()
*/
- public void unload(){
+ public synchronized void unload(){
checkClosed();
if(loaded){
loaded=false;
- synchronized(mutex){
- map.clear();
- valueToKeyMap.clear();
- indexList.clear();
- }
+ map.clear();
+ valueToKeyMap.clear();
+ indexList.clear();
}
}
- public void setKeyMarshaller(Marshaller keyMarshaller){
+ public synchronized void setKeyMarshaller(Marshaller keyMarshaller){
checkClosed();
this.keyMarshaller=keyMarshaller;
}
- public void setValueMarshaller(Marshaller valueMarshaller){
+ public synchronized void setValueMarshaller(Marshaller valueMarshaller){
checkClosed();
this.valueMarshaller=valueMarshaller;
}
@@ -118,7 +116,7 @@
*
* @see org.apache.activemq.kaha.MapContainer#size()
*/
- public int size(){
+ public synchronized int size(){
load();
return map.size();
}
@@ -128,7 +126,7 @@
*
* @see org.apache.activemq.kaha.MapContainer#isEmpty()
*/
- public boolean isEmpty(){
+ public synchronized boolean isEmpty(){
load();
return map.isEmpty();
}
@@ -138,11 +136,9 @@
*
* @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
*/
- public boolean containsKey(Object key){
+ public synchronized boolean containsKey(Object key){
load();
- synchronized(mutex){
- return map.containsKey(key);
- }
+ return map.containsKey(key);
}
/*
@@ -150,13 +146,11 @@
*
* @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
*/
- public Object get(Object key){
+ public synchronized Object get(Object key){
load();
Object result=null;
- IndexItem item=null;
- synchronized(mutex){
- item=(IndexItem) map.get(key);
- }
+ StoreEntry item=null;
+ item=(StoreEntry)map.get(key);
if(item!=null){
result=getValue(item);
}
@@ -168,20 +162,18 @@
*
* @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
*/
- public boolean containsValue(Object o){
+ public synchronized boolean containsValue(Object o){
load();
boolean result=false;
if(o!=null){
- synchronized(indexList){
- IndexItem item=indexList.getFirst();
- while(item!=null){
- Object value=getValue(item);
- if(value!=null&&value.equals(o)){
- result=true;
- break;
- }
- item=indexList.getNextEntry(item);
+ IndexItem item=indexList.getFirst();
+ while(item!=null){
+ Object value=getValue(item);
+ if(value!=null&&value.equals(o)){
+ result=true;
+ break;
}
+ item=indexList.getNextEntry(item);
}
}
return result;
@@ -192,14 +184,12 @@
*
* @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
*/
- public void putAll(Map t){
+ public synchronized void putAll(Map t){
load();
if(t!=null){
- synchronized(mutex){
- for(Iterator i=t.entrySet().iterator();i.hasNext();){
- Map.Entry entry=(Map.Entry) i.next();
- put(entry.getKey(),entry.getValue());
- }
+ for(Iterator i=t.entrySet().iterator();i.hasNext();){
+ Map.Entry entry=(Map.Entry)i.next();
+ put(entry.getKey(),entry.getValue());
}
}
}
@@ -209,7 +199,7 @@
*
* @see org.apache.activemq.kaha.MapContainer#keySet()
*/
- public Set keySet(){
+ public synchronized Set keySet(){
load();
return new ContainerKeySet(this);
}
@@ -219,7 +209,7 @@
*
* @see org.apache.activemq.kaha.MapContainer#values()
*/
- public Collection values(){
+ public synchronized Collection values(){
load();
return new ContainerValueCollection(this);
}
@@ -229,7 +219,7 @@
*
* @see org.apache.activemq.kaha.MapContainer#entrySet()
*/
- public Set entrySet(){
+ public synchronized Set entrySet(){
load();
return new ContainerEntrySet(this);
}
@@ -237,20 +227,19 @@
/*
* (non-Javadoc)
*
- * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
+ * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
+ * java.lang.Object)
*/
- public Object put(Object key,Object value){
+ public synchronized Object put(Object key,Object value){
load();
Object result=null;
- synchronized(mutex){
- if(map.containsKey(key)){
- result=remove(key);
- }
- IndexItem item=write(key,value);
- map.put(key,item);
- valueToKeyMap.put(item,key);
- indexList.add(item);
+ if(map.containsKey(key)){
+ result=remove(key);
}
+ IndexItem item=write(key,value);
+ map.put(key,item);
+ valueToKeyMap.put(item,key);
+ indexList.add(item);
return result;
}
@@ -259,45 +248,41 @@
*
* @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
*/
- public Object remove(Object key){
+ public synchronized Object remove(Object key){
load();
Object result=null;
- synchronized(mutex){
- IndexItem item=(IndexItem) map.get(key);
- if(item!=null){
- map.remove(key);
- valueToKeyMap.remove(item);
- // ensure we have the upto date item
- item=indexList.getEntry(item);
- result=getValue(item);
- IndexItem prev=indexList.getPrevEntry(item);
- IndexItem next=indexList.getNextEntry(item);
- indexList.remove(item);
- delete(item,prev,next);
- }
+ IndexItem item=(IndexItem)map.get(key);
+ if(item!=null){
+ //refresh the index
+ item = (IndexItem)indexList.refreshEntry(item);
+ map.remove(key);
+ valueToKeyMap.remove(item);
+ result=getValue(item);
+ IndexItem prev=indexList.getPrevEntry(item);
+ IndexItem next=indexList.getNextEntry(item);
+ indexList.remove(item);
+ delete(item,prev,next);
}
return result;
}
- public boolean removeValue(Object o){
+ public synchronized boolean removeValue(Object o){
load();
boolean result=false;
if(o!=null){
- synchronized(mutex){
- IndexItem item=indexList.getFirst();
- while(item!=null){
- Object value=getValue(item);
- if(value!=null&&value.equals(o)){
- result=true;
- // find the key
- Object key=valueToKeyMap.get(item);
- if(key!=null){
- remove(key);
- }
- break;
+ IndexItem item=indexList.getFirst();
+ while(item!=null){
+ Object value=getValue(item);
+ if(value!=null&&value.equals(o)){
+ result=true;
+ // find the key
+ Object key=valueToKeyMap.get(item);
+ if(key!=null){
+ remove(key);
}
- item=indexList.getNextEntry(item);
+ break;
}
+ item=indexList.getNextEntry(item);
}
}
return result;
@@ -315,32 +300,64 @@
*
* @see org.apache.activemq.kaha.MapContainer#clear()
*/
- public void clear(){
+ public synchronized void clear(){
checkClosed();
- synchronized(mutex){
- loaded=true;
- synchronized(mutex){
- map.clear();
- valueToKeyMap.clear();
- super.clear();
- doClear();
- }
- }
+ loaded=true;
+ map.clear();
+ valueToKeyMap.clear();
+ super.clear();
+ doClear();
}
-
- protected Set getInternalKeySet(){
- return new HashSet(map.keySet());
+
+ /**
+ * Add an entry to the Store Map
+ * @param key
+ * @param value
+ * @return the StoreEntry associated with the entry
+ */
+ public StoreEntry place(Object key, Object value) {
+ load();
+ if(map.containsKey(key)){
+ remove(key);
+ }
+ IndexItem item=write(key,value);
+ map.put(key,item);
+ valueToKeyMap.put(item,key);
+ indexList.add(item);
+ return item;
}
-
- protected IndexLinkedList getItemList(){
- return indexList;
+
+ /**
+ * Remove an Entry from ther Map
+ * @param entry
+ */
+ public void remove(StoreEntry entry) {
+ load();
+ IndexItem item=(IndexItem)entry;
+ if(item!=null){
+
+ Object key = valueToKeyMap.remove(item);
+ map.remove(key);
+ IndexItem prev=indexList.getPrevEntry(item);
+ IndexItem next=indexList.getNextEntry(item);
+ indexList.remove(item);
+ delete(item,prev,next);
+ }
}
-
- protected Object getValue(IndexItem item){
+
+ /**
+ * Get the value from it's location
+ * @param Valuelocation
+ * @return
+ */
+ public synchronized Object getValue(StoreEntry item){
+ load();
Object result=null;
if(item!=null){
try{
- DataItem data=item.getValueDataItem();
+ // ensure this value is up to date
+ //item=indexList.getEntry(item);
+ StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(valueMarshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
@@ -349,14 +366,18 @@
}
return result;
}
-
-
-
- protected Object getKey(IndexItem item){
+
+ /**
+ * Get the Key object from it's location
+ * @param keyLocation
+ * @return
+ */
+ public synchronized Object getKey(StoreEntry item){
+ load();
Object result=null;
if(item!=null){
try{
- DataItem data=item.getKeyDataItem();
+ StoreLocation data=item.getKeyDataItem();
result=dataManager.readItem(keyMarshaller,data);
}catch(IOException e){
log.error("Failed to get key for "+item,e);
@@ -365,17 +386,26 @@
}
return result;
}
+
+
+ protected Set getInternalKeySet(){
+ return new HashSet(map.keySet());
+ }
+
+ protected IndexLinkedList getItemList(){
+ return indexList;
+ }
protected IndexItem write(Object key,Object value){
IndexItem index=null;
try{
if(key!=null){
index=indexManager.createNewIndex();
- DataItem data=dataManager.storeDataItem(keyMarshaller,key);
+ StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
index.setKeyData(data);
}
if(value!=null){
- DataItem data=dataManager.storeDataItem(valueMarshaller,value);
+ StoreLocation data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data);
}
IndexItem prev=indexList.getLast();
@@ -383,13 +413,13 @@
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
- updateIndex(prev);
+ updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
- updateIndex(next);
+ updateIndexes(next);
}
- updateIndex(index);
+ storeIndex(index);
}catch(IOException e){
log.error("Failed to write "+key+" , "+value,e);
throw new RuntimeStoreException(e);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataItem.java Thu Oct 5 00:19:35 2006
@@ -17,13 +17,15 @@
*/
package org.apache.activemq.kaha.impl.data;
+import org.apache.activemq.kaha.StoreLocation;
+
/**
* A a wrapper for a data in the store
*
* @version $Revision: 1.2 $
*/
-public final class DataItem implements Item{
+public final class DataItem implements Item, StoreLocation{
private int file=(int) POSITION_NOT_SET;
private long offset=POSITION_NOT_SET;
@@ -42,7 +44,8 @@
}
/**
- * @return Returns the size.
+ * @return
+ * @see org.apache.activemq.kaha.StoreLocation#getSize()
*/
public int getSize(){
return size;
@@ -56,7 +59,8 @@
}
/**
- * @return Returns the offset.
+ * @return
+ * @see org.apache.activemq.kaha.StoreLocation#getOffset()
*/
public long getOffset(){
return offset;
@@ -70,7 +74,8 @@
}
/**
- * @return Returns the file.
+ * @return
+ * @see org.apache.activemq.kaha.StoreLocation#getFile()
*/
public int getFile(){
return file;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java Thu Oct 5 00:19:35 2006
@@ -28,6 +28,7 @@
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
@@ -41,6 +42,7 @@
private static final Log log=LogFactory.getLog(DataManager.class);
public static long MAX_FILE_LENGTH=1024*1024*32;
+ private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
private StoreDataReader reader;
@@ -62,7 +64,7 @@
this.reader=new StoreDataReader(this);
this.writer=new StoreDataWriter(this);
- dataFilePrefix = "data-"+name+"-";
+ dataFilePrefix = NAME_PREFIX+name+"-";
// build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){
@@ -109,26 +111,31 @@
return currentWriteFile;
}
- RandomAccessFile getDataFile(DataItem item) throws IOException{
+ RandomAccessFile getDataFile(StoreLocation item) throws IOException{
Integer key=new Integer(item.getFile());
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile!=null){
return dataFile.getRandomAccessFile();
}
- throw new IOException("Could not locate data file "+name+item.getFile());
+ log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
}
- public synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
+ public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
return reader.readItem(marshaller,item);
}
- public synchronized DataItem storeDataItem(Marshaller marshaller, Object payload) throws IOException{
+ public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
- public synchronized DataItem storeRedoItem(Object payload) throws IOException{
+ public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
+
+ public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
+ writer.updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
+ }
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
@@ -188,6 +195,7 @@
}
}
+
public synchronized boolean delete() throws IOException{
boolean result=true;
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -197,6 +205,7 @@
fileMap.clear();
return result;
}
+
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
@@ -275,5 +284,9 @@
*/
public void setMaxFileLength(long maxFileLength){
this.maxFileLength=maxFileLength;
+ }
+
+ public String toString(){
+ return "DataManager:("+NAME_PREFIX+name+")";
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/RedoListener.java Thu Oct 5 00:19:35 2006
@@ -17,9 +17,11 @@
*/
package org.apache.activemq.kaha.impl.data;
+import org.apache.activemq.kaha.StoreLocation;
+
public interface RedoListener {
- void onRedoItem(DataItem item, Object object) throws Exception;
+ void onRedoItem(StoreLocation item, Object object) throws Exception;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataReader.java Thu Oct 5 00:19:35 2006
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
/**
* Optimized Store reader
*
@@ -58,7 +59,7 @@
return rc;
}
- protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
+ protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
RandomAccessFile file=dataManager.getDataFile(item);
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/StoreDataWriter.java Thu Oct 5 00:19:35 2006
@@ -19,8 +19,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
/**
* Optimized Store writer
*
@@ -50,7 +52,7 @@
* @throws IOException
* @throws FileNotFoundException
*/
- DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
+ StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// Write the packet our internal buffer.
buffer.reset();
@@ -74,5 +76,20 @@
dataManager.addInterestInFile(dataFile);
return item;
+ }
+
+ void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
+ //Write the packet our internal buffer.
+ buffer.reset();
+ buffer.position(DataManager.ITEM_HEAD_SIZE);
+ marshaller.writePayload(payload,buffer);
+ int size=buffer.size();
+ int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+ buffer.reset();
+ buffer.writeByte(type);
+ buffer.writeInt(payloadSize);
+ RandomAccessFile dataFile = dataManager.getDataFile(location);
+ dataFile.seek(location.getOffset());
+ dataFile.write(buffer.getData(),0,size);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java Thu Oct 5 00:19:35 2006
@@ -18,6 +18,7 @@
package org.apache.activemq.kaha.impl.index;
import java.io.IOException;
+import org.apache.activemq.kaha.StoreEntry;
/**
* A linked list used by IndexItems
*
@@ -72,7 +73,7 @@
*
* @return the first element from this list.
*/
- public IndexItem removeFirst(){
+ public StoreEntry removeFirst(){
if(size==0){
return null;
}
@@ -89,7 +90,7 @@
public Object removeLast(){
if(size==0)
return null;
- IndexItem result=last;
+ StoreEntry result=last;
remove(last);
return result;
}
@@ -141,8 +142,8 @@
* @param o element to be appended to this list.
* @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
*/
- public boolean add(IndexItem item){
- size++;
+ public boolean add(IndexItem item){
+ addLast(item);
return true;
}
@@ -224,7 +225,7 @@
* @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
* contain this element.
*/
- public int indexOf(IndexItem o){
+ public int indexOf(StoreEntry o){
int index=0;
if(size>0){
for(IndexItem e=getNextEntry(root);e!=null;e=getNextEntry(e)){
@@ -265,25 +266,25 @@
* @param entry
* @return prev entry
*/
- public IndexItem getPrevEntry(IndexItem current){
- IndexItem result=null;
- if(current!=null&¤t.getPreviousItem()>=0){
- try{
- result=indexManager.getIndex(current.getPreviousItem());
- }catch(IOException e){
- throw new RuntimeException("Failed to index",e);
- }
- }
- //essential root get's updated consistently
- if(result != null &&root!=null && root.equals(result)){
- return root;
- }
- return result;
+ public IndexItem getPrevEntry(IndexItem current){
+ IndexItem result=null;
+ if(current!=null&¤t.getPreviousItem()>=0){
+ try{
+ result=indexManager.getIndex(current.getPreviousItem());
+ }catch(IOException e){
+ throw new RuntimeException("Failed to get current index for "+current,e);
+ }
+ }
+ // essential root get's updated consistently
+ if(result!=null&&root!=null&&root.equals(result)){
+ return root;
+ }
+ return result;
}
- public IndexItem getEntry(IndexItem current){
- IndexItem result=null;
- if(current!=null&¤t.getOffset()>=0){
+ public StoreEntry getEntry(StoreEntry current){
+ StoreEntry result=null;
+ if(current != null && current.getOffset() >= 0){
try{
result=indexManager.getIndex(current.getOffset());
}catch(IOException e){
@@ -295,7 +296,27 @@
return root;
}
return result;
- }
+ }
+
+ /**
+ * Update the indexes of a StoreEntry
+ * @param current
+ */
+ public StoreEntry refreshEntry(StoreEntry current){
+ StoreEntry result=null;
+ if(current != null && current.getOffset() >= 0){
+ try{
+ result=indexManager.refreshIndex((IndexItem)current);
+ }catch(IOException e){
+ throw new RuntimeException("Failed to index",e);
+ }
+ }
+ //essential root get's updated consistently
+ if(result != null &&root!=null && root.equals(result)){
+ return root;
+ }
+ return result;
+ }
public void remove(IndexItem e){
if(e==root||e.equals(root))
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexItem.java Thu Oct 5 00:19:35 2006
@@ -21,6 +21,8 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.impl.data.Item;
/**
@@ -28,9 +30,10 @@
*
* @version $Revision: 1.2 $
*/
- public class IndexItem implements Item{
+ public class IndexItem implements Item, StoreEntry{
public static final int INDEX_SIZE=51;
+ public static final int INDEXES_ONLY_SIZE=19;
//used by linked list
IndexItem next;
IndexItem prev;
@@ -66,7 +69,11 @@
active=true;
}
- public DataItem getKeyDataItem(){
+ /**
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getKeyDataItem()
+ */
+ public StoreLocation getKeyDataItem(){
DataItem result=new DataItem();
result.setOffset(keyOffset);
result.setFile(keyFile);
@@ -74,7 +81,11 @@
return result;
}
- public DataItem getValueDataItem(){
+ /**
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getValueDataItem()
+ */
+ public StoreLocation getValueDataItem(){
DataItem result=new DataItem();
result.setOffset(valueOffset);
result.setFile(valueFile);
@@ -82,13 +93,13 @@
return result;
}
- public void setValueData(DataItem item){
+ public void setValueData(StoreLocation item){
valueOffset=item.getOffset();
valueFile=item.getFile();
valueSize=item.getSize();
}
- public void setKeyData(DataItem item){
+ public void setKeyData(StoreLocation item){
keyOffset=item.getOffset();
keyFile=item.getFile();
keySize=item.getSize();
@@ -98,7 +109,7 @@
* @param dataOut
* @throws IOException
*/
- void write(DataOutput dataOut) throws IOException{
+ public void write(DataOutput dataOut) throws IOException{
dataOut.writeShort(MAGIC);
dataOut.writeBoolean(active);
dataOut.writeLong(previousItem);
@@ -110,12 +121,19 @@
dataOut.writeLong(valueOffset);
dataOut.writeInt(valueSize);
}
+
+ void updateIndexes(DataOutput dataOut) throws IOException{
+ dataOut.writeShort(MAGIC);
+ dataOut.writeBoolean(active);
+ dataOut.writeLong(previousItem);
+ dataOut.writeLong(nextItem);
+ }
/**
* @param dataIn
* @throws IOException
*/
- void read(DataInput dataIn) throws IOException{
+ public void read(DataInput dataIn) throws IOException{
if(dataIn.readShort()!=MAGIC){
throw new BadMagicException();
}
@@ -129,6 +147,15 @@
valueOffset=dataIn.readLong();
valueSize=dataIn.readInt();
}
+
+ void readIndexes(DataInput dataIn) throws IOException{
+ if(dataIn.readShort()!=MAGIC){
+ throw new BadMagicException();
+ }
+ active=dataIn.readBoolean();
+ previousItem=dataIn.readLong();
+ nextItem=dataIn.readLong();
+ }
/**
* @param newPrevEntry
@@ -152,7 +179,8 @@
}
/**
- * @return next item
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getNextItem()
*/
public long getNextItem(){
return nextItem;
@@ -173,7 +201,8 @@
}
/**
- * @return Returns the keyFile.
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getKeyFile()
*/
public int getKeyFile(){
return keyFile;
@@ -187,7 +216,8 @@
}
/**
- * @return Returns the valueFile.
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getValueFile()
*/
public int getValueFile(){
return valueFile;
@@ -201,7 +231,8 @@
}
/**
- * @return Returns the valueOffset.
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getValueOffset()
*/
public long getValueOffset(){
return valueOffset;
@@ -229,7 +260,8 @@
}
/**
- * @return Returns the offset.
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getOffset()
*/
public long getOffset(){
return offset;
@@ -242,6 +274,10 @@
this.offset=offset;
}
+ /**
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getKeySize()
+ */
public int getKeySize() {
return keySize;
}
@@ -250,6 +286,10 @@
this.keySize = keySize;
}
+ /**
+ * @return
+ * @see org.apache.activemq.kaha.StoreEntry#getValueSize()
+ */
public int getValueSize() {
return valueSize;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java Thu Oct 5 00:19:35 2006
@@ -17,6 +17,13 @@
*/
package org.apache.activemq.kaha.impl.index;
+import org.apache.activemq.kaha.StoreEntry;
+
+/**
+* Inteface to LinkedList of Indexes
+*
+* @version $Revision$
+*/
public interface IndexLinkedList{
/**
@@ -43,7 +50,7 @@
*
* @return the first element from this list.
*/
- public IndexItem removeFirst();
+ public StoreEntry removeFirst();
/**
* Removes and returns the last element from this list.
@@ -54,16 +61,14 @@
/**
* Inserts the given element at the beginning of this list.
- *
- * @param o the element to be inserted at the beginning of this list.
+ * @param item
*/
public void addFirst(IndexItem item);
/**
* Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
* only for consistency.)
- *
- * @param o the element to be inserted at the end of this list.
+ * @param item
*/
public void addLast(IndexItem item);
@@ -83,8 +88,8 @@
/**
* Appends the specified element to the end of this list.
+ * @param item
*
- * @param o element to be appended to this list.
* @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
*/
public boolean add(IndexItem item);
@@ -137,7 +142,7 @@
* @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
* contain this element.
*/
- public int indexOf(IndexItem o);
+ public int indexOf(StoreEntry o);
/**
* Retrieve the next entry after this entry
@@ -164,8 +169,15 @@
/**
* Ensure we have the up to date entry
- * @param current
+ * @param entry
* @return the entry
*/
- public IndexItem getEntry(IndexItem current);
+ public StoreEntry getEntry(StoreEntry entry);
+
+ /**
+ * Update the indexes of a StoreEntry
+ * @param current
+ * @return update StoreEntry
+ */
+ public StoreEntry refreshEntry(StoreEntry current);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Thu Oct 5 00:19:35 2006
@@ -18,10 +18,14 @@
package org.apache.activemq.kaha.impl.index;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
import java.util.LinkedList;
+
+import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,29 +38,22 @@
private static final Log log=LogFactory.getLog(IndexManager.class);
private static final String NAME_PREFIX="index-";
private final String name;
+ private File directory;
private File file;
private RandomAccessFile indexFile;
private StoreIndexReader reader;
private StoreIndexWriter writer;
private LinkedList freeList=new LinkedList();
+ private DataManager redoLog;
+ private String mode;
private long length=0;
public IndexManager(File directory,String name,String mode,DataManager redoLog) throws IOException{
+ this.directory = directory;
this.name=name;
- file=new File(directory,NAME_PREFIX+name);
- indexFile=new RandomAccessFile(file,mode);
- reader=new StoreIndexReader(indexFile);
- writer=new StoreIndexWriter(indexFile,name,redoLog);
- long offset=0;
- while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
- IndexItem index=reader.readItem(offset);
- if(!index.isActive()){
- index.reset();
- freeList.add(index);
- }
- offset+=IndexItem.INDEX_SIZE;
- }
- length=offset;
+ this.mode = mode;
+ this.redoLog = redoLog;
+ initialize();
}
public synchronized boolean isEmpty(){
@@ -66,19 +63,28 @@
public synchronized IndexItem getIndex(long offset) throws IOException{
return reader.readItem(offset);
}
+
+ public synchronized IndexItem refreshIndex(IndexItem item) throws IOException{
+ reader.updateIndexes(item);
+ return item;
+ }
public synchronized void freeIndex(IndexItem item) throws IOException{
- item.reset();
+ //item.reset();
item.setActive(false);
- writer.storeItem(item);
+ writer.updateIndexes(item);
freeList.add(item);
}
- public synchronized void updateIndex(IndexItem index) throws IOException{
+ public synchronized void storeIndex(IndexItem index) throws IOException{
writer.storeItem(index);
}
+
+ public synchronized void updateIndexes(IndexItem index) throws IOException{
+ writer.updateIndexes(index);
+ }
- public void redo(final RedoStoreIndexItem redo) throws IOException{
+ public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{
writer.redoStoreItem(redo);
}
@@ -106,6 +112,7 @@
}
}
+
public synchronized boolean delete() throws IOException{
freeList.clear();
if(indexFile!=null){
@@ -115,7 +122,7 @@
return file.delete();
}
- private IndexItem getNextFreeIndex(){
+ private synchronized IndexItem getNextFreeIndex(){
IndexItem result=null;
if(!freeList.isEmpty()){
result=(IndexItem) freeList.removeLast();
@@ -128,12 +135,33 @@
return length;
}
- public void setLength(long value){
+ public synchronized void setLength(long value){
this.length=value;
}
+
+ public synchronized FileLock getLock() throws IOException {
+ return indexFile.getChannel().tryLock();
+ }
public String toString(){
return "IndexManager:("+NAME_PREFIX+name+")";
+ }
+
+ protected void initialize() throws IOException {
+ file=new File(directory,NAME_PREFIX+name);
+ indexFile=new RandomAccessFile(file,mode);
+ reader=new StoreIndexReader(indexFile);
+ writer=new StoreIndexWriter(indexFile,name,redoLog);
+ long offset=0;
+ while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
+ IndexItem index=reader.readItem(offset);
+ if(!index.isActive()){
+ index.reset();
+ freeList.add(index);
+ }
+ offset+=IndexItem.INDEX_SIZE;
+ }
+ length=offset;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexReader.java Thu Oct 5 00:19:35 2006
@@ -50,4 +50,13 @@
result.read(dataIn);
return result;
}
+
+ void updateIndexes(IndexItem indexItem) throws IOException{
+ if (indexItem != null){
+ file.seek(indexItem.getOffset());
+ file.readFully(buffer,0,IndexItem.INDEXES_ONLY_SIZE);
+ dataIn.restart(buffer);
+ indexItem.readIndexes(dataIn);
+ }
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java Thu Oct 5 00:19:35 2006
@@ -61,6 +61,18 @@
file.seek(indexItem.getOffset());
file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
}
+
+ void updateIndexes(IndexItem indexItem) throws IOException{
+ if( redoLog!=null ) {
+ RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
+ redoLog.storeRedoItem(redo);
+ }
+
+ dataOut.reset();
+ indexItem.updateIndexes(dataOut);
+ file.seek(indexItem.getOffset());
+ file.write(dataOut.getData(),0,IndexItem.INDEXES_ONLY_SIZE);
+ }
public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
dataOut.reset();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java?view=diff&rev=453123&r1=453122&r2=453123
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java Thu Oct 5 00:19:35 2006
@@ -17,12 +17,14 @@
*/
package org.apache.activemq.kaha.impl.index;
+import org.apache.activemq.kaha.StoreEntry;
+
/**
* A linked list used by IndexItems
*
* @version $Revision: 1.2 $
*/
-final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
+public final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
private transient IndexItem root;
private transient int size=0;
@@ -30,7 +32,7 @@
/**
* Constructs an empty list.
*/
- VMIndexLinkedList(IndexItem header){
+ public VMIndexLinkedList(IndexItem header){
this.root = header;
this.root.next=root.prev=root;
}
@@ -62,11 +64,11 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.IndexLinkedList#removeFirst()
*/
- public IndexItem removeFirst(){
+ public StoreEntry removeFirst(){
if(size==0){
return null;
}
- IndexItem result=root.next;
+ StoreEntry result=root.next;
remove(root.next);
return result;
}
@@ -77,7 +79,7 @@
public Object removeLast(){
if(size==0)
return null;
- IndexItem result=root.prev;
+ StoreEntry result=root.prev;
remove(root.prev);
return result;
}
@@ -171,7 +173,7 @@
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.IndexLinkedList#indexOf(org.apache.activemq.kaha.impl.IndexItem)
*/
- public int indexOf(IndexItem o){
+ public int indexOf(StoreEntry o){
int index=0;
for(IndexItem e=root.next;e!=root;e=e.next){
if(o==e){
@@ -228,7 +230,15 @@
return clone;
}
- public IndexItem getEntry(IndexItem current){
+ public StoreEntry getEntry(StoreEntry current){
+ return current;
+ }
+
+ /**
+ * Update the indexes of a StoreEntry
+ * @param current
+ */
+ public StoreEntry refreshEntry(StoreEntry current){
return current;
}
}