You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/25 09:44:51 UTC
svn commit: r409322 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl:
ContainerId.java DataManager.java IndexManager.java KahaStore.java
RedoStoreIndexItem.java StoreIndexWriter.java
Author: chirino
Date: Thu May 25 00:44:50 2006
New Revision: 409322
URL: http://svn.apache.org/viewvc?rev=409322&view=rev
Log:
Kaha can now journal the changes it makes to the indexes so that they can be used to redo the changes on
failure.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java Thu May 25 00:44:50 2006
@@ -25,20 +25,20 @@
public class ContainerId implements Externalizable{
private static final long serialVersionUID=-8883779541021821943L;
private Object key;
- private String dataContainerPrefix;
+ private String dataContainerName;
/**
* @return Returns the dataContainerPrefix.
*/
- public String getDataContainerPrefix(){
- return dataContainerPrefix;
+ public String getDataContainerName(){
+ return dataContainerName;
}
/**
* @param dataContainerPrefix The dataContainerPrefix to set.
*/
- public void setDataContainerPrefix(String dataContainerPrefix){
- this.dataContainerPrefix=dataContainerPrefix;
+ public void setDataContainerName(String dataContainerPrefix){
+ this.dataContainerName=dataContainerPrefix;
}
/**
@@ -69,12 +69,12 @@
}
public void writeExternal(ObjectOutput out) throws IOException{
- out.writeUTF(getDataContainerPrefix());
+ out.writeUTF(getDataContainerName());
out.writeObject(key);
}
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
- dataContainerPrefix=in.readUTF();
+ dataContainerName=in.readUTF();
key=in.readObject();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu May 25 00:44:50 2006
@@ -33,10 +33,11 @@
* @version $Revision: 1.1.1.1 $
*/
final class DataManager{
+
private static final Log log=LogFactory.getLog(DataManager.class);
protected static long MAX_FILE_LENGTH=1024*1024*32;
private final File dir;
- private final String prefix;
+ private final String name;
private StoreDataReader reader;
private StoreDataWriter writer;
private DataFile currentWriteFile;
@@ -45,23 +46,28 @@
public static final int ITEM_HEAD_SIZE=5; // type + length
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
+
+ Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
+ private String dataFilePrefix;
- DataManager(File dir,String pf){
+ DataManager(File dir, final String name){
this.dir=dir;
- this.prefix=pf;
+ this.name=name;
this.reader=new StoreDataReader(this);
this.writer=new StoreDataWriter(this);
+
+ dataFilePrefix = "data-"+name+"-";
// build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){
- public boolean accept(File dir,String name){
- return dir.equals(dir)&&name.startsWith(prefix);
+ public boolean accept(File dir,String n){
+ return dir.equals(dir)&&n.startsWith(dataFilePrefix);
}
});
if(files!=null){
for(int i=0;i<files.length;i++){
File file=files[i];
- String name=file.getName();
- String numStr=name.substring(prefix.length(),name.length());
+ String n=file.getName();
+ String numStr=n.substring(dataFilePrefix.length(),n.length());
int num=Integer.parseInt(numStr);
DataFile dataFile=new DataFile(file,num);
fileMap.put(dataFile.getNumber(),dataFile);
@@ -72,8 +78,16 @@
}
}
- public String getPrefix(){
- return prefix;
+ private DataFile createAndAddDataFile(int num){
+ String fileName=dataFilePrefix+num;
+ File file=new File(dir,fileName);
+ DataFile result=new DataFile(file,num);
+ fileMap.put(result.getNumber(),result);
+ return result;
+ }
+
+ public String getName(){
+ return name;
}
DataFile findSpaceForData(DataItem item) throws IOException{
@@ -95,7 +109,7 @@
if(dataFile!=null){
return dataFile.getRandomAccessFile();
}
- throw new IOException("Could not locate data file "+prefix+item.getFile());
+ throw new IOException("Could not locate data file "+name+item.getFile());
}
synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
@@ -106,11 +120,16 @@
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
- synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{
- return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE);
+ synchronized DataItem storeRedoItem(Object payload) throws IOException{
+ return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
- synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws IOException{
+ synchronized void recoverRedoItems(RedoListener listener) throws IOException{
+
+ // Nothing to recover if there is no current file.
+ if( currentWriteFile == null )
+ return;
+
DataItem item = new DataItem();
item.setFile(currentWriteFile.getNumber().intValue());
item.setOffset(0);
@@ -126,7 +145,7 @@
// Un-marshal the redo item
Object object;
try {
- object = readItem(marshaller, item);
+ object = readItem(redoMarshaller, item);
} catch (IOException e1) {
log.trace("End of data file reached at (payload was invalid): "+item);
return;
@@ -224,17 +243,17 @@
}
}
- private DataFile createAndAddDataFile(int num){
- String fileName=prefix+num;
- File file=new File(dir,fileName);
- DataFile result=new DataFile(file,num);
- fileMap.put(result.getNumber(),result);
- return result;
- }
-
private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getNumber());
boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
+ }
+
+ public Marshaller getRedoMarshaller() {
+ return redoMarshaller;
+ }
+
+ public void setRedoMarshaller(Marshaller redoMarshaller) {
+ this.redoMarshaller = redoMarshaller;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java Thu May 25 00:44:50 2006
@@ -17,6 +17,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.LinkedList;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
@@ -24,8 +25,9 @@
*
* @version $Revision: 1.1.1.1 $
*/
-final class IndexManager{
+final class IndexManager {
private static final Log log=LogFactory.getLog(IndexManager.class);
+ private final String name;
private File file;
private RandomAccessFile indexFile;
private StoreIndexReader reader;
@@ -33,11 +35,12 @@
private LinkedList freeList=new LinkedList();
private long length=0;
- IndexManager(File ifile,String mode) throws IOException{
- file=ifile;
- indexFile=new RandomAccessFile(ifile,mode);
+ IndexManager(File directory, String name, String mode, DataManager redoLog ) throws IOException{
+ this.name = name;
+ file=new File(directory,"index-"+name);
+; indexFile=new RandomAccessFile(file,mode);
reader=new StoreIndexReader(indexFile);
- writer=new StoreIndexWriter(indexFile);
+ writer=new StoreIndexWriter(indexFile, name, redoLog);
long offset=0;
while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
IndexItem index=reader.readItem(offset);
@@ -68,6 +71,10 @@
synchronized void updateIndex(IndexItem index) throws IOException{
writer.storeItem(index);
}
+
+ public void redo(RedoStoreIndexItem redo) throws IOException {
+ writer.redoStoreItem(redo);
+ }
synchronized IndexItem createNewIndex() throws IOException{
IndexItem result=getNextFreeIndex();
@@ -118,4 +125,9 @@
void setLength(long value){
this.length=value;
}
+
+ public String getName() {
+ return name;
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Thu May 25 00:44:50 2006
@@ -37,8 +37,7 @@
*/
public class KahaStore implements Store{
- private static final String DEFAULT_DATA_CONTAINER_NAME = "kaha-data.";
- private static final String DEFAULT_INDEX_CONTAINER_NAME = "kaha-index.";
+ private static final String DEFAULT_CONTAINER_NAME = "kaha";
private File directory;
@@ -54,6 +53,7 @@
private String name;
private String mode;
private boolean initialized;
+ private boolean logIndexChanges=false;
public KahaStore(String name,String mode) throws IOException{
this.name=name;
@@ -138,7 +138,7 @@
}
public MapContainer getMapContainer(Object id) throws IOException{
- return getMapContainer(id, DEFAULT_DATA_CONTAINER_NAME);
+ return getMapContainer(id, DEFAULT_CONTAINER_NAME);
}
public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
@@ -148,11 +148,11 @@
if(result==null){
DataManager dm = getDataManager(dataContainerName);
- IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+ IndexManager im = getIndexManager(dm, dataContainerName);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
- containerId.setDataContainerPrefix(dataContainerName);
+ containerId.setDataContainerName(dataContainerName);
IndexItem root=mapsContainer.getRoot(containerId);
if( root == null ) {
@@ -186,19 +186,19 @@
}
public ListContainer getListContainer(Object id) throws IOException{
- return getListContainer(id,DEFAULT_DATA_CONTAINER_NAME);
+ return getListContainer(id,DEFAULT_CONTAINER_NAME);
}
- public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{
+ public synchronized ListContainer getListContainer(Object id, String containerName) throws IOException{
initialize();
ListContainerImpl result=(ListContainerImpl) lists.get(id);
if(result==null){
- DataManager dm = getDataManager(dataContainerName);
- IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+ DataManager dm = getDataManager(containerName);
+ IndexManager im = getIndexManager(dm, containerName);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
- containerId.setDataContainerPrefix(dataContainerName);
+ containerId.setDataContainerName(containerName);
IndexItem root=listsContainer.getRoot(containerId);
if( root == null ) {
root=listsContainer.addRoot(containerId);
@@ -238,23 +238,23 @@
directory=new File(name);
directory.mkdirs();
- DataManager rootData = getDataManager(DEFAULT_DATA_CONTAINER_NAME);
- IndexManager rootIndex = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+ DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
+ IndexManager defaultIM = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem();
- if(rootIndex.isEmpty()){
+ if(defaultIM.isEmpty()){
mapRoot.setOffset(0);
- rootIndex.updateIndex(mapRoot);
+ defaultIM.updateIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE);
- rootIndex.updateIndex(listRoot);
- rootIndex.setLength(IndexItem.INDEX_SIZE*2);
+ defaultIM.updateIndex(listRoot);
+ defaultIM.setLength(IndexItem.INDEX_SIZE*2);
}else{
- mapRoot=rootIndex.getIndex(0);
- listRoot=rootIndex.getIndex(IndexItem.INDEX_SIZE);
+ mapRoot=defaultIM.getIndex(0);
+ listRoot=defaultIM.getIndex(IndexItem.INDEX_SIZE);
}
- mapsContainer=new IndexRootContainer(mapRoot,rootIndex,rootData);
- listsContainer=new IndexRootContainer(listRoot,rootIndex,rootData);
+ mapsContainer=new IndexRootContainer(mapRoot,defaultIM,defaultDM);
+ listsContainer=new IndexRootContainer(listRoot,defaultIM,defaultDM);
for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
DataManager dm = (DataManager) i.next();
@@ -263,23 +263,42 @@
}
}
- protected DataManager getDataManager(String prefix) throws IOException {
- DataManager dm = (DataManager) dataManagers.get(prefix);
+ protected DataManager getDataManager(String name) throws IOException {
+ DataManager dm = (DataManager) dataManagers.get(name);
if (dm == null){
- dm = new DataManager(directory,prefix);
- dataManagers.put(prefix,dm);
+ dm = new DataManager(directory,name);
+ recover(dm);
+ dataManagers.put(name,dm);
}
return dm;
}
- protected IndexManager getIndexManager(String index_name) throws IOException {
- IndexManager im = (IndexManager) indexManagers.get(index_name);
+ protected IndexManager getIndexManager(DataManager dm, String name) throws IOException {
+ IndexManager im = (IndexManager) indexManagers.get(name);
if( im == null ) {
- File ifile=new File(directory,index_name+".idx");
- im = new IndexManager(ifile,mode);
- indexManagers.put(index_name,im);
+ im = new IndexManager(directory,name,mode, logIndexChanges?dm:null);
+ indexManagers.put(name,im);
}
return im;
+ }
+
+ private void recover(final DataManager dm) throws IOException {
+ dm.recoverRedoItems( new RedoListener() {
+ public void onRedoItem(DataItem item, Object o) throws Exception {
+ RedoStoreIndexItem redo = (RedoStoreIndexItem) o;
+ //IndexManager im = getIndexManager(dm, redo.getIndexName());
+ IndexManager im = getIndexManager(dm, dm.getName());
+ im.redo(redo);
+ }
+ });
+ }
+
+ public boolean isLogIndexChanges() {
+ return logIndexChanges;
+ }
+
+ public void setLogIndexChanges(boolean logIndexChanges) {
+ this.logIndexChanges = logIndexChanges;
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java?rev=409322&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java Thu May 25 00:44:50 2006
@@ -0,0 +1,79 @@
+package org.apache.activemq.kaha.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.activemq.kaha.Marshaller;
+
+public class RedoStoreIndexItem implements Externalizable {
+
+ public static final Marshaller MARSHALLER = new Marshaller() {
+ public Object readPayload(DataInput in) throws IOException {
+ RedoStoreIndexItem item = new RedoStoreIndexItem();
+ item.readExternal(in);
+ return item;
+ }
+
+ public void writePayload(Object object, DataOutput out) throws IOException {
+ RedoStoreIndexItem item = (RedoStoreIndexItem) object;
+ item.writeExternal(out);
+ }
+ };
+
+ private static final long serialVersionUID = -4865508871719676655L;
+ private String indexName;
+ private IndexItem indexItem;
+ private long offset;
+
+ public RedoStoreIndexItem() {
+ }
+ public RedoStoreIndexItem(String indexName, long offset, IndexItem item) {
+ this.indexName = indexName;
+ this.offset=offset;
+ this.indexItem = item;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ readExternal((DataInput)in);
+ }
+ public void readExternal(DataInput in) throws IOException {
+ // indexName = in.readUTF();
+ offset = in.readLong();
+ indexItem = new IndexItem();
+ indexItem.read(in);
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ writeExternal((DataOutput)out);
+ }
+ public void writeExternal(DataOutput out) throws IOException {
+ // out.writeUTF(indexName);
+ out.writeLong(offset);
+ indexItem.write(out);
+ }
+
+ public String getIndexName() {
+ return indexName;
+ }
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+ public IndexItem getIndexItem() {
+ return indexItem;
+ }
+ public void setIndexItem(IndexItem item) {
+ this.indexItem = item;
+ }
+ public long getOffset() {
+ return offset;
+ }
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java Thu May 25 00:44:50 2006
@@ -26,8 +26,11 @@
* @version $Revision: 1.1.1.1 $
*/
class StoreIndexWriter{
- protected StoreByteArrayOutputStream dataOut;
- protected RandomAccessFile file;
+
+ protected final StoreByteArrayOutputStream dataOut = new StoreByteArrayOutputStream();
+ protected final RandomAccessFile file;
+ protected final String name;
+ protected final DataManager redoLog;
/**
* Construct a Store index writer
@@ -35,14 +38,33 @@
* @param file
*/
StoreIndexWriter(RandomAccessFile file){
+ this(file, null, null);
+ }
+
+ public StoreIndexWriter(RandomAccessFile file, String indexName, DataManager redoLog) {
this.file=file;
- this.dataOut=new StoreByteArrayOutputStream();
+ this.name = indexName;
+ this.redoLog = redoLog;
+ }
+
+ void storeItem(IndexItem indexItem) throws IOException{
+
+ if( redoLog!=null ) {
+ RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
+ redoLog.storeRedoItem(redo);
+ }
+
+ dataOut.reset();
+ indexItem.write(dataOut);
+ file.seek(indexItem.getOffset());
+ file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
}
- void storeItem(IndexItem index) throws IOException{
+ public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
dataOut.reset();
- index.write(dataOut);
- file.seek(index.getOffset());
+ redo.getIndexItem().write(dataOut);
+ file.seek(redo.getOffset());
file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
}
+
}