You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/25 07:00:57 UTC

svn commit: r479089 [1/2] - in /incubator/activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ activemq-core/src/main/java/org/apache/activemq/kaha...

Author: chirino
Date: Fri Nov 24 22:00:56 2006
New Revision: 479089

URL: http://svn.apache.org/viewvc?view=rev&rev=479089
Log:
Added a new org.apache.activemq.kaha.impl.async package that holds a data manager/journal that implements both the Kaha DataManager and ActiveIO Journal interfaces.
- Initial benchmarks show it to be as fast as or faster than the default ActiveIO Journal.
- The biggest differentiator is that this implementation of the journal was built to also provide fast reads.
- The DataManager interface was extracted and now the KahaStore can switch between the original DataManager implementation and the new implementation in the kaha.impl.async package.
- Simplified the original implementation by removing the AsyncDataWriters stuff since this is largely what the new package is based on.


Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java
      - copied, changed from r477680, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/AsyncDataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java
Modified:
    incubator/activemq/trunk/activemq-core/pom.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
    incubator/activemq/trunk/pom.xml

Modified: incubator/activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/pom.xml?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/pom.xml (original)
+++ incubator/activemq/trunk/activemq-core/pom.xml Fri Nov 24 22:00:56 2006
@@ -48,6 +48,12 @@
       <artifactId>activeio-core</artifactId>
       <optional>false</optional>
     </dependency>
