You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/11/25 07:00:57 UTC
svn commit: r479089 [2/2] - in /incubator/activemq/trunk: ./ activemq-core/
activemq-core/src/main/java/org/apache/activemq/kaha/impl/
activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/
activemq-core/src/main/java/org/apache/activemq/kaha...
Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java (from r477680, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java)
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java?view=diff&rev=479089&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java&r1=477680&p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java Fri Nov 24 22:00:56 2006
@@ -28,6 +28,7 @@
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
@@ -37,21 +38,19 @@
*
* @version $Revision: 1.1.1.1 $
*/
-public final class DataManager{
+public final class DataManagerImpl implements DataManager {
- private static final Log log=LogFactory.getLog(DataManager.class);
+ private static final Log log=LogFactory.getLog(DataManagerImpl.class);
public static long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
- private DataFileReader reader;
- private DataFileWriter writer;
+ private SyncDataFileReader reader;
+ private SyncDataFileWriter writer;
private DataFile currentWriteFile;
private long maxFileLength = MAX_FILE_LENGTH;
Map fileMap=new HashMap();
- private boolean useAsyncWriter=false;
-
public static final int ITEM_HEAD_SIZE=5; // type + length
public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2;
@@ -59,7 +58,7 @@
Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
- public DataManager(File dir, final String name){
+ public DataManagerImpl(File dir, final String name){
this.dir=dir;
this.name=name;
@@ -93,6 +92,9 @@
return result;
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#getName()
+ */
public String getName(){
return name;
}
@@ -121,22 +123,37 @@
return dataFile;
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#readItem(org.apache.activemq.kaha.Marshaller, org.apache.activemq.kaha.StoreLocation)
+ */
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
return getReader().readItem(marshaller,item);
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#storeDataItem(org.apache.activemq.kaha.Marshaller, java.lang.Object)
+ */
public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return getWriter().storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#storeRedoItem(java.lang.Object)
+ */
public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return getWriter().storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#updateItem(org.apache.activemq.kaha.StoreLocation, org.apache.activemq.kaha.Marshaller, java.lang.Object)
+ */
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
getWriter().updateItem((DataItem)location,marshaller,payload,DATA_ITEM_TYPE);
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#recoverRedoItems(org.apache.activemq.kaha.impl.data.RedoListener)
+ */
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
// Nothing to recover if there is no current file.
@@ -179,6 +196,9 @@
}
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#close()
+ */
public synchronized void close() throws IOException{
getWriter().close();
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -189,6 +209,9 @@
fileMap.clear();
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#force()
+ */
public synchronized void force() throws IOException{
for(Iterator i=fileMap.values().iterator();i.hasNext();){
DataFile dataFile=(DataFile) i.next();
@@ -197,6 +220,9 @@
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#delete()
+ */
public synchronized boolean delete() throws IOException{
boolean result=true;
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -208,6 +234,9 @@
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#addInterestInFile(int)
+ */
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
@@ -225,6 +254,9 @@
}
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#removeInterestInFile(int)
+ */
public synchronized void removeInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
@@ -243,6 +275,9 @@
}
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#consolidateDataFiles()
+ */
public synchronized void consolidateDataFiles() throws IOException{
List purgeList=new ArrayList();
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@@ -264,10 +299,16 @@
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#getRedoMarshaller()
+ */
public Marshaller getRedoMarshaller() {
return redoMarshaller;
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.kaha.impl.data.IDataManager#setRedoMarshaller(org.apache.activemq.kaha.Marshaller)
+ */
public void setRedoMarshaller(Marshaller redoMarshaller) {
this.redoMarshaller = redoMarshaller;
}
@@ -290,45 +331,30 @@
return "DataManager:("+NAME_PREFIX+name+")";
}
- public synchronized DataFileReader getReader() {
+ public synchronized SyncDataFileReader getReader() {
if( reader == null ) {
reader = createReader();
}
return reader;
}
- protected synchronized DataFileReader createReader() {
- if( useAsyncWriter ) {
- return new AsyncDataFileReader(this, (AsyncDataFileWriter) getWriter());
- } else {
- return new SyncDataFileReader(this);
- }
+ protected synchronized SyncDataFileReader createReader() {
+ return new SyncDataFileReader(this);
}
- public synchronized void setReader(DataFileReader reader) {
+ public synchronized void setReader(SyncDataFileReader reader) {
this.reader = reader;
}
- public synchronized DataFileWriter getWriter() {
+ public synchronized SyncDataFileWriter getWriter() {
if( writer==null ) {
writer = createWriter();
}
return writer;
}
- private DataFileWriter createWriter() {
- if( useAsyncWriter ) {
- return new AsyncDataFileWriter(this);
- } else {
- return new SyncDataFileWriter(this);
- }
+ private SyncDataFileWriter createWriter() {
+ return new SyncDataFileWriter(this);
}
- public synchronized void setWriter(DataFileWriter writer) {
+ public synchronized void setWriter(SyncDataFileWriter writer) {
this.writer = writer;
}
- public synchronized boolean isUseAsyncWriter() {
- return useAsyncWriter;
- }
-
- public synchronized void setUseAsyncWriter(boolean useAsyncWriter) {
- this.useAsyncWriter = useAsyncWriter;
- }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileReader.java Fri Nov 24 22:00:56 2006
@@ -27,9 +27,9 @@
*
* @version $Revision: 1.1.1.1 $
*/
-final class SyncDataFileReader implements DataFileReader {
+public final class SyncDataFileReader {
- private DataManager dataManager;
+ private DataManagerImpl dataManager;
private DataByteArrayInputStream dataIn;
/**
@@ -37,7 +37,7 @@
*
* @param file
*/
- SyncDataFileReader(DataManager fileManager){
+ SyncDataFileReader(DataManagerImpl fileManager){
this.dataManager=fileManager;
this.dataIn=new DataByteArrayInputStream();
}
@@ -62,7 +62,7 @@
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid
// allocating byte[] arrays on every readItem.
byte[] data=new byte[item.getSize()];
- file.seek(item.getOffset()+DataManager.ITEM_HEAD_SIZE);
+ file.seek(item.getOffset()+DataManagerImpl.ITEM_HEAD_SIZE);
file.readFully(data);
dataIn.restart(data);
return marshaller.readPayload(dataIn);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java Fri Nov 24 22:00:56 2006
@@ -21,7 +21,6 @@
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
-import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Optimized Store writer. Synchronously marshalls and writes to the data file. Simple but
@@ -29,10 +28,10 @@
*
* @version $Revision: 1.1.1.1 $
*/
-final class SyncDataFileWriter implements DataFileWriter{
+final public class SyncDataFileWriter {
private DataByteArrayOutputStream buffer;
- private DataManager dataManager;
+ private DataManagerImpl dataManager;
/**
@@ -40,7 +39,7 @@
*
* @param file
*/
- SyncDataFileWriter(DataManager fileManager){
+ SyncDataFileWriter(DataManagerImpl fileManager){
this.dataManager=fileManager;
this.buffer=new DataByteArrayOutputStream();
}
@@ -52,10 +51,10 @@
// Write the packet our internal buffer.
buffer.reset();
- buffer.position(DataManager.ITEM_HEAD_SIZE);
+ buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
int size=buffer.size();
- int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+ int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
@@ -80,10 +79,10 @@
public synchronized void updateItem(DataItem item,Marshaller marshaller, Object payload, byte type) throws IOException {
//Write the packet our internal buffer.
buffer.reset();
- buffer.position(DataManager.ITEM_HEAD_SIZE);
+ buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
int size=buffer.size();
- int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
+ int payloadSize=size-DataManagerImpl.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java Fri Nov 24 22:00:56 2006
@@ -26,7 +26,7 @@
import org.apache.activemq.kaha.StoreEntry;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/StoreIndexWriter.java Fri Nov 24 22:00:56 2006
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.io.RandomAccessFile;
-import org.apache.activemq.kaha.impl.data.DataManager;
+import org.apache.activemq.kaha.impl.DataManager;
import org.apache.activemq.util.DataByteArrayOutputStream;
/**
* Optimized Store writer
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/LinkedNode.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,157 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+/**
+ * Provides a base class for you to extend when you want objects to maintain
+ * a doubly linked list to other objects without using a collection class.
+ *
+ * <p>The chain is circular: the last node's <code>next</code> points at the first
+ * node and the first node's <code>prev</code> points at the last node. The
+ * <code>tail</code> flag marks the last node so that the two ends can be told
+ * apart. A freshly created node links to itself and is therefore both the head
+ * and the tail of a one-element chain.
+ *
+ * <p>NOTE(review): nothing in this class synchronizes access, so callers
+ * presumably have to provide their own locking - confirm against the users.
+ *
+ * @author chirino
+ */
+public class LinkedNode {
+
+ protected LinkedNode next=this; // successor; on the last node this wraps around to the first node
+ protected LinkedNode prev=this; // predecessor; on the first node this wraps around to the last node
+ protected boolean tail=true; // true only on the last node of the chain
+
+
+ public LinkedNode getHeadNode() { // returns the first node of the chain this node belongs to
+ if( isHeadNode() ) {
+ return this;
+ }
+ if( isTailNode() ) {
+ return next; // the chain is circular, so the last node's successor is the first node
+ }
+ LinkedNode rc = prev;
+ while(!rc.isHeadNode()) {
+ rc = rc.prev;
+ }
+ return rc;
+ }
+
+ public LinkedNode getTailNode() { // returns the last node of the chain this node belongs to
+ if( isTailNode() ) {
+ return this;
+ }
+ if( isHeadNode() ) {
+ return prev; // the chain is circular, so the first node's predecessor is the last node
+ }
+ LinkedNode rc = next;
+ while(!rc.isTailNode()) {
+ rc = rc.next;
+ }
+ return rc;
+ }
+
+ public LinkedNode getNext() { // returns the following node, or null when this is the last node
+ return tail ? null : next;
+ }
+
+ public LinkedNode getPrevious() { // returns the preceding node, or null when this is the first node
+ return prev.tail ? null : prev;
+ }
+
+ public boolean isHeadNode() { // a node is first in its chain when its predecessor is the last node
+ return prev.isTailNode();
+ }
+
+ public boolean isTailNode() { // true when this node is the last node of its chain
+ return tail;
+ }
+
+ /**
+ * Splices the chain that starts at <code>rightHead</code> in directly after
+ * this node. Whatever followed this node before the call ends up after the
+ * last node of the inserted chain.
+ *
+ * @param rightHead the node to link after this node; it must be the first node of its chain.
+ * @return this
+ * @throws IllegalArgumentException if rightHead is this node or is not the first node of its chain
+ */
+ public LinkedNode linkAfter(LinkedNode rightHead) {
+
+ if( rightHead == this ) {
+ throw new IllegalArgumentException("You cannot link to yourself");
+ }
+ if( !rightHead.isHeadNode() ) {
+ throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+ }
+
+ LinkedNode rightTail = rightHead.prev; // rightHead is a head node, so its predecessor is the last node of the right chain
+
+ if( tail ) {
+ tail = false; // we were last: rightTail keeps its tail flag and becomes the last node of the merged chain
+ } else {
+ rightTail.tail=false; // inserting mid-chain: the existing last node stays the last node
+ }
+
+ rightHead.prev = this; // link the head of the right side.
+ rightTail.next = next; // link the tail of the right side
+ next.prev = rightTail; // link the head of the left side
+ next = rightHead; // link the tail of the left side.
+
+ return this;
+ }
+
+
+ /**
+ * Splices the chain that starts at <code>leftHead</code> in directly before
+ * this node. Whatever preceded this node before the call ends up linked in
+ * front of <code>leftHead</code>.
+ *
+ * @param leftHead the node to link before this node; it must be the first node of its chain.
+ * @return leftHead (note: not this node)
+ * @throws IllegalArgumentException if leftHead is this node or is not the first node of its chain
+ */
+ public LinkedNode linkBefore(LinkedNode leftHead) {
+
+
+ if( leftHead == this ) {
+ throw new IllegalArgumentException("You cannot link to yourself");
+ }
+ if( !leftHead.isHeadNode() ) {
+ throw new IllegalArgumentException("You only insert nodes that are the first in a list");
+ }
+
+ // The left side is no longer going to be a tail..
+ LinkedNode leftTail = leftHead.prev;
+ leftTail.tail = false;
+
+ leftTail.next = this; // link the tail of the left side.
+ leftHead.prev = prev; // link the head of the left side.
+ prev.next = leftHead; // link the tail of the right side.
+ prev = leftTail; // link the head of the right side.
+
+ return leftHead;
+ }
+
+ /**
+ * Removes this node out of the linked list it is chained in and resets it
+ * to a standalone one-element chain. Does nothing when the node is not
+ * linked to any other node.
+ */
+ public void unlink() {
+ // If we are already unlinked...
+ if( prev==this ) {
+ return;
+ }
+
+ if( tail ) {
+ prev.tail = true; // we were the last node, so our predecessor becomes the new last node
+ }
+
+ // Update the peers links..
+ next.prev = prev;
+ prev.next = next;
+
+ // Update our links..
+ next = this;
+ prev = this;
+ tail=true;
+ }
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java Fri Nov 24 22:00:56 2006
@@ -18,13 +18,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+
import junit.framework.TestCase;
+
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.StoreFactory;
-import org.apache.activemq.kaha.impl.KahaStore;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
-import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
/**
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalImplTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,180 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.activeio.journal.InvalidRecordLocationException;
+import org.apache.activeio.journal.RecordLocation;
+import org.apache.activeio.packet.ByteArrayPacket;
+import org.apache.activeio.packet.Packet;
+import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tests the AsyncDataManager based Journal
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JournalImplTest extends TestCase {
+
+ Log log = LogFactory.getLog(JournalImplTest.class);
+
+ int size = 1024*10;
+ int logFileCount=2;
+ File logDirectory = new File("target/dm-data2");
+ private JournalFacade journal;
+
+ /**
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ if( logDirectory.exists() ) {
+ deleteDir(logDirectory);
+ }
+ assertTrue("Could not delete directory: "+logDirectory.getCanonicalPath(), !logDirectory.exists() );
+ AsyncDataManager dm = new AsyncDataManager();
+ dm.setDirectory(logDirectory);
+ dm.setMaxFileLength(1024*64);
+ dm.start();
+ journal = new JournalFacade(dm);
+ }
+
+ /**
+ */
+ private void deleteDir(File f) {
+ File[] files = f.listFiles();
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
+ file.delete();
+ }
+ f.delete();
+ }
+
+ protected void tearDown() throws Exception {
+ journal.close();
+ if( logDirectory.exists() )
+ deleteDir(logDirectory);
+ //assertTrue( !logDirectory.exists() );
+ }
+
+ public void testLogFileCreation() throws IOException {
+ RecordLocation mark = journal.getMark();
+ assertNull(mark);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testAppendAndRead() throws InvalidRecordLocationException, InterruptedException, IOException {
+
+ Packet data1 = createPacket("Hello World 1");
+ RecordLocation location1 = journal.write( data1, false);
+ Packet data2 = createPacket("Hello World 2");
+ RecordLocation location2 = journal.write( data2, false);
+ Packet data3 = createPacket("Hello World 3");
+ RecordLocation location3 = journal.write( data3, false);
+
+ //Thread.sleep(1000);
+
+ // Now see if we can read that data.
+ Packet data;
+ data = journal.read(location2);
+ assertEquals( data2, data);
+ data = journal.read(location1);
+ assertEquals( data1, data);
+ data = journal.read(location3);
+ assertEquals( data3, data);
+
+ // Can we cursor the data?
+ RecordLocation l=journal.getNextRecordLocation(null);
+ int t = l.compareTo(location1);
+ assertEquals(0, t);
+ data = journal.read(l);
+ assertEquals( data1, data);
+
+ l=journal.getNextRecordLocation(l);
+ assertEquals(0, l.compareTo(location2));
+ data = journal.read(l);
+ assertEquals( data2, data);
+
+ l=journal.getNextRecordLocation(l);
+ assertEquals(0, l.compareTo(location3));
+ data = journal.read(l);
+ assertEquals( data3, data);
+
+ l=journal.getNextRecordLocation(l);
+ assertNull(l);
+
+ log.info(journal);
+ }
+
+ public void testCanReadFromArchivedLogFile() throws InvalidRecordLocationException, InterruptedException, IOException {
+
+ Packet data1 = createPacket("Hello World 1");
+ RecordLocationFacade location1 = (RecordLocationFacade) journal.write( data1, false);
+
+ RecordLocationFacade pos;
+ int counter = 0;
+ do {
+
+ Packet p = createPacket("<<<data>>>");
+ pos = (RecordLocationFacade) journal.write( p, false);
+ if( counter++ % 1000 == 0 ) {
+ journal.setMark(pos, false);
+ }
+
+ } while( pos.getLocation().getDataFileId() < 5 );
+
+ // Now see if we can read that first packet.
+ Packet data;
+ data = journal.read(location1);
+ assertEquals( data1, data);
+
+ }
+
+ /**
+ * @param string
+ * @return
+ */
+ private Packet createPacket(String string) {
+ return new ByteArrayPacket(string.getBytes());
+ }
+
+ public static void assertEquals(Packet arg0, Packet arg1) {
+ assertEquals(arg0.sliceAsBytes(), arg1.sliceAsBytes());
+ }
+
+ public static void assertEquals(byte[] arg0, byte[] arg1) {
+
+// System.out.println("Comparing: "+new String(arg0)+" and "+new String(arg1));
+ if( arg0==null ^ arg1==null )
+ fail("Not equal: "+arg0+" != "+arg1);
+ if( arg0==null )
+ return;
+ if( arg0.length!=arg1.length)
+ fail("Array lenght not equal: "+arg0.length+" != "+arg1.length);
+ for( int i=0; i<arg0.length;i++) {
+ if( arg0[i]!= arg1[i]) {
+ fail("Array item not equal at index "+i+": "+arg0[i]+" != "+arg1[i]);
+ }
+ }
+ }
+}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalPerfTool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalPerfToolSupport;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+
+/**
+ * A Performance statistics gathering tool for the AsyncDataManager based Journal.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JournalPerfTool extends JournalPerfToolSupport {
+
+    // Maximum data file length handed to AsyncDataManager.setMaxFileLength
+    // (50 * 1024 * 1024; presumably bytes - confirm against AsyncDataManager).
+    private int logFileSize = 1024*1024*50;
+
+    /**
+     * Command line entry point. Sets the tool's defaults and then lets the optional
+     * positional arguments override them, in this order:
+     * journal directory, worker increment, increment delay, verbose ("true"/"false"),
+     * record size, sync frequency, worker think time.
+     *
+     * @param args the optional positional arguments described above
+     * @throws Exception whatever the tool run throws; also NumberFormatException for a malformed numeric argument
+     */
+    public static void main(String[] args) throws Exception {
+        JournalPerfTool tool = new JournalPerfTool();
+
+        tool.initialWorkers=10;
+        tool.syncFrequency=15;
+        tool.workerIncrement=0;
+        tool.workerThinkTime=0;
+        tool.verbose=false;
+        tool.incrementDelay=5*1000;
+
+        if( args.length > 0 ) {
+            tool.journalDirectory = new File(args[0]);
+        }
+        if( args.length > 1 ) {
+            tool.workerIncrement = Integer.parseInt(args[1]);
+        }
+        if( args.length > 2 ) {
+            tool.incrementDelay = Long.parseLong(args[2]);
+        }
+        if( args.length > 3 ) {
+            // Parse the argument text itself. Boolean.getBoolean(String) would instead look up
+            // a system property named by the argument, so "true" would never enable verbose.
+            tool.verbose = Boolean.parseBoolean(args[3]);
+        }
+        if( args.length > 4 ) {
+            tool.recordSize = Integer.parseInt(args[4]);
+        }
+        if( args.length > 5 ) {
+            tool.syncFrequency = Integer.parseInt(args[5]);
+        }
+        if( args.length > 6 ) {
+            tool.workerThinkTime = Integer.parseInt(args[6]);
+        }
+        tool.exec();
+    }
+
+    /**
+     * Creates the journal to measure: an AsyncDataManager configured with
+     * logFileSize and journalDirectory, started and wrapped in a JournalFacade.
+     *
+     * @return the journal backed by the started AsyncDataManager
+     * @throws IOException
+     * @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal()
+     */
+    public Journal createJournal() throws IOException {
+        AsyncDataManager dm = new AsyncDataManager();
+        dm.setMaxFileLength(logFileSize);
+        dm.setDirectory(this.journalDirectory);
+        dm.start();
+        return new JournalFacade(dm);
+    }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/JournalRWPerfTool.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,83 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.File;
+
+import java.io.IOException;
+
+import org.apache.activeio.journal.Journal;
+import org.apache.activeio.journal.JournalRWPerfToolSupport;
+import org.apache.activemq.kaha.impl.async.AsyncDataManager;
+
+/**
+ * A Performance statistics gathering tool for the AsyncDataManager based Journal.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class JournalRWPerfTool extends JournalRWPerfToolSupport {
+
+    // Maximum data file length handed to AsyncDataManager.setMaxFileLength
+    // (50 * 1024 * 1024; presumably bytes - confirm against AsyncDataManager).
+    private int logFileSize = 1024*1024*50;
+
+    /**
+     * Command line entry point. Sets the tool's defaults and then lets the optional
+     * positional arguments override them, in this order:
+     * journal directory, write worker increment, increment delay, verbose ("true"/"false"),
+     * record size, sync frequency, write worker think time.
+     *
+     * @param args the optional positional arguments described above
+     * @throws Exception whatever the tool run throws; also NumberFormatException for a malformed numeric argument
+     */
+    public static void main(String[] args) throws Exception {
+        JournalRWPerfTool tool = new JournalRWPerfTool();
+
+        tool.initialWriteWorkers=10;
+        tool.syncFrequency=15;
+        tool.writeWorkerIncrement=0;
+        tool.writeWorkerThinkTime=0;
+        tool.verbose=false;
+        tool.incrementDelay=5*1000;
+
+        if( args.length > 0 ) {
+            tool.journalDirectory = new File(args[0]);
+        }
+        if( args.length > 1 ) {
+            tool.writeWorkerIncrement = Integer.parseInt(args[1]);
+        }
+        if( args.length > 2 ) {
+            tool.incrementDelay = Long.parseLong(args[2]);
+        }
+        if( args.length > 3 ) {
+            // Parse the argument text itself. Boolean.getBoolean(String) would instead look up
+            // a system property named by the argument, so "true" would never enable verbose.
+            tool.verbose = Boolean.parseBoolean(args[3]);
+        }
+        if( args.length > 4 ) {
+            tool.recordSize = Integer.parseInt(args[4]);
+        }
+        if( args.length > 5 ) {
+            tool.syncFrequency = Integer.parseInt(args[5]);
+        }
+        if( args.length > 6 ) {
+            tool.writeWorkerThinkTime = Integer.parseInt(args[6]);
+        }
+        tool.exec();
+    }
+
+    /**
+     * Creates the journal to measure: an AsyncDataManager configured with
+     * logFileSize and journalDirectory, started and wrapped in a JournalFacade.
+     *
+     * @return the journal backed by the started AsyncDataManager
+     * @throws IOException
+     * @see org.apache.activeio.journal.JournalPerfToolSupport#createJournal()
+     */
+    public Journal createJournal() throws IOException {
+        AsyncDataManager dm = new AsyncDataManager();
+        dm.setMaxFileLength(logFileSize);
+        dm.setDirectory(this.journalDirectory);
+        dm.start();
+        return new JournalFacade(dm);
+    }
+
+}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/LocationTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,62 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.async;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import org.apache.activemq.kaha.impl.async.Location;
+import org.apache.activemq.kaha.impl.async.JournalFacade.RecordLocationFacade;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the Location Class
+ *
+ * @version $Revision: 1.1 $
+ */
+public class LocationTest extends TestCase {
+
+    /**
+     * Checks that Location instances compare by position and that
+     * RecordLocationFacade wrappers around them sort into the same order.
+     *
+     * Three locations are built: (file 0, offset 5), (file 0, offset 10) and
+     * (file 2, offset 0). They are added to a list out of order, sorted, and
+     * the sorted list is expected to yield them in exactly that sequence.
+     */
+    @SuppressWarnings("unchecked")
+    synchronized public void testRecordLocationImplComparison() throws IOException {
+        Location l1 = new Location();
+        l1.setDataFileId(0);
+        l1.setOffset(5);
+        Location l2 = new Location(l1);
+        l2.setOffset(10);
+        Location l3 = new Location(l2);
+        l3.setDataFileId(2);
+        l3.setOffset(0);
+
+        assertTrue( l1.compareTo(l2)<0 );
+
+        // Sort them using a list. Put them in the wrong order.
+        ArrayList<RecordLocationFacade> facades = new ArrayList<RecordLocationFacade>();
+        facades.add(new RecordLocationFacade(l2));
+        facades.add(new RecordLocationFacade(l3));
+        facades.add(new RecordLocationFacade(l1));
+        Collections.sort(facades);
+
+        // Did they get sorted to the correct order?
+        // assertSame takes (expected, actual); keeping that order makes a
+        // failure message name the right object as the expected one.
+        assertSame( l1, facades.get(0).getLocation() );
+        assertSame( l2, facades.get(1).getLocation() );
+        assertSame( l3, facades.get(2).getLocation() );
+    }
+}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java?view=auto&rev=479089
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/LinkedNodeTest.java Fri Nov 24 22:00:56 2006
@@ -0,0 +1,176 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import junit.framework.TestCase;
+
+/**
+ *
+ * @author chirino
+ */
+public class LinkedNodeTest extends TestCase {
+
+    /**
+     * LinkedNode subclass that carries an int, so a node can be identified
+     * by its toString() output in assertion failure messages.
+     */
+    static class IntLinkedNode extends LinkedNode {
+        public final int v;
+
+        public IntLinkedNode(int v){
+            this.v = v;
+        }
+
+        @Override
+        public String toString() {
+            return ""+v;
+        }
+    }
+
+    // Nodes shared by the tests below; created with each test instance.
+    IntLinkedNode i1 = new IntLinkedNode(1);
+    IntLinkedNode i2 = new IntLinkedNode(2);
+    IntLinkedNode i3 = new IntLinkedNode(3);
+    IntLinkedNode i4 = new IntLinkedNode(4);
+    IntLinkedNode i5 = new IntLinkedNode(5);
+    IntLinkedNode i6 = new IntLinkedNode(6);
+
+    /**
+     * Links chains with linkAfter and checks the resulting order in both
+     * directions, first for 1,2,3 and then after inserting 4,5 behind 1.
+     */
+    public void testLinkAfter() {
+
+        i1.linkAfter(i2.linkAfter(i3));
+        // Order should be 1,2,3
+        assertOrder123();
+
+        i1.linkAfter(i4.linkAfter(i5));
+        // Order should be 1,4,5,2,3
+        assertOrder14523();
+
+    }
+
+    /**
+     * Links chains with linkBefore and checks the resulting order in both
+     * directions, first for 1,2,3 and then after inserting 4,5 in front of 2.
+     */
+    public void testLinkBefore() {
+
+        i3.linkBefore(i2.linkBefore(i1));
+        // Order should be 1,2,3
+        assertOrder123();
+
+        i2.linkBefore(i5.linkBefore(i4));
+        // Order should be 1,4,5,2,3
+        assertOrder14523();
+
+    }
+
+    /**
+     * Builds the chain 5,1,6,2,3,4, unlinks the first, last and one interior
+     * node, and checks that 1,2,3 remains correctly linked.
+     */
+    public void testUnlink() {
+
+        i1.linkAfter(i2.linkAfter(i3));
+        i3.linkAfter(i4);
+        i1.linkBefore(i5);
+        i1.linkAfter(i6);
+
+        // Order should be 5,1,6,2,3,4
+        i4.unlink();
+        i5.unlink();
+        i6.unlink();
+
+        // Order should be 1,2,3
+        assertOrder123();
+    }
+
+    /** Asserts that the chain is exactly 1,2,3 when walked forwards and backwards. */
+    private void assertOrder123() {
+        assertSame( i2, i1.getNext() );
+        assertSame( i3, i1.getNext().getNext() );
+        assertNull( i1.getNext().getNext().getNext() );
+
+        assertSame( i2, i3.getPrevious() );
+        assertSame( i1, i3.getPrevious().getPrevious() );
+        assertNull( i3.getPrevious().getPrevious().getPrevious() );
+
+        assertHead(i1);
+        assertInterior(i2);
+        assertTail(i3);
+    }
+
+    /** Asserts that the chain is exactly 1,4,5,2,3 when walked forwards and backwards. */
+    private void assertOrder14523() {
+        assertSame( i4, i1.getNext() );
+        assertSame( i5, i1.getNext().getNext() );
+        assertSame( i2, i1.getNext().getNext().getNext() );
+        assertSame( i3, i1.getNext().getNext().getNext().getNext() );
+        assertNull( i1.getNext().getNext().getNext().getNext().getNext() );
+
+        assertSame( i2, i3.getPrevious() );
+        assertSame( i5, i3.getPrevious().getPrevious() );
+        assertSame( i4, i3.getPrevious().getPrevious().getPrevious() );
+        assertSame( i1, i3.getPrevious().getPrevious().getPrevious().getPrevious() );
+        assertNull( i3.getPrevious().getPrevious().getPrevious().getPrevious().getPrevious() );
+
+        assertHead(i1);
+        assertInterior(i4);
+        assertInterior(i5);
+        assertInterior(i2);
+        assertTail(i3);
+    }
+
+    /** Asserts that the node reports itself as head and not as tail. */
+    private static void assertHead(IntLinkedNode node) {
+        assertTrue( node.isHeadNode() );
+        assertFalse( node.isTailNode() );
+    }
+
+    /** Asserts that the node reports itself as neither head nor tail. */
+    private static void assertInterior(IntLinkedNode node) {
+        assertFalse( node.isHeadNode() );
+        assertFalse( node.isTailNode() );
+    }
+
+    /** Asserts that the node reports itself as tail and not as head. */
+    private static void assertTail(IntLinkedNode node) {
+        assertTrue( node.isTailNode() );
+        assertFalse( node.isHeadNode() );
+    }
+
+}
Modified: incubator/activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/pom.xml?view=diff&rev=479089&r1=479088&r2=479089
==============================================================================
--- incubator/activemq/trunk/pom.xml (original)
+++ incubator/activemq/trunk/pom.xml Fri Nov 24 22:00:56 2006
@@ -216,6 +216,12 @@
<artifactId>activeio-core</artifactId>
<version>3.1-incubator-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activeio-core</artifactId>
+ <version>3.1-incubator-SNAPSHOT</version>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-generator</artifactId>