You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/25 09:44:51 UTC

svn commit: r409322 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl: ContainerId.java DataManager.java IndexManager.java KahaStore.java RedoStoreIndexItem.java StoreIndexWriter.java

Author: chirino
Date: Thu May 25 00:44:50 2006
New Revision: 409322

URL: http://svn.apache.org/viewvc?rev=409322&view=rev
Log:
Kaha can now journal the changes it makes to the indexes so that they can be used to redo the changes on
failure.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java Thu May 25 00:44:50 2006
@@ -25,20 +25,20 @@
 public class ContainerId implements Externalizable{
     private static final long serialVersionUID=-8883779541021821943L;
     private Object key;
-    private String dataContainerPrefix;
+    private String dataContainerName;
 
     /**
      * @return Returns the dataContainerPrefix.
      */
-    public String getDataContainerPrefix(){
-        return dataContainerPrefix;
+    public String getDataContainerName(){
+        return dataContainerName;
     }
 
     /**
      * @param dataContainerPrefix The dataContainerPrefix to set.
      */
-    public void setDataContainerPrefix(String dataContainerPrefix){
-        this.dataContainerPrefix=dataContainerPrefix;
+    public void setDataContainerName(String dataContainerPrefix){
+        this.dataContainerName=dataContainerPrefix;
     }
 
     /**
@@ -69,12 +69,12 @@
     }
 
     public void writeExternal(ObjectOutput out) throws IOException{
-        out.writeUTF(getDataContainerPrefix());
+        out.writeUTF(getDataContainerName());
         out.writeObject(key);
     }
 
     public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
-        dataContainerPrefix=in.readUTF();
+        dataContainerName=in.readUTF();
         key=in.readObject();
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Thu May 25 00:44:50 2006
@@ -33,10 +33,11 @@
  * @version $Revision: 1.1.1.1 $
  */
 final class DataManager{
+    
     private static final Log log=LogFactory.getLog(DataManager.class);
     protected static long MAX_FILE_LENGTH=1024*1024*32;
     private final File dir;
-    private final String prefix;
+    private final String name;
     private StoreDataReader reader;
     private StoreDataWriter writer;
     private DataFile currentWriteFile;
@@ -45,23 +46,28 @@
     public static final int ITEM_HEAD_SIZE=5; // type + length
     public static final byte DATA_ITEM_TYPE=1;
     public static final byte REDO_ITEM_TYPE=2;
+    
+    Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
+    private String dataFilePrefix;
 
-    DataManager(File dir,String pf){
+    DataManager(File dir, final String name){
         this.dir=dir;
-        this.prefix=pf;
+        this.name=name;
         this.reader=new StoreDataReader(this);
         this.writer=new StoreDataWriter(this);
+        
+        dataFilePrefix = "data-"+name+"-";
         // build up list of current dataFiles
         File[] files=dir.listFiles(new FilenameFilter(){
-            public boolean accept(File dir,String name){
-                return dir.equals(dir)&&name.startsWith(prefix);
+            public boolean accept(File dir,String n){
+                return dir.equals(dir)&&n.startsWith(dataFilePrefix);
             }
         });
         if(files!=null){
             for(int i=0;i<files.length;i++){
                 File file=files[i];
-                String name=file.getName();
-                String numStr=name.substring(prefix.length(),name.length());
+                String n=file.getName();
+                String numStr=n.substring(dataFilePrefix.length(),n.length());
                 int num=Integer.parseInt(numStr);
                 DataFile dataFile=new DataFile(file,num);
                 fileMap.put(dataFile.getNumber(),dataFile);
@@ -72,8 +78,16 @@
         }
     }
     
-    public String getPrefix(){
-        return prefix;
+    private DataFile createAndAddDataFile(int num){
+        String fileName=dataFilePrefix+num;
+        File file=new File(dir,fileName);
+        DataFile result=new DataFile(file,num);
+        fileMap.put(result.getNumber(),result);
+        return result;
+    }
+
+    public String getName(){
+        return name;
     }
 
     DataFile findSpaceForData(DataItem item) throws IOException{
@@ -95,7 +109,7 @@
         if(dataFile!=null){
             return dataFile.getRandomAccessFile();
         }
-        throw new IOException("Could not locate data file "+prefix+item.getFile());
+        throw new IOException("Could not locate data file "+name+item.getFile());
     }
     
     synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
@@ -106,11 +120,16 @@
         return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
     }
     
-    synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{
-        return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE);
+    synchronized DataItem storeRedoItem(Object payload) throws IOException{
+        return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
     }
 
-    synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws IOException{
+    synchronized void recoverRedoItems(RedoListener listener) throws IOException{
+        
+        // Nothing to recover if there is no current file.
+        if( currentWriteFile == null )
+            return;
+        
         DataItem item = new DataItem();
         item.setFile(currentWriteFile.getNumber().intValue());
         item.setOffset(0);
@@ -126,7 +145,7 @@
                 // Un-marshal the redo item
                 Object object;
                 try {
-                    object = readItem(marshaller, item);
+                    object = readItem(redoMarshaller, item);
                 } catch (IOException e1) {
                     log.trace("End of data file reached at (payload was invalid): "+item);
                     return;
@@ -224,17 +243,17 @@
         }
     }
 
-    private DataFile createAndAddDataFile(int num){
-        String fileName=prefix+num;
-        File file=new File(dir,fileName);
-        DataFile result=new DataFile(file,num);
-        fileMap.put(result.getNumber(),result);
-        return result;
-    }
-
     private void removeDataFile(DataFile dataFile) throws IOException{
         fileMap.remove(dataFile.getNumber());
         boolean result=dataFile.delete();
         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
+    }
+
+    public Marshaller getRedoMarshaller() {
+        return redoMarshaller;
+    }
+
+    public void setRedoMarshaller(Marshaller redoMarshaller) {
+        this.redoMarshaller = redoMarshaller;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexManager.java Thu May 25 00:44:50 2006
@@ -17,6 +17,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.LinkedList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 /**
@@ -24,8 +25,9 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-final class IndexManager{
+final class IndexManager {
     private static final Log log=LogFactory.getLog(IndexManager.class);
+    private final String name;
     private File file;
     private RandomAccessFile indexFile;
     private StoreIndexReader reader;
@@ -33,11 +35,12 @@
     private LinkedList freeList=new LinkedList();
     private long length=0;
 
-    IndexManager(File ifile,String mode) throws IOException{
-        file=ifile;
-        indexFile=new RandomAccessFile(ifile,mode);
+    IndexManager(File directory, String name, String mode, DataManager redoLog ) throws IOException{
+        this.name = name;
+        file=new File(directory,"index-"+name);
+;       indexFile=new RandomAccessFile(file,mode);
         reader=new StoreIndexReader(indexFile);
-        writer=new StoreIndexWriter(indexFile);
+        writer=new StoreIndexWriter(indexFile, name, redoLog);
         long offset=0;
         while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
             IndexItem index=reader.readItem(offset);
@@ -68,6 +71,10 @@
     synchronized void updateIndex(IndexItem index) throws IOException{
         writer.storeItem(index);
     }
+    
+    public void redo(RedoStoreIndexItem redo) throws IOException {
+        writer.redoStoreItem(redo);
+    }
 
     synchronized IndexItem createNewIndex() throws IOException{
         IndexItem result=getNextFreeIndex();
@@ -118,4 +125,9 @@
     void setLength(long value){
         this.length=value;
     }
+
+    public String getName() {
+        return name;
+    }
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Thu May 25 00:44:50 2006
@@ -37,8 +37,7 @@
  */
 public class KahaStore implements Store{
     
-    private static final String DEFAULT_DATA_CONTAINER_NAME = "kaha-data.";
-    private static final String DEFAULT_INDEX_CONTAINER_NAME = "kaha-index.";
+    private static final String DEFAULT_CONTAINER_NAME = "kaha";
     
     private File directory;
 
@@ -54,6 +53,7 @@
     private String name;
     private String mode;
     private boolean initialized;
+    private boolean logIndexChanges=false;
 
     public KahaStore(String name,String mode) throws IOException{
         this.name=name;
@@ -138,7 +138,7 @@
     }
 
     public MapContainer getMapContainer(Object id) throws IOException{
-        return getMapContainer(id, DEFAULT_DATA_CONTAINER_NAME);
+        return getMapContainer(id, DEFAULT_CONTAINER_NAME);
     }
     
     public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
@@ -148,11 +148,11 @@
         if(result==null){
             
             DataManager dm = getDataManager(dataContainerName);
-            IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+            IndexManager im = getIndexManager(dm, dataContainerName);
             
             ContainerId containerId = new ContainerId();
             containerId.setKey(id);
-            containerId.setDataContainerPrefix(dataContainerName);
+            containerId.setDataContainerName(dataContainerName);
 
             IndexItem root=mapsContainer.getRoot(containerId);
             if( root == null ) {
@@ -186,19 +186,19 @@
     }
 
     public ListContainer getListContainer(Object id) throws IOException{
-        return getListContainer(id,DEFAULT_DATA_CONTAINER_NAME);
+        return getListContainer(id,DEFAULT_CONTAINER_NAME);
     }
     
-    public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{
+    public synchronized ListContainer getListContainer(Object id, String containerName) throws IOException{
         initialize();
        
         ListContainerImpl result=(ListContainerImpl) lists.get(id);
         if(result==null){
-            DataManager dm = getDataManager(dataContainerName);
-            IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+            DataManager dm = getDataManager(containerName);
+            IndexManager im = getIndexManager(dm, containerName);
             ContainerId containerId = new ContainerId();
             containerId.setKey(id);
-            containerId.setDataContainerPrefix(dataContainerName);
+            containerId.setDataContainerName(containerName);
             IndexItem root=listsContainer.getRoot(containerId);
             if( root == null ) {
                 root=listsContainer.addRoot(containerId);
@@ -238,23 +238,23 @@
             directory=new File(name);
             directory.mkdirs();
             
-            DataManager rootData = getDataManager(DEFAULT_DATA_CONTAINER_NAME);
-            IndexManager rootIndex = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
+            DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
+            IndexManager defaultIM = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
             
             IndexItem mapRoot=new IndexItem();
             IndexItem listRoot=new IndexItem();
-            if(rootIndex.isEmpty()){
+            if(defaultIM.isEmpty()){
                 mapRoot.setOffset(0);
-                rootIndex.updateIndex(mapRoot);
+                defaultIM.updateIndex(mapRoot);
                 listRoot.setOffset(IndexItem.INDEX_SIZE);
-                rootIndex.updateIndex(listRoot);
-                rootIndex.setLength(IndexItem.INDEX_SIZE*2);
+                defaultIM.updateIndex(listRoot);
+                defaultIM.setLength(IndexItem.INDEX_SIZE*2);
             }else{
-                mapRoot=rootIndex.getIndex(0);
-                listRoot=rootIndex.getIndex(IndexItem.INDEX_SIZE);
+                mapRoot=defaultIM.getIndex(0);
+                listRoot=defaultIM.getIndex(IndexItem.INDEX_SIZE);
             }
-            mapsContainer=new IndexRootContainer(mapRoot,rootIndex,rootData);
-            listsContainer=new IndexRootContainer(listRoot,rootIndex,rootData);
+            mapsContainer=new IndexRootContainer(mapRoot,defaultIM,defaultDM);
+            listsContainer=new IndexRootContainer(listRoot,defaultIM,defaultDM);
 
             for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
                 DataManager dm = (DataManager) i.next();
@@ -263,23 +263,42 @@
         }
     }
     
-    protected DataManager getDataManager(String prefix) throws IOException {
-        DataManager dm = (DataManager) dataManagers.get(prefix);
+    protected DataManager getDataManager(String name) throws IOException {
+        DataManager dm = (DataManager) dataManagers.get(name);
         if (dm == null){
-            dm = new DataManager(directory,prefix);
-            dataManagers.put(prefix,dm);
+            dm = new DataManager(directory,name);
+            recover(dm);
+            dataManagers.put(name,dm);
         }
         return dm;
     }
     
-    protected IndexManager getIndexManager(String index_name) throws IOException {
-        IndexManager im = (IndexManager) indexManagers.get(index_name);
+    protected IndexManager getIndexManager(DataManager dm, String name) throws IOException {
+        IndexManager im = (IndexManager) indexManagers.get(name);
         if( im == null ) {
-            File ifile=new File(directory,index_name+".idx");
-            im = new IndexManager(ifile,mode);
-            indexManagers.put(index_name,im);
+            im = new IndexManager(directory,name,mode, logIndexChanges?dm:null);
+            indexManagers.put(name,im);
         }
         return im;
+    }
+
+    private void recover(final DataManager dm) throws IOException {
+        dm.recoverRedoItems( new RedoListener() {
+            public void onRedoItem(DataItem item, Object o) throws Exception {
+                RedoStoreIndexItem redo = (RedoStoreIndexItem) o;
+                //IndexManager im = getIndexManager(dm, redo.getIndexName());
+                IndexManager im = getIndexManager(dm, dm.getName());
+                im.redo(redo);
+            }
+        });
+    }
+
+    public boolean isLogIndexChanges() {
+        return logIndexChanges;
+    }
+
+    public void setLogIndexChanges(boolean logIndexChanges) {
+        this.logIndexChanges = logIndexChanges;
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java?rev=409322&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/RedoStoreIndexItem.java Thu May 25 00:44:50 2006
@@ -0,0 +1,79 @@
+package org.apache.activemq.kaha.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.activemq.kaha.Marshaller;
+
+public class RedoStoreIndexItem implements Externalizable {
+
+    public static final Marshaller MARSHALLER = new Marshaller() {
+        public Object readPayload(DataInput in) throws IOException {
+            RedoStoreIndexItem item = new RedoStoreIndexItem();
+            item.readExternal(in);
+            return item;
+        }
+
+        public void writePayload(Object object, DataOutput out) throws IOException {
+            RedoStoreIndexItem item = (RedoStoreIndexItem) object;
+            item.writeExternal(out);
+        }
+    }; 
+    
+    private static final long serialVersionUID = -4865508871719676655L;
+    private String indexName;
+    private IndexItem indexItem;
+    private long offset;
+
+    public RedoStoreIndexItem() {
+    }
+    public RedoStoreIndexItem(String indexName, long offset, IndexItem item) {
+        this.indexName = indexName;
+        this.offset=offset;
+        this.indexItem = item;
+    }
+
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        readExternal((DataInput)in);
+    }
+    public void readExternal(DataInput in) throws IOException {
+        // indexName = in.readUTF();
+        offset = in.readLong();
+        indexItem = new IndexItem();
+        indexItem.read(in);
+    }
+    
+    public void writeExternal(ObjectOutput out) throws IOException {
+        writeExternal((DataOutput)out);
+    }
+    public void writeExternal(DataOutput out) throws IOException {
+        // out.writeUTF(indexName);
+        out.writeLong(offset);
+        indexItem.write(out);
+    }
+    
+    public String getIndexName() {
+        return indexName;
+    }
+    public void setIndexName(String indexName) {
+        this.indexName = indexName;
+    }
+    
+    public IndexItem getIndexItem() {
+        return indexItem;
+    }
+    public void setIndexItem(IndexItem item) {
+        this.indexItem = item;
+    }
+    public long getOffset() {
+        return offset;
+    }
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java?rev=409322&r1=409321&r2=409322&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/StoreIndexWriter.java Thu May 25 00:44:50 2006
@@ -26,8 +26,11 @@
  * @version $Revision: 1.1.1.1 $
  */
 class StoreIndexWriter{
-    protected StoreByteArrayOutputStream dataOut;
-    protected RandomAccessFile file;
+    
+    protected final StoreByteArrayOutputStream dataOut = new StoreByteArrayOutputStream();
+    protected final RandomAccessFile file;
+    protected final String name;
+    protected final DataManager redoLog;
 
     /**
      * Construct a Store index writer
@@ -35,14 +38,33 @@
      * @param file
      */
     StoreIndexWriter(RandomAccessFile file){
+        this(file, null, null);
+    }
+
+    public StoreIndexWriter(RandomAccessFile file, String indexName, DataManager redoLog) {
         this.file=file;
-        this.dataOut=new StoreByteArrayOutputStream();
+        this.name = indexName;
+        this.redoLog = redoLog;
+    }
+
+    void storeItem(IndexItem indexItem) throws IOException{
+        
+        if( redoLog!=null ) {
+            RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
+            redoLog.storeRedoItem(redo);
+        }
+        
+        dataOut.reset();
+        indexItem.write(dataOut);
+        file.seek(indexItem.getOffset());
+        file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
     }
 
-    void storeItem(IndexItem index) throws IOException{
+    public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
         dataOut.reset();
-        index.write(dataOut);
-        file.seek(index.getOffset());
+        redo.getIndexItem().write(dataOut);
+        file.seek(redo.getOffset());
         file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
     }
+    
 }