+    <dependency>
+      <groupId>${pom.groupId}</groupId>
+      <artifactId>activeio-core</artifactId>
+      <optional>false</optional>
+      <type>test-jar</type>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,42 @@
+package org.apache.activemq.kaha.impl;
+
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.data.RedoListener;
+
+/**
+ * Data manager contract used by the Kaha store.
+ * KahaStore.getDataManager(name) hands out implementations of this interface,
+ * backed either by DataManagerImpl or by DataManagerFacade wrapping an
+ * AsyncDataManager, depending on its useAsyncDataManager flag.
+ */
+public interface DataManager {
+
+	String getName();
+
+	Object readItem(Marshaller marshaller, StoreLocation item)
+			throws IOException;
+
+	StoreLocation storeDataItem(Marshaller marshaller, Object payload)
+			throws IOException;
+
+	StoreLocation storeRedoItem(Object payload) throws IOException;
+
+	void updateItem(StoreLocation location, Marshaller marshaller,
+			Object payload) throws IOException;
+
+	// NOTE(review): KahaStore.recover() passes a RedoListener here; presumably each
+	// stored redo item is handed to listener.onRedoItem -- confirm against implementations.
+	void recoverRedoItems(RedoListener listener) throws IOException;
+
+	void close() throws IOException;
+
+	void force() throws IOException;
+
+	boolean delete() throws IOException;
+
+	void addInterestInFile(int file) throws IOException;
+
+	void removeInterestInFile(int file) throws IOException;
+
+	void consolidateDataFiles() throws IOException;
+
+	Marshaller getRedoMarshaller();
+
+	void setRedoMarshaller(Marshaller redoMarshaller);
+
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java Fri Nov 24 22:00:56 2006
@@ -27,7 +27,6 @@
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StoreLocation;
 import org.apache.activemq.kaha.impl.container.ContainerId;
-import org.apache.activemq.kaha.impl.data.DataManager;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Fri Nov 24 22:00:56 2006
@@ -33,11 +33,13 @@
 import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+import org.apache.activemq.kaha.impl.async.DataManagerFacade;
 import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
 import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
 import org.apache.activemq.kaha.impl.container.MapContainerImpl;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.data.DataManagerImpl;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.data.RedoListener;
 import org.apache.activemq.kaha.impl.index.IndexItem;
@@ -73,8 +75,8 @@
     private String mode;
     private boolean initialized;
     private boolean logIndexChanges=false;
-    private boolean useAsyncWriter=false;
-    private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
+    private boolean useAsyncDataManager=false;
+    private long maxDataFileLength=1024*1024*32;
     private FileLock lock;
     private String indexType=IndexTypes.DISK_INDEX;
 
@@ -319,10 +321,21 @@
     public synchronized DataManager getDataManager(String name) throws IOException{
         DataManager dm=(DataManager)dataManagers.get(name);
         if(dm==null){
-            dm=new DataManager(directory,name);
-            dm.setMaxFileLength(maxDataFileLength);
-            dm.setUseAsyncWriter(isUseAsyncWriter());
-            recover(dm);
+        	if( isUseAsyncDataManager() ) {
+	        	AsyncDataManager t=new AsyncDataManager();
+	        	t.setDirectory(directory);
+	        	t.setFilePrefix("data-"+name+"-");
+	        	t.setMaxFileLength((int) maxDataFileLength);
+	        	t.start();
+	            dm=new DataManagerFacade(t, name);
+        	} else {
+	        	DataManagerImpl t=new DataManagerImpl(directory,name);
+	            t.setMaxFileLength(maxDataFileLength);
+	            dm=t;
+        	}
+        	if( logIndexChanges ) {
+        		recover(dm);
+        	}
             dataManagers.put(name,dm);
         }
         return dm;
@@ -339,7 +352,6 @@
 
     private void recover(final DataManager dm) throws IOException{
         dm.recoverRedoItems(new RedoListener(){
-
             public void onRedoItem(StoreLocation item,Object o) throws Exception{
                 RedoStoreIndexItem redo=(RedoStoreIndexItem)o;
                 // IndexManager im = getIndexManager(dm, redo.getIndexName());
@@ -531,12 +543,12 @@
         }
     }
 
-	public synchronized boolean isUseAsyncWriter() {
-		return useAsyncWriter;
+	public synchronized boolean isUseAsyncDataManager() {
+		return useAsyncDataManager;
 	}
 
-	public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
-		this.useAsyncWriter = useAsyncWriter;
+	public synchronized void setUseAsyncDataManager(boolean useAsyncWriter) {
+		this.useAsyncDataManager = useAsyncWriter;
 	}
 
     

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,481 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Manages DataFiles
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class AsyncDataManager {
+
+	private static final Log log=LogFactory.getLog(AsyncDataManager.class);
+	
+    public static int CONTROL_RECORD_MAX_LENGTH=1024;
+    
+    public static final int ITEM_HEAD_RESERVED_SPACE=21; 
+    // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+    public static final int ITEM_HEAD_SPACE=4+1+ITEM_HEAD_RESERVED_SPACE+3; 
+    public static final int ITEM_HEAD_OFFSET_TO_SOR=ITEM_HEAD_SPACE-3;    
+    public static final int ITEM_FOOT_SPACE=3; // EOR
+    
+    public static final int ITEM_HEAD_FOOT_SPACE=ITEM_HEAD_SPACE+ITEM_FOOT_SPACE;
+
+    public static final byte[] ITEM_HEAD_SOR=new byte[]{'S', 'O', 'R'}; // 
+    public static final byte[] ITEM_HEAD_EOR=new byte[]{'E', 'O', 'R'}; // 
+    
+    public static final byte DATA_ITEM_TYPE=1;
+    public static final byte REDO_ITEM_TYPE=2;
+	
+    public static String DEFAULT_DIRECTORY="data";
+    public static String DEFAULT_FILE_PREFIX="data-";
+    public static int DEFAULT_MAX_FILE_LENGTH=1024*1024*32;
+    
+    private File directory = new File(DEFAULT_DIRECTORY);
+    private String filePrefix=DEFAULT_FILE_PREFIX;
+    private int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
+    private int preferedFileLength = DEFAULT_MAX_FILE_LENGTH-1024*512;
+    
+    private DataFileAppender appender;
+    private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
+
+    private Map<Integer,DataFile> fileMap=new HashMap<Integer,DataFile>();
+    private DataFile currentWriteFile;
+    ControlFile controlFile;
+    
+	private Location mark;
+	private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
+	boolean started = false;
+	boolean useNio = true;
+	
+    protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
+
+    /**
+     * Opens the data manager: locks and loads the control file, creates the
+     * appender, registers every existing data file found in the directory and
+     * checks the tail of the newest data file before storing the resulting state.
+     * Calling this on an already started manager does nothing.
+     *
+     * @throws IOException if reading or writing the control file or a data file fails
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized void start() throws IOException {
+        if( started ) {
+            return;
+        }
+
+        started=true;
+        directory.mkdirs();
+        controlFile = new ControlFile(new File(directory, filePrefix+"control"), CONTROL_RECORD_MAX_LENGTH);
+        controlFile.lock();
+
+        // Restore the mark and last append location saved in the control file, if any.
+        ByteSequence sequence = controlFile.load();
+        if( sequence != null && sequence.getLength()>0 ) {
+            unmarshallState(sequence);
+        }
+        if( useNio ) {
+            appender = new NIODataFileAppender(this);
+        } else {
+            appender = new DataFileAppender(this);
+        }
+
+        // File.listFiles() only ever passes this directory to the filter, so matching
+        // on the name prefix is enough (the former dir.equals(dir) test was always true).
+        File[] files=directory.listFiles(new FilenameFilter(){
+            public boolean accept(File dir,String n){
+                return n.startsWith(filePrefix);
+            }
+        });
+
+        if(files!=null){
+            for (File file : files) {
+                try {
+                    String n=file.getName();
+                    String numStr=n.substring(filePrefix.length(),n.length());
+                    int num=Integer.parseInt(numStr);
+                    DataFile dataFile=new DataFile(file,num, preferedFileLength);
+                    fileMap.put(dataFile.getDataFileId(),dataFile);
+                } catch (NumberFormatException e) {
+                    // Ignore files that do not match the pattern (for example the control file).
+                }
+            }
+
+            // Sort the list so that we can link the DataFiles together in the right order.
+            ArrayList<DataFile> l = new ArrayList<DataFile>(fileMap.values());
+            Collections.sort(l);
+            currentWriteFile=null;
+            for (DataFile df : l) {
+                if( currentWriteFile!=null ) {
+                    currentWriteFile.linkAfter(df);
+                }
+                currentWriteFile=df;
+            }
+        }
+
+        // Need to check the current write file to see if there was a partial write to it.
+        if( currentWriteFile!=null ) {
+
+            // Only keep the saved last append location if it points into the current write file.
+            Location l = lastAppendLocation.get();
+            if( l!=null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue() ) {
+                l=null;
+            }
+
+            // If we know the last location that was ok.. then we can skip lots of checking
+            l = recoveryCheck(currentWriteFile, l);
+            lastAppendLocation.set(l);
+        }
+
+        storeState(false);
+    }
+    
+    private Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
+    	if( location == null ) {
+    		location = new Location();
+    		location.setDataFileId(dataFile.getDataFileId());
+    		location.setOffset(0);
+    	}
+    	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+    	try {
+	    	reader.readLocationDetails(location);
+	    	while( reader.readLocationDetailsAndValidate(location) ) {
+	    		location.setOffset(location.getOffset()+location.getSize());
+	    	}
+    	} finally {
+    		accessorPool.closeDataFileAccessor(reader);
+    	}
+    	dataFile.setLength(location.getOffset());
+    	return location;
+	}
+
+	private void unmarshallState(ByteSequence sequence) throws IOException {
+    	ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
+    	DataInputStream dis = new DataInputStream(bais);
+    	if( dis.readBoolean() ) {
+    		mark = new Location();
+    		mark.readExternal(dis);
+    	} else {
+    		mark = null;
+    	}
+    	if( dis.readBoolean() ) {
+    		Location l = new Location();
+    		l.readExternal(dis);
+    		lastAppendLocation.set(l);
+    	} else {
+    		lastAppendLocation.set(null);
+    	}
+	}
+    
+    private ByteSequence marshallState() throws IOException {
+    	ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    	DataOutputStream dos = new DataOutputStream(baos);
+
+    	if( mark!=null ) {
+    		dos.writeBoolean(true);
+    		mark.writeExternal(dos);
+    	} else {
+    		dos.writeBoolean(false);
+    	}
+		Location l = lastAppendLocation.get();
+    	if( l!=null ) {
+    		dos.writeBoolean(true);
+    		l.writeExternal(dos);
+    	} else {
+    		dos.writeBoolean(false);
+    	}
+    	
+    	byte[] bs = baos.toByteArray();
+    	return new ByteSequence(bs,0,bs.length);
+	}
+
+	synchronized DataFile allocateLocation(Location location) throws IOException{
+        if(currentWriteFile==null||((currentWriteFile.getLength()+location.getSize())>maxFileLength)){
+            int nextNum=currentWriteFile!=null?currentWriteFile.getDataFileId().intValue()+1:1;
+
+            String fileName=filePrefix+nextNum;
+			DataFile nextWriteFile=new DataFile(new File(directory,fileName),nextNum, preferedFileLength);
+			fileMap.put(nextWriteFile.getDataFileId(),nextWriteFile);
+			if( currentWriteFile!=null ) {
+				currentWriteFile.linkAfter(nextWriteFile);
+	            if(currentWriteFile.isUnused()){
+	                removeDataFile(currentWriteFile);
+	            }
+			}
+            currentWriteFile=nextWriteFile;
+                        
+        }
+        location.setOffset(currentWriteFile.getLength());
+        location.setDataFileId(currentWriteFile.getDataFileId().intValue());        
+        currentWriteFile.incrementLength(location.getSize());
+        currentWriteFile.increment();
+        return currentWriteFile;
+    }
+
+    /**
+     * Looks up the data file that the given location refers to.
+     *
+     * @param item location whose data file id is used as the lookup key
+     * @return the DataFile registered in fileMap under that id
+     * @throws IOException if no data file is registered under that id
+     */
+    DataFile getDataFile(Location item) throws IOException{
+        Integer key=Integer.valueOf(item.getDataFileId());
+        DataFile dataFile=fileMap.get(key);
+        if(dataFile==null){
+            log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+            // Data files are named filePrefix+id (see allocateLocation), so report exactly that name.
+            throw new IOException("Could not locate data file "+filePrefix+item.getDataFileId());
+        }
+        return dataFile;
+    }
+    
+	private DataFile getNextDataFile(DataFile dataFile) {
+		return (DataFile) dataFile.getNext();
+	}
+
+    public synchronized void close() throws IOException{
+    	accessorPool.close();
+    	storeState(false);
+    	appender.close();
+        fileMap.clear();
+    	controlFile.unlock();
+    	controlFile.dispose();
+    }
+
+    /**
+     * Deletes every data file currently registered in fileMap and then clears the map.
+     *
+     * @return true only if every registered data file reported a successful delete
+     * @throws IOException if deleting a data file raises one
+     */
+    public synchronized boolean delete() throws IOException{
+        boolean result=true;
+        // fileMap is typed, so no raw Iterator or casts are needed.
+        for (DataFile dataFile : fileMap.values()) {
+            result&=dataFile.delete();
+        }
+        fileMap.clear();
+        return result;
+    }
+    
+    /**
+     * Adds interest in the data file with the given id by delegating to
+     * addInterestInFile(DataFile). Negative ids are ignored.
+     *
+     * @param file id of the data file
+     * @throws IOException if no data file is registered under that id
+     */
+    public synchronized void addInterestInFile(int file) throws IOException{
+        if(file<0){
+            return;
+        }
+        DataFile target=fileMap.get(Integer.valueOf(file));
+        if(target==null){
+            throw new IOException("That data file does not exist");
+        }
+        addInterestInFile(target);
+    }
+
+    synchronized void addInterestInFile(DataFile dataFile){
+        if(dataFile!=null){
+            dataFile.increment();
+        }
+    }
+
+    /**
+     * Removes interest in the data file with the given id by delegating to
+     * removeInterestInFile(DataFile). Negative ids are ignored; an id with no
+     * registered data file results in a null being passed on, which the
+     * delegate treats as a no-op.
+     *
+     * @param file id of the data file
+     * @throws IOException if the delegate fails while removing the data file
+     */
+    public synchronized void removeInterestInFile(int file) throws IOException{
+        if(file<0){
+            return;
+        }
+        DataFile target=fileMap.get(Integer.valueOf(file));
+        removeInterestInFile(target);
+    }
+
+    synchronized void removeInterestInFile(DataFile dataFile) throws IOException{
+        if(dataFile!=null){
+            if(dataFile.decrement()<=0){
+                if(dataFile!=currentWriteFile){
+                    removeDataFile(dataFile);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes every registered data file that reports itself unused, except the
+     * current write file. Candidates are collected first so that fileMap is not
+     * modified while it is being iterated.
+     *
+     * @throws IOException if removing a data file fails
+     */
+    public synchronized void consolidateDataFiles() throws IOException{
+        List<DataFile> unused=new ArrayList<DataFile>();
+        for (DataFile candidate : fileMap.values()) {
+            if(!candidate.isUnused() || candidate == currentWriteFile){
+                continue;
+            }
+            unused.add(candidate);
+        }
+        for (DataFile candidate : unused) {
+            removeDataFile(candidate);
+        }
+    }
+
+    /**
+     * Discards a data file: removes it from fileMap, calls unlink() on it,
+     * deletes it and logs the outcome at debug level.
+     *
+     * @param dataFile the data file to discard
+     * @throws IOException if unlinking or deleting the file raises one
+     */
+    private void removeDataFile(DataFile dataFile) throws IOException{
+        fileMap.remove(dataFile.getDataFileId());
+        dataFile.unlink();
+        boolean result=dataFile.delete();
+        // Leading spaces keep the outcome from running into the file description.
+        log.debug("discarding data file "+dataFile+(result?" successful":" failed"));
+    }
+
+    /**
+     * @return the maxFileLength
+     */
+    public int getMaxFileLength(){
+        return maxFileLength;
+    }
+
+    /**
+     * @param maxFileLength the maxFileLength to set
+     */
+    public void setMaxFileLength(int maxFileLength){
+        this.maxFileLength=maxFileLength;
+    }
+    
+    public String toString(){
+        return "DataManager:("+filePrefix+")";
+    }
+
+	public synchronized Location getMark() throws IllegalStateException {
+		return mark;
+	}
+
+	public Location getNextLocation(Location location) throws IOException, IllegalStateException {
+			
+			
+			Location cur = null;
+			while( true ) {
+				if( cur == null ) {
+					if( location == null ) {
+						DataFile head = (DataFile) currentWriteFile.getHeadNode();
+						cur = new Location();
+						cur.setDataFileId(head.getDataFileId());
+						cur.setOffset(0);
+						
+//				    	DataFileAccessor reader = accessorPool.openDataFileAccessor(head);
+//				    	try {
+//							if( !reader.readLocationDetailsAndValidate(cur) ) {
+//								return null;
+//							}
+//				    	} finally {
+//				    		accessorPool.closeDataFileAccessor(reader);
+//				    	}
+					} else {
+						// Set to the next offset..
+						cur =  new Location(location);
+						cur.setOffset(cur.getOffset()+cur.getSize());
+					}
+				} else {
+					cur.setOffset(cur.getOffset()+cur.getSize());
+				}
+				
+				DataFile dataFile = getDataFile(cur);
+				
+				// Did it go into the next file??
+				if( dataFile.getLength() <= cur.getOffset() ) {
+					dataFile = getNextDataFile(dataFile);
+					if( dataFile == null ) {
+						return null;
+					} else {
+						cur.setDataFileId(dataFile.getDataFileId().intValue());
+						cur.setOffset(0);
+					}
+				}
+				
+				// Load in location size and type.
+		    	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+		    	try {
+					reader.readLocationDetails(cur);
+		    	} finally {
+		    		accessorPool.closeDataFileAccessor(reader);
+		    	}
+				
+				if( cur.getType() == 0 ) {
+					return null;
+				} else  if( cur.getType() > 0 ) {
+					// Only return user records.
+					return cur;
+				}
+			}
+	}
+
+	/**
+	 * Reads the record stored at the given location.
+	 *
+	 * @param location location of the record; its data file id selects the file
+	 * @return the record as returned by the accessor's readRecord
+	 * @throws IOException if the data file is unknown or the read fails
+	 */
+	public ByteSequence read(Location location) throws IOException, IllegalStateException {
+		DataFile source = getDataFile(location);
+		DataFileAccessor accessor = accessorPool.openDataFileAccessor(source);
+		try {
+			return accessor.readRecord(location);
+		} finally {
+			// Always hand the accessor back to the pool, even when the read fails.
+			accessorPool.closeDataFileAccessor(accessor);
+		}
+	}
+
+	public synchronized void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
+		mark = location;
+		storeState(sync);
+	}
+
+	private void storeState(boolean sync) throws IOException {
+		ByteSequence state = marshallState();
+		appender.storeItem(state, Location.MARK_TYPE, sync);
+		controlFile.store(state, sync);
+	}
+
+	public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
+        return appender.storeItem(data, Location.USER_TYPE, sync);
+	}
+	
+	public Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
+        return appender.storeItem(data, type, sync);
+	}
+
+	public void update(Location location, ByteSequence data, boolean sync) throws IOException {
+		DataFile dataFile = getDataFile(location);
+    	DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
+    	try {
+    		updater.updateRecord(location, data, sync);
+    	} finally {
+    		accessorPool.closeDataFileAccessor(updater);
+    	}
+	}
+
+	public File getDirectory() {
+		return directory;
+	}
+
+	public void setDirectory(File directory) {
+		this.directory = directory;
+	}
+
+	public String getFilePrefix() {
+		return filePrefix;
+	}
+
+	public void setFilePrefix(String filePrefix) {
+		this.filePrefix = filePrefix;
+	}
+
+	public ConcurrentHashMap<WriteKey, WriteCommand> getInflightWrites() {
+		return inflightWrites;
+	}
+
+	public Location getLastAppendLocation() {
+		return lastAppendLocation.get();
+	}
+
+	public void setLastAppendLocation(Location lastSyncedLocation) {
+		this.lastAppendLocation.set(lastSyncedLocation);
+	}
+
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ControlFile.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * Used to reliably store fixed sized state data.  It stores the state in a
+ * record that is versioned and repeated twice in the file so that a failure in the
+ * middle of the write of the first or second record does not result in an unknown
+ * state.
+ * <p>
+ * Each copy is laid out as <code>[version:8][length:4][payload][version:8]</code>.
+ * The length field plus the payload area always span <code>maxRecordSize</code> bytes,
+ * so both copies and their trailing version stamps live at fixed offsets no matter
+ * how long the stored payload is.
+ *
+ * @version $Revision: 1.1 $
+ */
+final public class ControlFile {
+
+    private final static boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+
+    /** Bytes taken by each of the two version stamps that bracket a copy. */
+    private final static int VERSION_SIZE = 8;
+    /** Bytes taken by the payload length field of a copy. */
+    private final static int LENGTH_SIZE = 4;
+    /** Message used whenever the stored data cannot be trusted. */
+    private final static String CORRUPTED = "Control data corrupted.";
+
+    private final File file;
+
+    /** The File that holds the control data. */
+    private final RandomAccessFile randomAccessFile;
+    /** Size of the length field plus the largest payload that may be stored. */
+    private final int maxRecordSize;
+
+    private long version=0;
+    private FileLock lock;
+    private boolean disposed;
+
+    /**
+     * Opens (creating it if needed) the control file in read/write mode.
+     *
+     * @param file the file that holds the control data
+     * @param recordSize the largest payload, in bytes, that will be stored
+     * @throws IOException if the file cannot be opened
+     */
+    public ControlFile(File file, int recordSize) throws IOException {
+        this.file = file;
+        this.maxRecordSize = recordSize+LENGTH_SIZE;
+        randomAccessFile = new RandomAccessFile(file, "rw");
+    }
+
+    /**
+     * Locks the control file.  Does nothing when file locking is disabled through
+     * the <code>java.nio.channels.FileLock.broken</code> system property or when
+     * the lock is already held by this object.
+     *
+     * @throws IOException if the lock could not be acquired
+     */
+    public void lock() throws IOException {
+        if( DISABLE_FILE_LOCK )
+            return;
+
+        if( lock == null ) {
+            lock = randomAccessFile.getChannel().tryLock();
+            if (lock == null) {
+                throw new IOException("Control file '"+file+"' could not be locked.");
+            }
+        }
+    }
+
+    /**
+     * Unlocks the control file if this object currently holds the lock.
+     *
+     * @throws IOException if the lock could not be released
+     */
+    public void unlock() throws IOException {
+        if( DISABLE_FILE_LOCK )
+            return;
+
+        if (lock != null) {
+            lock.release();
+            lock = null;
+        }
+    }
+
+    /**
+     * Releases the lock and closes the file.  Calling it more than once has no
+     * further effect.
+     */
+    public void dispose() {
+        if( disposed )
+            return;
+        disposed=true;
+        try {
+            unlock();
+        } catch (IOException ignored) {
+            // Best effort: the file is closed below regardless.
+        }
+        try {
+            randomAccessFile.close();
+        } catch (IOException ignored) {
+            // Best effort: nothing useful can be done if close fails.
+        }
+    }
+
+    /**
+     * Loads the most recent consistent copy of the control data.  The second copy
+     * is preferred; the first copy is used when the second one's version stamps
+     * do not match.
+     *
+     * @return the stored payload, or <code>null</code> if the file is shorter than one record area
+     * @throws IOException if neither copy is consistent or the file cannot be read
+     */
+    synchronized public ByteSequence load() throws IOException {
+        long l = randomAccessFile.length();
+        if( l < maxRecordSize ) {
+            return null;
+        }
+
+        long v1 = readLongAt(recordOffset(0));
+        long v1check = readLongAt(trailerOffset(0));
+
+        long v2 = readLongAt(recordOffset(1));
+        long v2check = readLongAt(trailerOffset(1));
+
+        byte[] data=null;
+        if( v2 == v2check ) {
+            version = v2;
+            data = readPayload(1);
+        } else if ( v1 == v1check ){
+            version = v1;
+            // The first copy's length field sits right after its leading version stamp.
+            data = readPayload(0);
+        } else {
+            // Bummer.. Both checks are screwed. we don't know
+            // if any of the two buffer are ok.  This should
+            // only happen if data got corrupted.
+            throw new IOException(CORRUPTED);
+        }
+        return new ByteSequence(data,0,data.length);
+    }
+
+    /**
+     * Stores the control data as two identical, version stamped copies.
+     *
+     * @param data the payload to store; only the bytes between its offset and
+     *        offset+length are written
+     * @param sync when <code>true</code> the file descriptor is synced before returning
+     * @throws IOException if the payload is larger than the record size given to the
+     *         constructor, or if the file cannot be written
+     */
+    synchronized public void store(ByteSequence data, boolean sync) throws IOException {
+
+        if( data.getLength() > maxRecordSize-LENGTH_SIZE ) {
+            // A longer payload would run into the trailing version stamp and the next copy.
+            throw new IOException("Control data of "+data.getLength()+" bytes exceeds the maximum record size of "+(maxRecordSize-LENGTH_SIZE)+" bytes.");
+        }
+
+        version++;
+        randomAccessFile.setLength(recordOffset(2));
+
+        // Write the first copy of the control data.
+        writeRecord(0, data);
+
+        // Write the second copy of the control data.
+        writeRecord(1, data);
+
+        if( sync ) {
+            randomAccessFile.getFD().sync();
+        }
+    }
+
+    /** Offset of the leading version stamp of the given copy (0 or 1); copy 2 is the end of the file. */
+    private long recordOffset(int copy) {
+        return (long) copy * (maxRecordSize + (2*VERSION_SIZE));
+    }
+
+    /** Offset of the trailing version stamp of the given copy. */
+    private long trailerOffset(int copy) {
+        return recordOffset(copy) + VERSION_SIZE + maxRecordSize;
+    }
+
+    /** Reads a long from the given absolute file offset. */
+    private long readLongAt(long offset) throws IOException {
+        randomAccessFile.seek(offset);
+        return randomAccessFile.readLong();
+    }
+
+    /** Reads the length prefixed payload of the given copy, rejecting impossible lengths. */
+    private byte[] readPayload(int copy) throws IOException {
+        randomAccessFile.seek(recordOffset(copy) + VERSION_SIZE);
+        int size = randomAccessFile.readInt();
+        if( size < 0 || size > maxRecordSize-LENGTH_SIZE ) {
+            throw new IOException(CORRUPTED);
+        }
+        byte[] data = new byte[size];
+        randomAccessFile.readFully(data);
+        return data;
+    }
+
+    /** Writes one copy at its fixed offsets so that load() finds it whatever the payload length. */
+    private void writeRecord(int copy, ByteSequence data) throws IOException {
+        randomAccessFile.seek(recordOffset(copy));
+        randomAccessFile.writeLong(version);
+        randomAccessFile.writeInt(data.getLength());
+        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
+        randomAccessFile.seek(trailerOffset(copy));
+        randomAccessFile.writeLong(version);
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.util.LinkedNode;
+/**
+ * DataFile
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFile extends LinkedNode implements Comparable {
+	
+    private final File file;
+    private final Integer dataFileId;
+	private final int preferedSize;
+
+	int length=0;        
+    private int referenceCount;
+    	
+    DataFile(File file, int number, int preferedSize){
+        this.file=file;
+		this.preferedSize = preferedSize;
+        this.dataFileId=new Integer(number);
+        length=(int)(file.exists()?file.length():0);
+    }
+
+    public Integer getDataFileId(){
+        return dataFileId;
+    }
+
+    public synchronized int getLength(){
+        return length;
+    }
+	public void setLength(int length) {
+		this.length=length;
+	}
+    public synchronized void incrementLength(int size){
+        length+=size;
+    }    
+
+    public synchronized int increment(){
+        return ++referenceCount;
+    }
+
+    public synchronized int decrement(){
+        return --referenceCount;
+    }
+
+    public synchronized boolean isUnused(){
+        return referenceCount<=0;
+    }
+    
+    public synchronized String toString(){
+        String result = file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+    	
+	public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+		RandomAccessFile rc=new RandomAccessFile(file,"rw");
+		// When we start to write files size them up so that the OS has a chance
+		// to allocate the file contigously.
+		if( appender ){
+	        if( length < preferedSize ) {
+	        	rc.setLength(preferedSize);
+	        }
+		}
+        return rc;
+	}
+
+	public void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+		// On close set the file size to the real size.
+        if( length != file.length() ) {
+			file.setLength(getLength());
+			file.close();
+        }
+	}
+    
+	public synchronized boolean delete() throws IOException{
+        return file.delete();
+    }
+
+	public int compareTo(Object o) {
+		DataFile df = (DataFile) o;
+		return dataFileId - df.dataFileId;
+	}
+
+
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessor.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,143 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
+import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
+import org.apache.activemq.util.ByteSequence;
+/**
+ * Optimized Store reader and updater.  Single threaded and synchronous.  Use in conjunction
+ * with the DataFileAccessorPool for concurrent use.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+final class DataFileAccessor {
+
+    private final DataFile dataFile;
+    private final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+    private final RandomAccessFile file;
+    private boolean disposed;
+
+    /**
+     * Construct a Store reader
+     *
+     * @param dataManager supplies the map of in-flight writes consulted before touching the file
+     * @param dataFile the data file to open (opened in non-appender mode)
+     * @throws IOException if the data file cannot be opened
+     */
+    public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException{
+        this.dataFile = dataFile;
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.file = dataFile.openRandomAccessFile(false);
+    }
+
+    public DataFile getDataFile() {
+        return dataFile;
+    }
+
+    /**
+     * Closes the underlying file through the owning DataFile.  Only the first call
+     * has any effect; a failure to close is printed and otherwise ignored.
+     */
+    public void dispose() {
+        if( disposed )
+            return;
+        disposed=true;
+        try {
+            dataFile.closeRandomAccessFile(file);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Reads the payload of the record at the given location.  If the record is still
+     * in the in-flight write map, the data of that pending write is returned instead
+     * of reading the file.
+     *
+     * @param location a valid location whose size is set
+     * @return the record payload
+     * @throws IOException if the location is invalid or the file cannot be read
+     */
+    public ByteSequence readRecord(Location location) throws IOException {
+
+        if( !location.isValid() || location.getSize()==Location.NOT_SET )
+            throw new IOException("Invalid location: "+location);
+
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if( asyncWrite!= null ) {
+            return asyncWrite.data;
+        }
+
+        try {
+            byte[] data=new byte[location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
+            file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
+            file.readFully(data);
+            return new ByteSequence(data, 0, data.length);
+        } catch (RuntimeException e) {
+            // Keep the original failure attached so that the root cause is not lost.
+            IOException invalid = new IOException("Invalid location: "+location+", : "+e);
+            invalid.initCause(e);
+            throw invalid;
+        }
+    }
+
+    /**
+     * Fills in the size and type of the given location, taking them from the in-flight
+     * write if there is one and from the record header on disk otherwise.
+     *
+     * @param location the location to complete; its size and type are overwritten
+     * @throws IOException if the record header cannot be read
+     */
+    public void readLocationDetails(Location location) throws IOException {
+        if( !copyDetailsFromInflightWrite(location) ) {
+            readHeader(location);
+        }
+    }
+
+    /**
+     * Like {@link #readLocationDetails(Location)}, but for records read from disk it also
+     * checks the start-of-record and end-of-record markers.
+     *
+     * @param location the location to complete; its size and type are overwritten
+     * @return <code>false</code> if a marker does not match or the file cannot be read,
+     *         <code>true</code> otherwise (in-flight writes are not checked against the file)
+     */
+    public boolean readLocationDetailsAndValidate(Location location) {
+        try {
+            if( copyDetailsFromInflightWrite(location) ) {
+                return true;
+            }
+            readHeader(location);
+
+            byte data[] = new byte[3];
+            file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
+            file.readFully(data);
+            if( !startsWith(data, AsyncDataManager.ITEM_HEAD_SOR) ) {
+                return false;
+            }
+            file.seek(location.getOffset()+location.getSize()-AsyncDataManager.ITEM_FOOT_SPACE);
+            file.readFully(data);
+            return startsWith(data, AsyncDataManager.ITEM_HEAD_EOR);
+        } catch (IOException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Overwrites the payload of the record at the given location in place.  At most the
+     * record's payload capacity (its size minus the header and footer space) is written,
+     * so longer data is truncated rather than allowed to run into the end-of-record
+     * marker or the following record.
+     *
+     * @param location the record to overwrite; its size must be set
+     * @param data the replacement payload
+     * @param sync when <code>true</code> the file descriptor is synced before returning
+     * @throws IOException if the location has no usable size or the file cannot be written
+     */
+    public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
+
+        // location.getSize() covers header, payload and footer; only the payload may be rewritten.
+        int capacity = location.getSize()-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+        if( capacity < 0 ) {
+            throw new IOException("Invalid location: "+location);
+        }
+
+        file.seek(location.getOffset()+AsyncDataManager.ITEM_HEAD_SPACE);
+        int size = Math.min(data.getLength(), capacity);
+        file.write(data.getData(), data.getOffset(), size);
+        if( sync ) {
+            file.getFD().sync();
+        }
+
+    }
+
+    /** Copies size and type from a pending write at this location; returns false if there is none. */
+    private boolean copyDetailsFromInflightWrite(Location location) {
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if( asyncWrite == null ) {
+            return false;
+        }
+        location.setSize(asyncWrite.location.getSize());
+        location.setType(asyncWrite.location.getType());
+        return true;
+    }
+
+    /** Reads the size and type fields stored at the start of the record on disk. */
+    private void readHeader(Location location) throws IOException {
+        file.seek(location.getOffset());
+        location.setSize(file.readInt());
+        location.setType(file.readByte());
+    }
+
+    /** Compares the three bytes read from disk with the first three bytes of a marker. */
+    private static boolean startsWith(byte[] data, byte[] marker) {
+        return data[0] == marker[0] && data[1] == marker[1] && data[2] == marker[2];
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Used to pool DataFileAccessors.
+ * 
+ * @author chirino
+ */
+public class DataFileAccessorPool {
+
+	private final AsyncDataManager dataManager;
+	private final HashMap<Integer, Pool> pools = new HashMap<Integer, Pool>();
+	private boolean closed=false;
+	
+	int MAX_OPEN_READERS_PER_FILE=5;
+	
+	class Pool {
+		private final DataFile file;
+		private final ArrayList<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+		private boolean used; 
+		
+		public Pool(DataFile file) {
+			this.file = file;
+		}
+
+		public DataFileAccessor openDataFileReader() throws IOException {
+			DataFileAccessor rc=null;
+			if( pool.isEmpty() ) {
+				rc = new DataFileAccessor(dataManager, file);
+			} else {
+				rc = (DataFileAccessor) pool.remove(pool.size()-1);
+			}
+			used=true;
+			return rc;
+		}
+
+		public void closeDataFileReader(DataFileAccessor reader) {
+			used=true;
+			if(pool.size() >= MAX_OPEN_READERS_PER_FILE ) {
+				reader.dispose();
+			} else {
+				pool.add(reader);
+			}
+		}
+
+		public void clearUsedMark() {
+			used=false;
+		}
+
+		public boolean isUsed() {
+			return used;
+		}
+
+		public void dispose() {
+			for (DataFileAccessor reader : pool) {
+				reader.dispose();
+			}
+			pool.clear();
+		}
+		
+	}
+	
+	public DataFileAccessorPool(AsyncDataManager dataManager){
+		this.dataManager=dataManager;
+	}
+	
+	synchronized void clearUsedMark() {
+		for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
+			Pool pool = (Pool) iter.next();
+			pool.clearUsedMark();
+		}
+	}
+	
+	synchronized void disposeUnused() {
+		for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+			Pool pool = iter.next();
+			if( !pool.isUsed() ) {
+				pool.dispose();
+				iter.remove();
+			}
+		}
+	}
+	
+	synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+		if( closed ) {
+			throw new IOException("Closed.");
+		}
+		
+		Pool pool = pools.get(dataFile.getDataFileId());
+		if( pool == null ) {
+			pool = new Pool(dataFile);
+			pools.put(dataFile.getDataFileId(), pool);
+		}
+		return pool.openDataFileReader();
+	}
+	
+	synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+		Pool pool = pools.get(reader.getDataFile().getDataFileId());
+		if( pool == null || closed ) {
+			reader.dispose();
+		} else {
+			pool.closeDataFileReader(reader);
+		}
+	}
+
+	synchronized public void close() {
+		if(closed)
+			return;
+		closed=true;
+		for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+			Pool pool = iter.next();
+			pool.dispose();
+		}
+		pools.clear();
+	}
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,380 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.RandomAccessFile;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.LinkedNode;
+
+/**
+ * An optimized writer to do batch appends to a data file.  This object is thread safe
+ * and gains throughput as you increase the number of concurrent writes it does.
+ *
+ * @version $Revision: 1.1.1.1 $
+ */
+class DataFileAppender {
+
+    protected static final byte []RESERVED_SPACE= new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
+    protected static final String SHUTDOWN_COMMAND = "SHUTDOWN";
+    int MAX_WRITE_BATCH_SIZE = 1024*1024*4;
+
+    /**
+     * Map key that identifies a write by data file id and offset.
+     */
+    static public class WriteKey {
+        private final int file;
+        private final long offset;
+        private final int hash;
+
+        public WriteKey(Location item){
+            file = item.getDataFileId();
+            offset = item.getOffset();
+            // TODO: see if we can build a better hash
+            hash = (int) (file  ^ offset);
+        }
+
+        @Override
+        public int hashCode() {
+            return hash;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if( obj == this ) {
+                return true;
+            }
+            // Reject null and foreign types instead of failing with an exception.
+            if( !(obj instanceof WriteKey) ) {
+                return false;
+            }
+            WriteKey di = (WriteKey)obj;
+            return di.file == file && di.offset == offset;
+        }
+    }
+
+    /**
+     * A chain of writes for one data file that the writer thread puts on disk with a
+     * single sync.
+     */
+    public class WriteBatch {
+
+        public final DataFile dataFile;
+        public final WriteCommand first;
+        /** Created as soon as the batch contains a sync write; released once the batch is handled. */
+        public CountDownLatch latch;
+        public int size;
+        /** Set before the latch is released when the batch could not be written. */
+        public IOException exception;
+
+        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+            this.dataFile=dataFile;
+            this.first=write;
+            size+=write.location.getSize();
+            if( write.sync ) {
+                latch = new CountDownLatch(1);
+            }
+        }
+
+        /** A write fits if it targets the same data file and keeps the batch below the size limit. */
+        public boolean canAppend(DataFile dataFile, WriteCommand write) {
+            if( dataFile != this.dataFile )
+                return false;
+            if( size+write.location.getSize() >= MAX_WRITE_BATCH_SIZE )
+                return false;
+            return true;
+        }
+
+        public void append(WriteCommand write) throws IOException {
+            this.first.getTailNode().linkAfter(write);
+            size+=write.location.getSize();
+            if( write.sync && latch==null ) {
+                latch = new CountDownLatch(1);
+            }
+        }
+    }
+
+    /**
+     * A single record waiting to be written.
+     */
+    public static class WriteCommand extends LinkedNode {
+        public final Location location;
+        public final ByteSequence data;
+        final boolean sync;
+
+        public WriteCommand(Location location, ByteSequence data, boolean sync) {
+            this.location = location;
+            this.data = data;
+            this.sync = sync;
+        }
+    }
+
+    protected final AsyncDataManager dataManager;
+
+    protected final ConcurrentHashMap<WriteKey, WriteCommand> inflightWrites;
+
+    protected final Object enqueueMutex = new Object();
+    protected WriteBatch nextWriteBatch;
+
+    private boolean running;
+    protected boolean shutdown;
+    protected IOException firstAsyncException;
+    protected final CountDownLatch shutdownDone = new CountDownLatch(1);
+    private Thread thread;
+
+    /**
+     * Construct a Store writer
+     *
+     * @param dataManager allocates record locations and owns the in-flight write map
+     */
+    public DataFileAppender(AsyncDataManager dataManager){
+        this.dataManager=dataManager;
+        this.inflightWrites = this.dataManager.getInflightWrites();
+    }
+
+    /**
+     * Queues a record for the writer thread.
+     *
+     * @param data the record payload
+     * @param type the record type stored in the record header
+     * @param sync when <code>true</code> the call blocks until the batch holding the record
+     *        has been handled by the writer thread; when <code>false</code> the record is
+     *        published in the in-flight write map and the call returns once it is queued
+     * @return the location allocated for the record
+     * @throws IOException if the appender is shut down, the writer thread failed, the
+     *         batch holding a sync record could not be written, or the calling thread
+     *         was interrupted while waiting
+     */
+    public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
+
+        // Write the packet our internal buffer.
+        int size = data.getLength()+AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+
+        final Location location=new Location();
+        location.setSize(size);
+        location.setType(type);
+
+        WriteBatch batch;
+        WriteCommand write = new WriteCommand(location, data, sync);
+
+        // Locate datafile and enqueue into the executor in a synchronized block so that
+        // writes get enqueued onto the executor in the order that they were assigned by
+        // the data manager (which is basically just appending)
+
+        synchronized(this) {
+            // Find the position where this item will land at.
+            DataFile dataFile=dataManager.allocateLocation(location);
+
+            // Publish the in-flight write BEFORE it becomes visible to the writer thread.
+            // Otherwise the writer could finish the batch and remove the key first, and
+            // the late put would leave a stale entry behind forever.
+            WriteKey key = sync ? null : new WriteKey(location);
+            if( key != null ) {
+                inflightWrites.put(key, write);
+            }
+
+            boolean enqueued = false;
+            try {
+                batch = enqueue(dataFile, write);
+                enqueued = true;
+            } finally {
+                if( !enqueued && key != null ) {
+                    inflightWrites.remove(key);
+                }
+            }
+        }
+
+        if( sync ) {
+            try {
+                batch.latch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new InterruptedIOException();
+            }
+            // The writer thread records the failure before it releases the latch.
+            if( batch.exception != null ) {
+                throw batch.exception;
+            }
+        }
+
+        return location;
+    }
+
+    /**
+     * Adds the write to the pending batch, starting the writer thread on first use.  If the
+     * write does not fit into the pending batch, the caller waits until the writer thread
+     * has taken that batch and then starts a new one.
+     */
+    private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+        synchronized(enqueueMutex) {
+            WriteBatch rc=null;
+            if( shutdown ) {
+                throw new IOException("Async Writter Thread Shutdown");
+            }
+            if( firstAsyncException !=null )
+                throw firstAsyncException;
+
+            if( !running ) {
+                running=true;
+                thread = new Thread() {
+                    public void run() {
+                        processQueue();
+                    }
+                };
+                thread.setPriority(Thread.MAX_PRIORITY);
+                thread.setDaemon(true);
+                thread.setName("ActiveMQ Data File Writer");
+                thread.start();
+            }
+
+            if( nextWriteBatch == null ) {
+                nextWriteBatch = new WriteBatch(dataFile,write);
+                rc = nextWriteBatch;
+                enqueueMutex.notify();
+            } else {
+                // Append to current batch if possible..
+                if( nextWriteBatch.canAppend(dataFile, write) ) {
+                    nextWriteBatch.append(write);
+                    rc = nextWriteBatch;
+                } else {
+                    // Otherwise wait for the queuedCommand to be null
+                    try {
+                        while( nextWriteBatch!=null ) {
+                            enqueueMutex.wait();
+                        }
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new InterruptedIOException();
+                    }
+                    if( shutdown ) {
+                        throw new IOException("Async Writter Thread Shutdown");
+                    }
+                    // The writer thread may have died while we were waiting.
+                    if( firstAsyncException !=null )
+                        throw firstAsyncException;
+
+                    // Start a new batch.
+                    nextWriteBatch = new WriteBatch(dataFile,write);
+                    rc = nextWriteBatch;
+                    enqueueMutex.notify();
+                }
+            }
+            return rc;
+        }
+    }
+
+    /**
+     * Requests shutdown and waits until the writer thread has finished.  A batch that is
+     * already queued is still written before the writer thread stops.
+     *
+     * @throws IOException if the calling thread is interrupted while waiting
+     */
+    public void close() throws IOException {
+        synchronized( enqueueMutex ) {
+            if( shutdown == false ) {
+                shutdown = true;
+                if( running ) {
+                    enqueueMutex.notifyAll();
+                } else {
+                    shutdownDone.countDown();
+                }
+            }
+        }
+
+        try {
+            shutdownDone.await();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException();
+        }
+
+    }
+
+    /**
+     * The async processing loop that writes to the data files and
+     * does the force calls.
+     *
+     * Since the file sync() call is the slowest of all the operations,
+     * this algorithm tries to 'batch' or group together several file sync() requests
+     * into a single file sync() call. The batching is accomplished attaching the
+     * same CountDownLatch instance to every force request in a group.
+     *
+     * If writing fails or the thread is interrupted, the failure is recorded, the batch
+     * being written and any queued batch are failed, and all waiting threads are woken
+     * so that nobody blocks on a writer that no longer runs.
+     */
+    protected void processQueue() {
+        DataFile dataFile=null;
+        RandomAccessFile file=null;
+        // The batch currently being written; null while idle so a later failure cannot touch a finished batch.
+        WriteBatch wb=null;
+        try {
+
+            DataByteArrayOutputStream buff = new DataByteArrayOutputStream(MAX_WRITE_BATCH_SIZE);
+            while( true ) {
+
+                Object o = null;
+
+                // Block till we get a command.
+                synchronized(enqueueMutex) {
+                    while( true ) {
+                        // Drain a queued batch before honouring shutdown, otherwise its
+                        // writes would be dropped and its sync waiters never released.
+                        if( nextWriteBatch!=null ) {
+                            o = nextWriteBatch;
+                            nextWriteBatch=null;
+                            break;
+                        }
+                        if( shutdown ) {
+                            o = SHUTDOWN_COMMAND;
+                            break;
+                        }
+                        enqueueMutex.wait();
+                    }
+                    enqueueMutex.notify();
+                }
+
+
+                if( o == SHUTDOWN_COMMAND ) {
+                    break;
+                }
+
+                wb = (WriteBatch) o;
+                if( dataFile != wb.dataFile ) {
+                    if( file!=null ) {
+                        dataFile.closeRandomAccessFile(file);
+                        // Forget the closed handle so a failed open below does not close it twice.
+                        file = null;
+                    }
+                    dataFile = wb.dataFile;
+                    file = dataFile.openRandomAccessFile(true);
+                }
+
+                WriteCommand write = wb.first;
+
+                // Write all the data.
+                // Only need to seek to first location.. all others
+                // are in sequence.
+                file.seek(write.location.getOffset());
+
+                //
+                // is it just 1 big write?
+                if( wb.size == write.location.getSize() ) {
+
+                    // Just write it directly..
+                    file.writeInt(write.location.getSize());
+                    file.writeByte(write.location.getType());
+                    file.write(RESERVED_SPACE);
+                    file.write(AsyncDataManager.ITEM_HEAD_SOR);
+                    file.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
+                    file.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                } else {
+
+                    // Combine the smaller writes into 1 big buffer
+                    while( write!=null ) {
+
+                        buff.writeInt(write.location.getSize());
+                        buff.writeByte(write.location.getType());
+                        buff.write(RESERVED_SPACE);
+                        buff.write(AsyncDataManager.ITEM_HEAD_SOR);
+                        buff.write(write.data.getData(),write.data.getOffset(), write.data.getLength());
+                        buff.write(AsyncDataManager.ITEM_HEAD_EOR);
+
+                        write = (WriteCommand) write.getNext();
+                    }
+
+                    // Now do the 1 big write.
+                    ByteSequence sequence = buff.toByteSequence();
+                    file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                    buff.reset();
+                }
+
+                file.getFD().sync();
+
+                WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
+                dataManager.setLastAppendLocation( lastWrite.location );
+
+                // Signal any waiting threads that the write is on disk.
+                if( wb.latch!=null ) {
+                    wb.latch.countDown();
+                }
+
+                // Now that the data is on disk, remove the writes from the in flight
+                // cache.
+                write = wb.first;
+                while( write!=null ) {
+                    if( !write.sync ) {
+                        inflightWrites.remove(new WriteKey(write.location));
+                    }
+                    write = (WriteCommand) write.getNext();
+                }
+
+                // This batch is complete.
+                wb = null;
+            }
+
+        } catch (IOException e) {
+            failPendingWrites(wb, e);
+        } catch (InterruptedException e) {
+            // The writer thread is going away; report that to everybody who waits on it.
+            failPendingWrites(wb, new InterruptedIOException());
+        } finally {
+            try {
+                if( file!=null ) {
+                    dataFile.closeRandomAccessFile(file);
+                }
+            } catch (IOException ignored) {
+                // Best effort: the writer thread is terminating anyway.
+            }
+            shutdownDone.countDown();
+        }
+    }
+
+    /**
+     * Records the failure of the writer thread, fails the batch it was writing and the
+     * batch still queued, and wakes every thread blocked on the enqueue mutex.
+     */
+    private void failPendingWrites(WriteBatch current, IOException cause) {
+        synchronized( enqueueMutex ) {
+            firstAsyncException = cause;
+            failBatch(current, cause);
+            failBatch(nextWriteBatch, cause);
+            nextWriteBatch = null;
+            enqueueMutex.notifyAll();
+        }
+    }
+
+    /** Attaches the failure to the batch and releases the threads waiting on its latch. */
+    private void failBatch(WriteBatch batch, IOException cause) {
+        if( batch == null ) {
+            return;
+        }
+        batch.exception = cause;
+        if( batch.latch!=null ) {
+            batch.latch.countDown();
+        }
+    }
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataManagerFacade.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+
+import org.apache.activemq.kaha.Marshaller;
+import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.data.RedoListener;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+
+/**
+ * Provides a Kaha DataManager Facade to the DataManager.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class DataManagerFacade implements org.apache.activemq.kaha.impl.DataManager {
+
+	private static class StoreLocationFacade implements StoreLocation {
+		private final Location location;
+
+		public StoreLocationFacade(Location location) {
+			this.location = location;
+		}
+
+		public int getFile() {
+			return location.getDataFileId();
+		}
+
+		public long getOffset() {
+			return location.getOffset();
+		}
+
+		public int getSize() {
+			return location.getSize();
+		}
+
+		public Location getLocation() {
+			return location;
+		}
+	}
+
+	static private StoreLocation convertToStoreLocation(Location location) {
+		if(location==null)
+			return null;
+		return new StoreLocationFacade(location);
+	}
+	
+	static private Location convertFromStoreLocation(StoreLocation location) {
+		
+		if(location==null)
+			return null;
+		
+		if( location.getClass()== StoreLocationFacade.class )
+			return ((StoreLocationFacade)location).getLocation();
+		
+		Location l = new Location();
+		l.setOffset((int) location.getOffset());
+		l.setSize(location.getSize());
+		l.setDataFileId(location.getFile());
+		return l;
+	}
+
+	static final private ByteSequence FORCE_COMMAND = new ByteSequence(new byte[]{'F', 'O', 'R', 'C', 'E'});
+	
+	AsyncDataManager dataManager;
+	private final String name;
+	private Marshaller redoMarshaller;
+	
+	
+	public DataManagerFacade(AsyncDataManager dataManager, String name) {
+		this.dataManager=dataManager;
+		this.name = name;
+	}
+	
+	public Object readItem(Marshaller marshaller, StoreLocation location) throws IOException {
+		ByteSequence sequence = dataManager.read(convertFromStoreLocation(location));
+		DataByteArrayInputStream dataIn = new DataByteArrayInputStream(sequence);
+        return marshaller.readPayload(dataIn);
+	}
+
+
+	public StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException {
+    	final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload,buffer);	
+		ByteSequence data = buffer.toByteSequence();		
+		return convertToStoreLocation(dataManager.write(data, (byte)1, false));
+	}
+
+
+	public void force() throws IOException {
+		dataManager.write(FORCE_COMMAND, (byte)2, true);
+	}
+
+	public void updateItem(StoreLocation location, Marshaller marshaller, Object payload) throws IOException {
+    	final DataByteArrayOutputStream buffer = new DataByteArrayOutputStream();
+        marshaller.writePayload(payload,buffer);	
+		ByteSequence data = buffer.toByteSequence();		
+		dataManager.update(convertFromStoreLocation(location), data, false);
+	}
+	
+	public void close() throws IOException {
+		dataManager.close();
+	}
+
+	public void consolidateDataFiles() throws IOException {
+		dataManager.consolidateDataFiles();
+	}
+
+	public boolean delete() throws IOException {
+		return dataManager.delete();
+	}
+ 	
+	public void addInterestInFile(int file) throws IOException {
+		dataManager.addInterestInFile(file);
+	}
+	public void removeInterestInFile(int file) throws IOException {
+		dataManager.removeInterestInFile(file);
+	}
+
+	public void recoverRedoItems(RedoListener listener) throws IOException {
+		throw new RuntimeException("Not Implemented..");
+	}
+	public StoreLocation storeRedoItem(Object payload) throws IOException {
+		throw new RuntimeException("Not Implemented..");
+	}
+
+	public Marshaller getRedoMarshaller() {
+		return redoMarshaller;
+	}	
+	public void setRedoMarshaller(Marshaller redoMarshaller) {
+		this.redoMarshaller = redoMarshaller;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/JournalFacade.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,108 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalEventListener;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.util.ByteSequence;
+
+/**
+ * Exposes an {@link AsyncDataManager} through the ActiveIO {@link Journal}
+ * interface by translating between {@link RecordLocation} and
+ * {@link Location}, and between {@link Packet} and {@link ByteSequence}.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public final class JournalFacade implements Journal {
+
+	/**
+	 * {@link RecordLocation} wrapper around a {@link Location}; ordering is
+	 * delegated to the wrapped location.
+	 */
+	public static class RecordLocationFacade implements RecordLocation {
+		private final Location location;
+
+		public RecordLocationFacade(Location location) {
+			this.location = location;
+		}
+
+		public Location getLocation() {
+			return location;
+		}
+
+		public int compareTo(Object o) {
+			return location.compareTo(((RecordLocationFacade) o).location);
+		}
+	}
+
+	/** Wraps a location, passing <code>null</code> straight through. */
+	static private RecordLocation convertToRecordLocation(Location location) {
+		return location == null ? null : new RecordLocationFacade(location);
+	}
+
+	/**
+	 * Unwraps a record location, passing <code>null</code> straight through.
+	 * The argument is cast to {@link RecordLocationFacade}.
+	 */
+	static private Location convertFromRecordLocation(RecordLocation location) {
+		return location == null ? null : ((RecordLocationFacade) location).getLocation();
+	}
+
+	AsyncDataManager dataManager;
+
+	public JournalFacade(AsyncDataManager dataManager) {
+		this.dataManager = dataManager;
+	}
+
+	public void close() throws IOException {
+		dataManager.close();
+	}
+
+	public RecordLocation getMark() throws IllegalStateException {
+		Location mark = dataManager.getMark();
+		return convertToRecordLocation(mark);
+	}
+
+	public RecordLocation getNextRecordLocation(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
+		Location current = convertFromRecordLocation(location);
+		Location next = dataManager.getNextLocation(current);
+		return convertToRecordLocation(next);
+	}
+
+	/**
+	 * Reads the record at the given location.
+	 * 
+	 * @return the record wrapped in a packet, or <code>null</code> when the
+	 *         data manager returns no data
+	 */
+	public Packet read(RecordLocation location) throws InvalidRecordLocationException, IOException, IllegalStateException {
+		ByteSequence payload = dataManager.read(convertFromRecordLocation(location));
+		return payload == null ? null : new ByteArrayPacket(payload.getData(), payload.getOffset(), payload.getLength());
+	}
+
+	/**
+	 * Does nothing: the supplied listener is ignored.
+	 */
+	public void setJournalEventListener(JournalEventListener listener) throws IllegalStateException {
+	}
+
+	public void setMark(RecordLocation location, boolean sync) throws InvalidRecordLocationException, IOException, IllegalStateException {
+		Location mark = convertFromRecordLocation(location);
+		dataManager.setMark(mark, sync);
+	}
+
+	/**
+	 * Writes the packet's bytes through the data manager.
+	 * 
+	 * @return the location the data manager reports for the write
+	 */
+	public RecordLocation write(Packet packet, boolean sync) throws IOException, IllegalStateException {
+		org.apache.activeio.packet.ByteSequence source = packet.asByteSequence();
+		Location written = dataManager.write(new ByteSequence(source.getData(), source.getOffset(), source.getLength()), sync);
+		return convertToRecordLocation(written);
+	}
+
+}

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,126 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Used as a location in the data store: a data file id, an offset, a record
+ * size and a record type. Unset numeric fields hold {@link #NOT_SET}; an unset
+ * type holds {@link #NOT_SET_TYPE}.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public final class Location {
+
+    public static final byte MARK_TYPE=-1;
+    public static final byte USER_TYPE=1;
+    public static final byte NOT_SET_TYPE=0;
+    public static final int NOT_SET=-1;
+
+    private int dataFileId=NOT_SET;
+    private int offset=NOT_SET;
+    private int size=NOT_SET;
+    private byte type=NOT_SET_TYPE;
+
+    /**
+     * Creates a location with every field unset.
+     */
+    public Location(){}
+
+    /**
+     * Copy constructor.
+     * 
+     * @param item the location whose fields are copied
+     */
+    Location(Location item) {
+        this.dataFileId = item.dataFileId;
+        this.offset = item.offset;
+        this.size = item.size;
+        this.type = item.type;
+    }
+
+    /**
+     * @return true if a data file id has been set.
+     */
+    boolean isValid(){
+        return dataFileId != NOT_SET;
+    }
+
+    /**
+     * @return the size of the data record including the header.
+     */
+    public int getSize(){
+        return size;
+    }
+
+    /**
+     * @param size the size of the data record including the header.
+     */
+    public void setSize(int size){
+        this.size=size;
+    }
+
+    /**
+     * The method name is misspelled; it is kept as is so that existing
+     * callers keep working.
+     * 
+     * @return the size of the payload of the record.
+     */
+    public int getPaylodSize() {
+        return size-AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
+    }
+
+    public int getOffset(){
+        return offset;
+    }
+
+    public void setOffset(int offset){
+        this.offset=offset;
+    }
+
+    public int getDataFileId(){
+        return dataFileId;
+    }
+
+    public void setDataFileId(int file){
+        this.dataFileId=file;
+    }
+
+    public byte getType() {
+        return type;
+    }
+
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+    public String toString(){
+        return "offset = "+offset+", file = " + dataFileId + ", size = "+size + ", type = "+type;
+    }
+
+    /**
+     * Orders locations by data file id, then by offset within the file.
+     * Size and type do not take part in the ordering.
+     * 
+     * @param o the {@link Location} to compare against
+     * @return a negative value, zero or a positive value when this location
+     *         sorts before, at the same position as, or after the other one
+     * @throws ClassCastException if <code>o</code> is not a {@link Location}
+     */
+    public int compareTo(Object o) {
+        Location other = (Location)o;
+        // Compare explicitly instead of subtracting: a difference of two ints
+        // can overflow (e.g. Integer.MAX_VALUE - NOT_SET) and flip the sign.
+        if( dataFileId != other.dataFileId ) {
+            return dataFileId < other.dataFileId ? -1 : 1;
+        }
+        if( offset != other.offset ) {
+            return offset < other.offset ? -1 : 1;
+        }
+        return 0;
+    }
+
+    /**
+     * Writes the data file id, offset and size as ints followed by the type
+     * as a single byte.
+     */
+    public void writeExternal(DataOutput dos) throws IOException {
+        dos.writeInt(dataFileId);
+        dos.writeInt(offset);
+        dos.writeInt(size);
+        dos.writeByte(type);
+    }
+
+    /**
+     * Reads the fields in the order {@link #writeExternal(DataOutput)} writes them.
+     */
+    public void readExternal(DataInput dis) throws IOException {
+        dataFileId = dis.readInt();
+        offset = dis.readInt();
+        size = dis.readInt();
+        type = dis.readByte();
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/Location.java
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,213 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A DataFileAppender that uses NIO ByteBuffers and FileChannels to more efficiently
+ * copy data to files.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class NIODataFileAppender extends DataFileAppender {
+    
+    /**
+     * @param fileManager the data manager, passed straight to the superclass.
+     */
+    public NIODataFileAppender(AsyncDataManager fileManager) {
+		super(fileManager);
+	}
+
+	/**
+     * The async processing loop that writes to the data files and
+     * does the force calls.  
+     * 
+     * Since the file sync() call is the slowest of all the operations, 
+     * this algorithm tries to 'batch' or group together several file sync() requests 
+     * into a single file sync() call. The batching is accomplished attaching the 
+     * same CountDownLatch instance to every force request in a group.
+     * 
+     * The loop runs until the shutdown flag is seen or an exception is thrown.
+     * On the way out the open file (if any) is closed and shutdownDone is
+     * counted down.
+     */
+    protected void processQueue() {
+		DataFile dataFile=null;
+		RandomAccessFile file=null;
+    	FileChannel channel=null;
+
+    	try {
+    		
+        	ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
+        	ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
+    		ByteBuffer buffer = ByteBuffer.allocateDirect(MAX_WRITE_BATCH_SIZE);
+    		
+    		// Populate the static parts of the headers and footers..
+    		// The size and type written here are placeholders; they are
+    		// overwritten for every record below.
+    		header.putInt(0); // size
+    		header.put((byte) 0); // type
+    		header.put(RESERVED_SPACE); // reserved
+    		header.put(AsyncDataManager.ITEM_HEAD_SOR);    		
+    		footer.put(AsyncDataManager.ITEM_HEAD_EOR);
+    		
+	    	while( true ) {
+	    		
+	    		Object o = null;
+
+	    		// Block till we get a command.
+	    		synchronized(enqueueMutex) {
+	    			while( true ) {
+	    				if( shutdown ) {
+	    					o = SHUTDOWN_COMMAND;
+	    					break;
+	    				}
+	    				if( nextWriteBatch!=null ) {
+	    					// Take ownership of the pending batch.
+	    					o = nextWriteBatch;
+	    					nextWriteBatch=null;
+	    					break;
+	    				}
+	    				enqueueMutex.wait();
+	    			}
+	    			// Wake one thread waiting on the mutex.
+	    			// NOTE(review): presumably an enqueuer waiting for the
+	    			// batch slot to free up - confirm against DataFileAppender.
+	    			enqueueMutex.notify();
+	            }        
+	    		
+	    		
+	        	if( o == SHUTDOWN_COMMAND ) {
+	        		break;
+	        	} 
+	        	
+	        	WriteBatch wb = (WriteBatch) o;
+	        	// Switch files when the batch targets a different data file
+	        	// than the one currently open.
+				if( dataFile != wb.dataFile ) {
+	        		if( file!=null ) {
+	        			dataFile.closeRandomAccessFile(file);
+	        		}
+	        		dataFile = wb.dataFile;
+	        		file = dataFile.openRandomAccessFile(true);
+	        		channel = file.getChannel();
+	        	}
+	        	
+	        	WriteCommand write = wb.first;
+	        	
+	        	// Write all the data.
+				// Only need to seek to first location.. all others 
+				// are in sequence.
+	        	file.seek(write.location.getOffset());
+	 
+	        	// 
+        		// is it just 1 big write?
+	        	if( wb.size == write.location.getSize() ) {
+	        		
+	        		// Overwrite the size and type at the start of the header,
+	        		// then clear() again so that position is 0 and limit is
+	        		// the capacity: the whole header gets transferred.
+	        		header.clear();
+	        		header.putInt(write.location.getSize());
+	        		header.put(write.location.getType());
+	        		header.clear();
+	                transfer(header, channel);
+	                // The payload is written straight from its backing array,
+	                // bypassing the batch buffer.
+	                ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
+	                transfer(source, channel);
+		        	footer.clear();
+	                transfer(footer, channel);
+		        	
+	        	} else {
+	        		
+	        		// Combine the smaller writes into 1 big buffer
+	        		// NOTE(review): copy() stops when 'buffer' is full and the
+	        		// asserts below only fire when assertions are enabled;
+	        		// confirm the batch can never exceed MAX_WRITE_BATCH_SIZE.
+		        	while( write!=null ) {
+	        		
+		        		header.clear();
+		        		header.putInt(write.location.getSize());
+		        		header.put(write.location.getType());
+		        		header.clear();
+		        		copy(header, buffer);
+		        		assert !header.hasRemaining();
+		        		
+		                ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(), write.data.getLength());
+		                copy(source, buffer);
+		        		assert !source.hasRemaining();
+
+		        		footer.clear();
+		        		copy(footer, buffer);
+		        		assert !footer.hasRemaining();
+	
+		        		write = (WriteCommand) write.getNext();
+		        	}
+		        	
+	    			// Fully write out the buffer..
+	    			buffer.flip();
+	                transfer(buffer, channel);
+	                buffer.clear();
+		        }
+		        	
+	        	// Force the file content (not its metadata) to the storage device.
+    			file.getChannel().force(false);
+
+    			WriteCommand lastWrite = (WriteCommand) wb.first.getTailNode();
+    			dataManager.setLastAppendLocation( lastWrite.location );
+
+    			// Signal any waiting threads that the write is on disk.
+    			// NOTE(review): when an IOException is thrown above, this
+    			// latch is not counted down here; confirm waiters are
+    			// released some other way.
+    			if( wb.latch!=null ) {
+    				wb.latch.countDown();
+    			}
+    			
+    			// Now that the data is on disk, remove the writes from the in flight
+    			// cache.
+	        	write = wb.first;
+	        	while( write!=null ) {
+	        		if( !write.sync ) {
+	        			inflightWrites.remove(new WriteKey(write.location));
+	        		}
+	        		write = (WriteCommand) write.getNext();
+	        	}
+	    	}
+	    	
+		} catch (IOException e) {
+			// Record the failure under the enqueue mutex.
+	    	synchronized( enqueueMutex ) {
+	    		firstAsyncException = e;
+	    	}
+		} catch (InterruptedException e) {
+			// NOTE(review): the interrupt ends the loop but is swallowed; the
+			// thread's interrupt status is not restored.
+		} finally {
+    		try {
+				if( file!=null ) {
+					dataFile.closeRandomAccessFile(file);
+				}
+			} catch (IOException e) {
+				// Failure to close the file on the way out is ignored.
+			}
+    		shutdownDone.countDown();
+    	}
+    }
+
+    /**
+     * Copy the bytes in header to the channel, looping until the buffer has
+     * no bytes remaining.
+     * @param header - source of data
+     * @param channel - destination where the data will be written.
+     * @throws IOException
+     */
+	private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
+		while (header.hasRemaining()) {
+		    channel.write(header);                
+		}
+	}
+
+	/**
+	 * Copies as many bytes as fit from src into dest: the smaller of the two
+	 * buffers' remaining byte counts. When dest has less room than src has
+	 * data, the excess stays in src.
+	 * @param src - source of data
+	 * @param dest - buffer the data is appended to
+	 * @return the number of bytes copied
+	 */
+	private int copy(ByteBuffer src, ByteBuffer dest) {
+	    int rc = Math.min(dest.remaining(), src.remaining()); 
+		if( rc > 0 ) {
+		    // Adjust our limit so that we don't overflow the dest buffer. 
+			int limit = src.limit();
+			src.limit(src.position()+rc);
+            dest.put(src);
+            // restore the limit.
+			src.limit(limit);
+		}
+		return rc;
+	}
+        
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -24,7 +24,7 @@
 import org.apache.activemq.kaha.IndexTypes;
 import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList;
 import org.apache.activemq.kaha.impl.index.IndexItem;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -27,7 +27,7 @@
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java Fri Nov 24 22:00:56 2006
@@ -32,7 +32,7 @@
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreEntry;
 import org.apache.activemq.kaha.StoreLocation;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.kaha.impl.data.Item;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexLinkedList;