You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/25 07:00:57 UTC

svn commit: r479089 [2/2] - in /incubator/activemq/trunk: ./ activemq-core/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/ activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/ activemq-core/src/main/java/org/apache/activemq/kaha...

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (from r477680, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=479089&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java&r1=477680&p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Fri Nov 24 22:00:56 2006
@@ -28,6 +28,7 @@
 
 import org.apache.activemq.kaha.Marshaller;
 import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.commons.logging.Log;
@@ -37,21 +38,19 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public final class DataManager{
+public final class DataManagerImpl implements DataManager {
     
-    private static final Log log=LogFactory.getLog(DataManager.class);
+    private static final Log log=LogFactory.getLog(DataManagerImpl.class);
     public static long MAX_FILE_LENGTH=1024*1024*32;
     private static final String NAME_PREFIX="data-";
     private final File dir;
     private final String name;
-    private DataFileReader reader;
-    private DataFileWriter writer;
+    private SyncDataFileReader reader;
+    private SyncDataFileWriter writer;
     private DataFile currentWriteFile;
     private long maxFileLength = MAX_FILE_LENGTH;
     Map fileMap=new HashMap();
     
-    private boolean useAsyncWriter=false;
-
     public static final int ITEM_HEAD_SIZE=5; // type + length
     public static final byte DATA_ITEM_TYPE=1;
     public static final byte REDO_ITEM_TYPE=2;
@@ -59,7 +58,7 @@
     Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
     private String dataFilePrefix;
 
-    public DataManager(File dir, final String name){
+    public DataManagerImpl(File dir, final String name){
         this.dir=dir;
         this.name=name;
         
@@ -93,6 +92,9 @@
         return result;
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
+	 */
     public String getName(){
         return name;
     }
@@ -121,22 +123,37 @@
         return dataFile;
     }
     
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
+	 */
     public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
         return getReader().readItem(marshaller,item);
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, java.lang.Object)
+	 */
     public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
         return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
     }
     
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
+	 */
     public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
         return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
     }
     
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object)
+	 */
     public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
         getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
+	 */
     public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
         
         // Nothing to recover if there is no current file.
@@ -179,6 +196,9 @@
         }
     }
     
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
+	 */
     public synchronized void close() throws IOException{
     	getWriter().close();
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -189,6 +209,9 @@
         fileMap.clear();
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
+	 */
     public synchronized void force() throws IOException{
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
             DataFile dataFile=(DataFile) i.next();
@@ -197,6 +220,9 @@
     }
 
         
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
+	 */
     public synchronized boolean delete() throws IOException{
         boolean result=true;
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -208,6 +234,9 @@
     }
     
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
+	 */
     public synchronized void addInterestInFile(int file) throws IOException{
         if(file>=0){
             Integer key=new Integer(file);
@@ -225,6 +254,9 @@
         }
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
+	 */
     public synchronized void removeInterestInFile(int file) throws IOException{
         if(file>=0){
             Integer key=new Integer(file);
@@ -243,6 +275,9 @@
         }
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
+	 */
     public synchronized void consolidateDataFiles() throws IOException{
         List purgeList=new ArrayList();
         for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -264,10 +299,16 @@
         log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
+	 */
     public Marshaller getRedoMarshaller() {
         return redoMarshaller;
     }
 
+    /* (non-Javadoc)
+	 * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
+	 */
     public void setRedoMarshaller(Marshaller redoMarshaller) {
         this.redoMarshaller = redoMarshaller;
     }
@@ -290,45 +331,30 @@
         return "DataManager:("+NAME_PREFIX+name+")";
     }
 
-	public synchronized DataFileReader getReader() {
+	public synchronized SyncDataFileReader getReader() {
 		if( reader == null ) {
 			reader = createReader();
 		}
 		return reader;
 	}
-	protected synchronized DataFileReader createReader() {
-		if( useAsyncWriter ) {
-			return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
-		} else {
-			return new SyncDataFileReader(this);
-		}
+	protected synchronized SyncDataFileReader createReader() {
+		return new SyncDataFileReader(this);
 	}
-	public synchronized void setReader(DataFileReader reader) {
+	public synchronized void setReader(SyncDataFileReader reader) {
 		this.reader = reader;
 	}
 
-	public synchronized DataFileWriter getWriter() {
+	public synchronized SyncDataFileWriter getWriter() {
 		if( writer==null ) {
 			writer = createWriter();
 		}
 		return writer;
 	}
-	private DataFileWriter createWriter() {
-		if( useAsyncWriter ) {
-			return new AsyncDataFileWriter(this);
-		} else {
-			return new SyncDataFileWriter(this);
-		}
+	private SyncDataFileWriter createWriter() {
+		return new SyncDataFileWriter(this);
 	}
-	public synchronized void setWriter(DataFileWriter writer) {
+	public synchronized void setWriter(SyncDataFileWriter writer) {
 		this.writer = writer;
 	}
 
-	public synchronized boolean isUseAsyncWriter() {
-		return useAsyncWriter;
-	}
-
-	public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
-		this.useAsyncWriter = useAsyncWriter;
-	}
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Fri Nov 24 22:00:56 2006
@@ -27,9 +27,9 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-final class SyncDataFileReader implements DataFileReader {
+public final class SyncDataFileReader {
     
-    private DataManager dataManager;
+    private DataManagerImpl dataManager;
     private DataByteArrayInputStream dataIn;
 
     /**
@@ -37,7 +37,7 @@
      * 
      * @param file
      */
-    SyncDataFileReader(DataManager fileManager){
+    SyncDataFileReader(DataManagerImpl fileManager){
         this.dataManager=fileManager;
         this.dataIn=new DataByteArrayInputStream();
     }
@@ -62,7 +62,7 @@
         // TODO: we could reuse the buffer in dataIn if it's big enough to avoid
         // allocating byte[] arrays on every readItem.
         byte[] data=new byte[item.getSize()];
-        file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
+        file.seek(item.getOffset()+DataManagerImpl.ITEM_HEAD_SIZE);
         file.readFully(data);
         dataIn.restart(data);
         return marshaller.readPayload(dataIn);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java Fri Nov 24 22:00:56 2006
@@ -21,7 +21,6 @@
 import java.io.RandomAccessFile;
 
 import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.StoreLocation;
 import org.apache.activemq.util.DataByteArrayOutputStream;
 /**
  * Optimized Store writer.  Synchronously marshalls and writes to the data file. Simple but 
@@ -29,10 +28,10 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-final class SyncDataFileWriter implements DataFileWriter{
+final public class SyncDataFileWriter {
     
     private DataByteArrayOutputStream buffer;
-    private DataManager dataManager;
+    private DataManagerImpl dataManager;
 
 
     /**
@@ -40,7 +39,7 @@
      * 
      * @param file
      */
-    SyncDataFileWriter(DataManager fileManager){
+    SyncDataFileWriter(DataManagerImpl fileManager){
         this.dataManager=fileManager;
         this.buffer=new DataByteArrayOutputStream();
     }
@@ -52,10 +51,10 @@
         
         // Write the packet our internal buffer.
         buffer.reset();
-        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
         marshaller.writePayload(payload,buffer);
         int size=buffer.size();
-        int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+        int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
         buffer.reset();
         buffer.writeByte(type);
         buffer.writeInt(payloadSize);
@@ -80,10 +79,10 @@
     public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload, byte type) throws IOException {
         //Write the packet our internal buffer.
         buffer.reset();
-        buffer.position(DataManager.ITEM_HEAD_SIZE);
+        buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
         marshaller.writePayload(payload,buffer);
         int size=buffer.size();
-        int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+        int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
         buffer.reset();
         buffer.writeByte(type);
         buffer.writeInt(payloadSize);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Fri Nov 24 22:00:56 2006
@@ -26,7 +26,7 @@
 
 
 import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java Fri Nov 24 22:00:56 2006
@@ -20,7 +20,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
 import org.apache.activemq.util.DataByteArrayOutputStream;
 /**
  * Optimized Store writer

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+/**
+ * Provides a base class for you to extend when you want object to maintain
+ * a doubly linked list to other objects without using a collection class. 
+ * 
+ * @author chirino
+ */
+public class LinkedNode {
+	
+	protected LinkedNode next=this;
+	protected LinkedNode prev=this;
+	protected boolean tail=true; 
+	
+
+	public LinkedNode getHeadNode() {
+		if( isHeadNode() ) {
+			return this;
+		}
+		if( isTailNode() ) {
+			return next;
+		}
+		LinkedNode rc = prev;
+		while(!rc.isHeadNode()) {
+			rc = rc.prev;
+		}
+		return rc;
+	}
+	
+	public LinkedNode getTailNode() {
+		if( isTailNode() ) {
+			return this;
+		}
+		if( isHeadNode() ) {
+			return prev;
+		}
+		LinkedNode rc = next;
+		while(!rc.isTailNode()) {
+			rc = rc.next;
+		}
+		return rc;
+	}
+
+	public LinkedNode getNext() {
+		return tail ? null : next;
+	}
+
+	public LinkedNode getPrevious() {
+		return prev.tail ? null : prev;
+	}
+
+	public boolean isHeadNode() {
+		return prev.isTailNode();
+	}
+	
+	public boolean isTailNode() {
+		return tail;
+	}
+
+	/**
+	 * @param rightHead the node to link after this node.
+	 * @return this
+	 */
+	public LinkedNode linkAfter(LinkedNode rightHead) {
+		
+		if( rightHead == this ) {
+			throw new IllegalArgumentException("You cannot link to yourself");
+		}
+		if( !rightHead.isHeadNode() ) {
+			throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+		}
+
+		LinkedNode rightTail = rightHead.prev;
+
+		if( tail ) {
+			tail = false;
+		} else {
+			rightTail.tail=false;
+		}
+				
+		rightHead.prev = this; // link the head of the right side.
+		rightTail.next = next; // link the tail of the right side
+		next.prev = rightTail; // link the head of the left side 		
+		next = rightHead;	   // link the tail of the left side.
+		
+		return this;
+	}
+
+	
+	/**
+	 * @param leftHead the node to link after this node.
+	 * @return 
+	 * @return this
+	 */
+	public LinkedNode linkBefore(LinkedNode leftHead) {
+		
+		
+		if( leftHead == this ) {
+			throw new IllegalArgumentException("You cannot link to yourself");
+		}
+		if( !leftHead.isHeadNode() ) {
+			throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+		}
+
+		// The left side is no longer going to be a tail..
+		LinkedNode leftTail = leftHead.prev;
+		leftTail.tail = false;
+		
+		leftTail.next = this; // link the tail of the left side.		
+		leftHead.prev = prev; // link the head of the left side.
+		prev.next = leftHead; // link the tail of the right side.
+		prev = leftTail;      // link the head of the right side.
+		
+		return leftHead;
+	}
+
+	/**
+	 * Removes this node out of the linked list it is chained in.  
+	 */
+	public void unlink() {
+		// If we are allready unlinked...
+		if( prev==this ) {
+			return;
+		}
+		
+		if( tail ) {
+			prev.tail = true;
+		}
+		
+		// Update the peers links..
+		next.prev = prev;
+		prev.next = next;
+		
+		// Update our links..
+		next = this;
+		prev = this;
+		tail=true;
+	}
+	
+}

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java Fri Nov 24 22:00:56 2006
@@ -18,13 +18,13 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+
 import junit.framework.TestCase;
+
 import org.apache.activemq.kaha.IndexTypes;
 import org.apache.activemq.kaha.StoreFactory;
-import org.apache.activemq.kaha.impl.KahaStore;
 import org.apache.activemq.kaha.impl.container.ContainerId;
 import org.apache.activemq.kaha.impl.container.ListContainerImpl;
-import org.apache.activemq.kaha.impl.data.DataManager;
 import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;
 /**

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests the AsyncDataManager based Journal
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class JournalImplTest extends TestCase {
+
+    Log log = LogFactory.getLog(JournalImplTest.class);
+	
+    int size = 1024*10;
+    int logFileCount=2;
+    File logDirectory = new File("target/dm-data2");
+	private JournalFacade journal;
+    
+    /**
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        if( logDirectory.exists() ) {
+        	deleteDir(logDirectory);
+        }
+        assertTrue("Could not delete directory: "+logDirectory.getCanonicalPath(), !logDirectory.exists() );
+        AsyncDataManager dm = new AsyncDataManager();
+        dm.setDirectory(logDirectory);
+        dm.setMaxFileLength(1024*64);
+        dm.start();   
+        journal = new JournalFacade(dm);
+    }
+
+    /**
+	 */
+	private void deleteDir(File f) {
+		File[] files = f.listFiles();
+		for (int i = 0; i < files.length; i++) {
+			File file = files[i];
+			file.delete();
+		}
+		f.delete();
+	}
+
+	protected void tearDown() throws Exception {
+		journal.close();
+        if( logDirectory.exists() )
+        	deleteDir(logDirectory);
+        //assertTrue( !logDirectory.exists() );
+    }
+    
+    public void testLogFileCreation() throws IOException {
+        	RecordLocation mark = journal.getMark();
+        	assertNull(mark);
+    }
+    
+    @SuppressWarnings("unchecked")
+	public void testAppendAndRead() throws InvalidRecordLocationException, InterruptedException, IOException {
+    	
+        Packet data1 = createPacket("Hello World 1");
+    	RecordLocation location1 = journal.write( data1, false);
+    	Packet data2 = createPacket("Hello World 2");
+    	RecordLocation location2 = journal.write( data2, false);
+    	Packet data3  = createPacket("Hello World 3");
+    	RecordLocation location3 = journal.write( data3, false);
+    	
+    	//Thread.sleep(1000);
+    	
+    	// Now see if we can read that data.
+    	Packet data;
+    	data = journal.read(location2);
+    	assertEquals( data2, data);
+    	data = journal.read(location1);
+    	assertEquals( data1, data);
+    	data = journal.read(location3);
+    	assertEquals( data3, data);
+    	
+    	// Can we cursor the data?
+    	RecordLocation l=journal.getNextRecordLocation(null);
+    	int t = l.compareTo(location1);
+    	assertEquals(0, t);
+    	data = journal.read(l);
+    	assertEquals( data1, data);
+
+    	l=journal.getNextRecordLocation(l);
+    	assertEquals(0, l.compareTo(location2));
+    	data = journal.read(l);
+    	assertEquals( data2, data);
+
+    	l=journal.getNextRecordLocation(l);
+    	assertEquals(0, l.compareTo(location3));
+    	data = journal.read(l);
+    	assertEquals( data3, data);
+    	
+    	l=journal.getNextRecordLocation(l);
+    	assertNull(l);
+    	
+    	log.info(journal);
+    }
+
+    public void testCanReadFromArchivedLogFile() throws InvalidRecordLocationException, InterruptedException, IOException {
+        
+        Packet data1 = createPacket("Hello World 1");
+        RecordLocationFacade location1 = (RecordLocationFacade) journal.write( data1, false);
+        
+        RecordLocationFacade  pos;
+        int counter = 0;
+        do {
+            
+            Packet p = createPacket("<<<data>>>");
+            pos = (RecordLocationFacade) journal.write( p, false);
+            if( counter++ % 1000 == 0 ) {
+            	journal.setMark(pos, false);
+            }
+            
+        } while( pos.getLocation().getDataFileId() < 5 );
+        
+        // Now see if we can read that first packet.
+        Packet data;
+        data = journal.read(location1);
+        assertEquals( data1, data);
+        
+    }
+
+    /**
+     * @param string
+     * @return
+     */
+    private Packet createPacket(String string) {
+        return new ByteArrayPacket(string.getBytes());
+    }
+
+    public static void assertEquals(Packet arg0, Packet arg1) {
+        assertEquals(arg0.sliceAsBytes(), arg1.sliceAsBytes());
+    }
+    
+    public static void assertEquals(byte[] arg0, byte[] arg1) {
+    	
+//    	System.out.println("Comparing: "+new String(arg0)+" and "+new String(arg1));
+    	if( arg0==null ^ arg1==null )
+    		fail("Not equal: "+arg0+" != "+arg1);
+    	if( arg0==null )
+    		return;
+    	if( arg0.length!=arg1.length)
+    		fail("Array lenght not equal: "+arg0.length+" != "+arg1.length);
+    	for( int i=0; i<arg0.length;i++) {
+    		if( arg0[i]!= arg1[i]) {
+        		fail("Array item not equal at index "+i+": "+arg0[i]+" != "+arg1[i]);
+    		}
+    	}
+	}
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalPerfToolSupport;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+
+/**
+ * A Performance statistics gathering tool for the AsyncDataManager based Journal.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class JournalPerfTool extends JournalPerfToolSupport {
+	
+	private int logFileSize = 1024*1024*50;
+		
+	public static void main(String[] args) throws Exception {
+		JournalPerfTool tool = new JournalPerfTool();
+		
+        tool.initialWorkers=10;
+        tool.syncFrequency=15;
+        tool.workerIncrement=0;
+        tool.workerThinkTime=0;
+        tool.verbose=false;
+        tool.incrementDelay=5*1000;
+
+		if( args.length > 0 ) {
+			tool.journalDirectory = new File(args[0]);
+		}
+		if( args.length > 1 ) {
+			tool.workerIncrement = Integer.parseInt(args[1]);
+		}
+		if( args.length > 2 ) {
+			tool.incrementDelay = Long.parseLong(args[2]);
+		}
+		if( args.length > 3 ) {
+			tool.verbose = Boolean.getBoolean(args[3]);
+		}
+		if( args.length > 4 ) {
+			tool.recordSize = Integer.parseInt(args[4]);
+		}
+		if( args.length > 5 ) {
+			tool.syncFrequency = Integer.parseInt(args[5]);
+		}
+		if( args.length > 6 ) {
+			tool.workerThinkTime = Integer.parseInt(args[6]);
+		}
+		tool.exec();
+	}
+
+	/**
+	 * @throws IOException
+	 * @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal()
+	 */
+	public Journal createJournal() throws IOException {		
+        AsyncDataManager dm = new AsyncDataManager();
+		dm.setMaxFileLength(logFileSize);
+        dm.setDirectory(this.journalDirectory);
+        dm.start();   
+        return new JournalFacade(dm);
+	}
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+
+import java.io.IOException;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalRWPerfToolSupport;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+
+/**
+ * A Performance statistics gathering tool for the AsyncDataManager based Journal.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class JournalRWPerfTool extends JournalRWPerfToolSupport {
+	
+	private int logFileSize = 1024*1024*50;
+		
+	public static void main(String[] args) throws Exception {
+		JournalRWPerfTool tool = new JournalRWPerfTool();
+		
+        tool.initialWriteWorkers=10;
+        tool.syncFrequency=15;
+        tool.writeWorkerIncrement=0;
+        tool.writeWorkerThinkTime=0;
+        tool.verbose=false;
+        tool.incrementDelay=5*1000;
+
+		if( args.length > 0 ) {
+			tool.journalDirectory = new File(args[0]);
+		}
+		if( args.length > 1 ) {
+			tool.writeWorkerIncrement = Integer.parseInt(args[1]);
+		}
+		if( args.length > 2 ) {
+			tool.incrementDelay = Long.parseLong(args[2]);
+		}
+		if( args.length > 3 ) {
+			tool.verbose = Boolean.getBoolean(args[3]);
+		}
+		if( args.length > 4 ) {
+			tool.recordSize = Integer.parseInt(args[4]);
+		}
+		if( args.length > 5 ) {
+			tool.syncFrequency = Integer.parseInt(args[5]);
+		}
+		if( args.length > 6 ) {
+			tool.writeWorkerThinkTime = Integer.parseInt(args[6]);
+		}
+		tool.exec();
+	}
+
+	/**
+	 * @throws IOException
+	 * @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal()
+	 */
+	public Journal createJournal() throws IOException {
+        AsyncDataManager dm = new AsyncDataManager();
+		dm.setMaxFileLength(logFileSize);
+        dm.setDirectory(this.journalDirectory);
+        dm.start();   
+        return new JournalFacade(dm);
+	}
+	
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the Location Class
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class LocationTest extends TestCase {
+        
+    @SuppressWarnings("unchecked")
+	synchronized public void testRecordLocationImplComparison() throws IOException {
+        Location l1 = new Location();
+        l1.setDataFileId(0);
+        l1.setOffset(5);        
+        Location l2 = new Location(l1);
+        l2.setOffset(10);
+        Location l3 = new Location(l2);
+        l3.setDataFileId(2);
+        l3.setOffset(0);
+
+        assertTrue( l1.compareTo(l2)<0 );
+        
+        // Sort them using a list.  Put them in the wrong order.
+        ArrayList<RecordLocationFacade> l = new ArrayList<RecordLocationFacade>();
+        l.add(new RecordLocationFacade(l2));
+        l.add(new RecordLocationFacade(l3));
+        l.add(new RecordLocationFacade(l1));        
+        Collections.sort(l);
+        
+        // Did they get sorted to the correct order?
+        System.out.println(l.get(0));
+        assertSame( l.get(0).getLocation(), l1 );
+        assertSame( l.get(1).getLocation(), l2 );
+        assertSame( l.get(2).getLocation(), l3 );
+    }
+}

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import junit.framework.TestCase;
+
+/**
+ * 
+ * @author chirino
+ */
+public class LinkedNodeTest extends TestCase {
+	
+	
+	static class IntLinkedNode extends LinkedNode {
+		public final int v;
+
+		public IntLinkedNode(int v){
+			this.v = v;
+		};
+		
+		@Override
+		public String toString() {
+			return ""+v;
+		}
+	}
+
+	IntLinkedNode i1 = new IntLinkedNode(1);
+	IntLinkedNode i2 = new IntLinkedNode(2);
+	IntLinkedNode i3 = new IntLinkedNode(3);
+	IntLinkedNode i4 = new IntLinkedNode(4);
+	IntLinkedNode i5 = new IntLinkedNode(5);
+	IntLinkedNode i6 = new IntLinkedNode(6);
+
+	public void testLinkAfter() {
+		
+		i1.linkAfter(i2.linkAfter(i3));
+		// Order should be 1,2,3
+		
+		assertTrue( i1.getNext() == i2 );
+		assertTrue( i1.getNext().getNext() == i3 );		
+		assertNull( i1.getNext().getNext().getNext() );
+
+		assertTrue( i3.getPrevious() == i2 );
+		assertTrue( i3.getPrevious().getPrevious() == i1 );		
+		assertNull( i3.getPrevious().getPrevious().getPrevious() );
+
+		assertTrue( i1.isHeadNode() );
+		assertFalse(i1.isTailNode() );
+		assertFalse(i2.isHeadNode() );
+		assertFalse(i2.isTailNode() );
+		assertTrue( i3.isTailNode() );
+		assertFalse(i3.isHeadNode() );
+		
+		i1.linkAfter(i4.linkAfter(i5));
+		
+		// Order should be 1,4,5,2,3
+		
+		assertTrue( i1.getNext() == i4 );
+		assertTrue( i1.getNext().getNext() == i5 );
+		assertTrue( i1.getNext().getNext().getNext() == i2 );
+		assertTrue( i1.getNext().getNext().getNext().getNext() == i3 );
+		assertNull( i1.getNext().getNext().getNext().getNext().getNext() );
+
+		assertTrue( i3.getPrevious() == i2 );
+		assertTrue( i3.getPrevious().getPrevious() == i5 );		
+		assertTrue( i3.getPrevious().getPrevious().getPrevious() == i4 );		
+		assertTrue( i3.getPrevious().getPrevious().getPrevious().getPrevious() == i1 );		
+		assertNull( i3.getPrevious().getPrevious().getPrevious().getPrevious().getPrevious() );
+		
+		assertTrue( i1.isHeadNode() );
+		assertFalse(i1.isTailNode() );
+		assertFalse(i4.isHeadNode() );
+		assertFalse(i4.isTailNode() );
+		assertFalse(i5.isHeadNode() );
+		assertFalse(i5.isTailNode() );
+		assertFalse(i2.isHeadNode() );
+		assertFalse(i2.isTailNode() );
+		assertTrue( i3.isTailNode() );
+		assertFalse(i3.isHeadNode() );
+				
+	}
+	
+	public void testLinkBefore() {
+		
+		i3.linkBefore(i2.linkBefore(i1));
+		
+		assertTrue( i1.getNext() == i2 );
+		assertTrue( i1.getNext().getNext() == i3 );		
+		assertNull( i1.getNext().getNext().getNext() );
+
+		assertTrue( i3.getPrevious() == i2 );
+		assertTrue( i3.getPrevious().getPrevious() == i1 );		
+		assertNull( i3.getPrevious().getPrevious().getPrevious() );
+
+		assertTrue( i1.isHeadNode() );
+		assertFalse(i1.isTailNode() );
+		assertFalse(i2.isHeadNode() );
+		assertFalse(i2.isTailNode() );
+		assertTrue( i3.isTailNode() );
+		assertFalse(i3.isHeadNode() );
+		
+		i2.linkBefore(i5.linkBefore(i4));
+		
+		// Order should be 1,4,5,2,3
+		
+		assertTrue( i1.getNext() == i4 );
+		assertTrue( i1.getNext().getNext() == i5 );
+		assertTrue( i1.getNext().getNext().getNext() == i2 );
+		assertTrue( i1.getNext().getNext().getNext().getNext() == i3 );
+		assertNull( i1.getNext().getNext().getNext().getNext().getNext() );
+
+		assertTrue( i3.getPrevious() == i2 );
+		assertTrue( i3.getPrevious().getPrevious() == i5 );		
+		assertTrue( i3.getPrevious().getPrevious().getPrevious() == i4 );		
+		assertTrue( i3.getPrevious().getPrevious().getPrevious().getPrevious() == i1 );		
+		assertNull( i3.getPrevious().getPrevious().getPrevious().getPrevious().getPrevious() );
+		
+		assertTrue( i1.isHeadNode() );
+		assertFalse(i1.isTailNode() );
+		assertFalse(i4.isHeadNode() );
+		assertFalse(i4.isTailNode() );
+		assertFalse(i5.isHeadNode() );
+		assertFalse(i5.isTailNode() );
+		assertFalse(i2.isHeadNode() );
+		assertFalse(i2.isTailNode() );
+		assertTrue( i3.isTailNode() );
+		assertFalse(i3.isHeadNode() );
+				
+	}
+	
+	public void testUnlink() {
+		
+		i1.linkAfter(i2.linkAfter(i3));
+		i3.linkAfter(i4);
+		i1.linkBefore(i5);
+		i1.linkAfter(i6);
+
+		// Order should be 5,1,6,2,3,4
+		i4.unlink();
+		i5.unlink();
+		i6.unlink();
+		
+		// Order should be 1,2,3
+		
+		assertTrue( i1.getNext() == i2 );
+		assertTrue( i1.getNext().getNext() == i3 );		
+		assertNull( i1.getNext().getNext().getNext() );
+
+		assertTrue( i3.getPrevious() == i2 );
+		assertTrue( i3.getPrevious().getPrevious() == i1 );		
+		assertNull( i3.getPrevious().getPrevious().getPrevious() );
+
+		assertTrue( i1.isHeadNode() );
+		assertFalse(i1.isTailNode() );
+		assertFalse(i2.isHeadNode() );
+		assertFalse(i2.isTailNode() );
+		assertTrue( i3.isTailNode() );
+		assertFalse(i3.isHeadNode() );						
+	}
+	
+}

Modified: incubator/activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/pom.xml (original)
+++ incubator/activemq/trunk/pom.xml Fri Nov 24 22:00:56 2006
@@ -216,6 +216,12 @@
         <artifactId>activeio-core</artifactId>
         <version>3.1-incubator-SNAPSHOT</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.activemq</groupId>
+        <artifactId>activeio-core</artifactId>
+        <version>3.1-incubator-SNAPSHOT</version>
+        <type>test-jar</type>
+      </dependency>
 	  <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-openwire-generator</artifactId>