You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/02/06 23:24:58 UTC

svn commit: r1241221 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/journal/ kahadb/src/main/java/org/apache/kahadb/page/ kahadb/src/main/java/org/apache/kahadb/util/ kahadb/src/test/java/org/apache/kahadb/index/ kahadb/src/test/java/org/apache/kahadb/journal/

Author: tabish
Date: Mon Feb  6 22:24:58 2012
New Revision: 1241221

URL: http://svn.apache.org/viewvc?rev=1241221&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3702

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Feb  6 22:24:58 2012
@@ -55,7 +55,7 @@ public abstract class MessageDatabase ex
     protected BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
-    public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "0"));
+    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
     public static final File DEFAULT_DIRECTORY = new File("KahaDB");
     protected static final Buffer UNMATCHED;
     static {

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java Mon Feb  6 22:24:58 2012
@@ -25,7 +25,7 @@ import java.util.Map;
 
 /**
  * Used to pool DataFileAccessors.
- * 
+ *
  * @author chirino
  */
 public class DataFileAccessorPool {
@@ -95,8 +95,7 @@ public class DataFileAccessorPool {
     }
 
     synchronized void clearUsedMark() {
-        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
-            Pool pool = iter.next();
+        for (Pool pool : pools.values()) {
             pool.clearUsedMark();
         }
     }
@@ -153,8 +152,7 @@ public class DataFileAccessorPool {
             return;
         }
         closed = true;
-        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
-            Pool pool = iter.next();
+        for (Pool pool : pools.values()) {
             pool.dispose();
         }
         pools.clear();

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Feb  6 22:24:58 2012
@@ -28,20 +28,21 @@ import java.util.zip.Checksum;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.LinkedNodeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
  * safe and gains throughput as you increase the number of concurrent writes it
  * does.
- * 
- * 
  */
 class DataFileAppender implements FileAppender {
 
+    private static final Logger logger = LoggerFactory.getLogger(DataFileAppender.class);
+
     protected final Journal journal;
     protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
-    protected final Object enqueueMutex = new Object() {
-    };
+    protected final Object enqueueMutex = new Object();
     protected WriteBatch nextWriteBatch;
 
     protected boolean shutdown;
@@ -90,7 +91,7 @@ class DataFileAppender implements FileAp
 
         public WriteBatch(DataFile dataFile,int offset) {
             this.dataFile = dataFile;
-			this.offset = offset;
+            this.offset = offset;
             this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
             this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
             journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@@ -103,7 +104,7 @@ class DataFileAppender implements FileAp
 
         public boolean canAppend(Journal.WriteCommand write) {
             int newSize = size + write.location.getSize();
-			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
+            if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
                 return false;
             }
             return true;
@@ -114,7 +115,7 @@ class DataFileAppender implements FileAp
             write.location.setDataFileId(dataFile.getDataFileId());
             write.location.setOffset(offset+size);
             int s = write.location.getSize();
-			size += s;
+            size += s;
             dataFile.incrementLength(s);
             journal.addToTotalLength(s);
         }
@@ -131,7 +132,7 @@ class DataFileAppender implements FileAp
     }
 
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-    	
+
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
@@ -149,11 +150,11 @@ class DataFileAppender implements FileAp
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
-            IOException exception = batch.exception.get(); 
+            IOException exception = batch.exception.get();
             if (exception != null) {
-            	throw exception;
+                throw exception;
             }
-        }	
+        }
 
         return location;
     }
@@ -169,7 +170,7 @@ class DataFileAppender implements FileAp
         Journal.WriteCommand write = new Journal.WriteCommand(location, data, onComplete);
 
         WriteBatch batch = enqueue(write);
- 
+
         location.setLatch(batch.latch);
         return location;
     }
@@ -179,7 +180,7 @@ class DataFileAppender implements FileAp
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            
+
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -193,44 +194,45 @@ class DataFileAppender implements FileAp
                 thread.start();
                 firstAsyncException = null;
             }
-            
+
             if (firstAsyncException != null) {
                 throw firstAsyncException;
             }
 
             while ( true ) {
-	            if (nextWriteBatch == null) {
-	            	DataFile file = journal.getCurrentWriteFile();
-	            	if( file.getLength() > journal.getMaxFileLength() ) {
-	            		file = journal.rotateWriteFile();
-	            	}
-
-	                nextWriteBatch = newWriteBatch(write, file);
-	                enqueueMutex.notifyAll();
-	                break;
-	            } else {
-	                // Append to current batch if possible..
-	                if (nextWriteBatch.canAppend(write)) {
-	                    nextWriteBatch.append(write);
-	                    break;
-	                } else {
-	                    // Otherwise wait for the queuedCommand to be null
-	                    try {
-	                        while (nextWriteBatch != null) {
-	                            final long start = System.currentTimeMillis();
-	                            enqueueMutex.wait();
-	                            if (maxStat > 0) { 
-	                                System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
-	                            }
-	                        }
-	                    } catch (InterruptedException e) {
-	                        throw new InterruptedIOException();
-	                    }
-	                    if (shutdown) {
-	                        throw new IOException("Async Writter Thread Shutdown");
-	                    }
-	                }
-	            }
+                if (nextWriteBatch == null) {
+                    DataFile file = journal.getCurrentWriteFile();
+                    if( file.getLength() > journal.getMaxFileLength() ) {
+                        file = journal.rotateWriteFile();
+                    }
+
+                    nextWriteBatch = newWriteBatch(write, file);
+                    enqueueMutex.notifyAll();
+                    break;
+                } else {
+                    // Append to current batch if possible..
+                    if (nextWriteBatch.canAppend(write)) {
+                        nextWriteBatch.append(write);
+                        break;
+                    } else {
+                        // Otherwise wait for the queuedCommand to be null
+                        try {
+                            while (nextWriteBatch != null) {
+                                final long start = System.currentTimeMillis();
+                                enqueueMutex.wait();
+                                if (maxStat > 0) {
+                                    logger.info("Watiting for write to finish with full batch... millis: " +
+                                                (System.currentTimeMillis() - start));
+                               }
+                            }
+                        } catch (InterruptedException e) {
+                            throw new InterruptedIOException();
+                        }
+                        if (shutdown) {
+                            throw new IOException("Async Writter Thread Shutdown");
+                        }
+                    }
+                }
             }
             if (!write.sync) {
                 inflightWrites.put(new Journal.WriteKey(write.location), write);
@@ -282,13 +284,11 @@ class DataFileAppender implements FileAp
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
             while (true) {
 
-                Object o = null;
-
                 // Block till we get a command.
                 synchronized (enqueueMutex) {
                     while (true) {
                         if (nextWriteBatch != null) {
-                            o = nextWriteBatch;
+                            wb = nextWriteBatch;
                             nextWriteBatch = null;
                             break;
                         }
@@ -300,7 +300,6 @@ class DataFileAppender implements FileAp
                     enqueueMutex.notifyAll();
                 }
 
-                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         file.setLength(dataFile.getLength());
@@ -333,15 +332,15 @@ class DataFileAppender implements FileAp
                 }
 
                 ByteSequence sequence = buff.toByteSequence();
-                
-                // Now we can fill in the batch control record properly. 
+
+                // Now we can fill in the batch control record properly.
                 buff.reset();
                 buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
                 buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
                 if( journal.isChecksum() ) {
-	                Checksum checksum = new Adler32();
-	                checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
-	                buff.writeLong(checksum.getValue());
+                    Checksum checksum = new Adler32();
+                    checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                    buff.writeLong(checksum.getValue());
                 }
 
                 // Now do the 1 big write.
@@ -354,16 +353,16 @@ class DataFileAppender implements FileAp
                         for (;statIdx > 0;) {
                             all+= stats[--statIdx];
                         }
-                        System.err.println("Ave writeSize: " + all/maxStat);
+                        logger.info("Ave writeSize: " + all/maxStat);
                     }
                 }
                 file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
-                
+
                 ReplicationTarget replicationTarget = journal.getReplicationTarget();
                 if( replicationTarget!=null ) {
-                	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                    replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
                 }
-                
+
                 if (forceToDisk) {
                     file.getFD().sync();
                 }
@@ -411,7 +410,7 @@ class DataFileAppender implements FileAp
                 try {
                     write.onComplete.run();
                 } catch (Throwable e) {
-                    e.printStackTrace();
+                    logger.info("Add exception was raised while executing the run command for onComplete", e);
                 }
             }
             write = write.getNext();

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Mon Feb  6 22:24:58 2012
@@ -48,8 +48,8 @@ import org.apache.kahadb.util.Sequence;
 
 /**
  * Manages DataFiles
- * 
- * 
+ *
+ *
  */
 public class Journal {
     public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
@@ -57,12 +57,12 @@ public class Journal {
 
     private static final int MAX_BATCH_SIZE = 32*1024*1024;
 
-	// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+    // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
     public static final int RECORD_HEAD_SPACE = 4 + 1;
-    
+
     public static final byte USER_RECORD_TYPE = 1;
     public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
-    // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. 
+    // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.
     public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
     public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
     public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
@@ -77,7 +77,7 @@ public class Journal {
             sequence.compact();
             return sequence.getData();
         } catch (IOException e) {
-            throw new RuntimeException("Could not create batch control record header.");
+            throw new RuntimeException("Could not create batch control record header.", e);
         }
     }
 
@@ -89,7 +89,7 @@ public class Journal {
     public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
     public static final int PREFERED_DIFF = 1024 * 512;
     public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
 
     protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
@@ -99,11 +99,11 @@ public class Journal {
     protected String filePrefix = DEFAULT_FILE_PREFIX;
     protected String fileSuffix = DEFAULT_FILE_SUFFIX;
     protected boolean started;
-    
+
     protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
     protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
-    
+
     protected FileAppender appender;
     protected DataFileAccessorPool accessorPool;
 
@@ -115,18 +115,17 @@ public class Journal {
     protected Runnable cleanupTask;
     protected AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
-	private ReplicationTarget replicationTarget;
+    private ReplicationTarget replicationTarget;
     protected boolean checksum;
     protected boolean checkForCorruptionOnStartup;
     protected boolean enableAsyncDiskSync = true;
     private Timer timer;
-   
 
     public synchronized void start() throws IOException {
         if (started) {
             return;
         }
-        
+
         long start = System.currentTimeMillis();
         accessorPool = new DataFileAccessorPool(this);
         started = true;
@@ -141,9 +140,8 @@ public class Journal {
         });
 
         if (files != null) {
-            for (int i = 0; i < files.length; i++) {
+            for (File file : files) {
                 try {
-                    File file = files[i];
                     String n = file.getName();
                     String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                     int num = Integer.parseInt(numStr);
@@ -174,7 +172,7 @@ public class Journal {
             }
         }
 
-    	getCurrentWriteFile();
+        getCurrentWriteFile();
 
         if( lastAppendLocation.get()==null ) {
             DataFile df = dataFiles.getTail();
@@ -194,19 +192,19 @@ public class Journal {
     }
 
     private static byte[] bytes(String string) {
-    	try {
-			return string.getBytes("UTF-8");
-		} catch (UnsupportedEncodingException e) {
-			throw new RuntimeException(e);
-		}
-	}
+        try {
+            return string.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
-	protected Location recoveryCheck(DataFile dataFile) throws IOException {
+    protected Location recoveryCheck(DataFile dataFile) throws IOException {
         Location location = new Location();
         location.setDataFileId(dataFile.getDataFileId());
         location.setOffset(0);
 
-    	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
             while( true ) {
                 int size = checkBatchRecord(reader, location.getOffset());
@@ -227,9 +225,9 @@ public class Journal {
                     }
                 }
             }
-            
+
         } catch (IOException e) {
-		} finally {
+        } finally {
             accessorPool.closeDataFileAccessor(reader);
         }
 
@@ -315,14 +313,14 @@ public class Journal {
     }
 
 
-	void addToTotalLength(int size) {
-		totalLength.addAndGet(size);
-	}
+    void addToTotalLength(int size) {
+        totalLength.addAndGet(size);
+    }
 
     public long length() {
         return totalLength.get();
     }
-    
+
     synchronized DataFile getCurrentWriteFile() throws IOException {
         if (dataFiles.isEmpty()) {
             rotateWriteFile();
@@ -331,21 +329,21 @@ public class Journal {
     }
 
     synchronized DataFile rotateWriteFile() {
-		int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
-		File file = getFile(nextNum);
-		DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
-		// actually allocate the disk space
-		fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
-		fileByFileMap.put(file, nextWriteFile);
-		dataFiles.addLast(nextWriteFile);
-		return nextWriteFile;
-	}
-
-	public File getFile(int nextNum) {
-		String fileName = filePrefix + nextNum + fileSuffix;
-		File file = new File(directory, fileName);
-		return file;
-	}
+        int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+        File file = getFile(nextNum);
+        DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+        // actually allocate the disk space
+        fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+        fileByFileMap.put(file, nextWriteFile);
+        dataFiles.addLast(nextWriteFile);
+        return nextWriteFile;
+    }
+
+    public File getFile(int nextNum) {
+        String fileName = filePrefix + nextNum + fileSuffix;
+        File file = new File(directory, fileName);
+        return file;
+    }
 
     synchronized DataFile getDataFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
@@ -419,12 +417,12 @@ public class Journal {
     public synchronized void removeDataFiles(Set<Integer> files) throws IOException {
         for (Integer key : files) {
             // Can't remove the data file (or subsequent files) that is currently being written to.
-        	if( key >= lastAppendLocation.get().getDataFileId() ) {
-        		continue;
-        	}
+            if( key >= lastAppendLocation.get().getDataFileId() ) {
+                continue;
+            }
             DataFile dataFile = fileMap.get(key);
             if( dataFile!=null ) {
-            	forceRemoveDataFile(dataFile);
+                forceRemoveDataFile(dataFile);
             }
         }
     }
@@ -440,9 +438,9 @@ public class Journal {
             LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
         } else {
             if ( dataFile.delete() ) {
-            	LOG.debug("Discarded data file " + dataFile);
+                LOG.debug("Discarded data file " + dataFile);
             } else {
-            	LOG.warn("Failed to discard data file " + dataFile.getFile());
+                LOG.warn("Failed to discard data file " + dataFile.getFile());
             }
         }
     }
@@ -466,14 +464,14 @@ public class Journal {
         return directory.toString();
     }
 
-	public synchronized void appendedExternally(Location loc, int length) throws IOException {
-		DataFile dataFile = null;
-		if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
-			// It's an update to the current log file..
-			dataFile = dataFiles.getTail();
-			dataFile.incrementLength(length);
-		} else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
-			// It's an update to the next log file.
+    public synchronized void appendedExternally(Location loc, int length) throws IOException {
+        DataFile dataFile = null;
+        if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
+            // It's an update to the current log file..
+            dataFile = dataFiles.getTail();
+            dataFile.incrementLength(length);
+        } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
+            // It's an update to the next log file.
             int nextNum = loc.getDataFileId();
             File file = getFile(nextNum);
             dataFile = new DataFile(file, nextNum, preferedFileLength);
@@ -481,10 +479,10 @@ public class Journal {
             fileMap.put(dataFile.getDataFileId(), dataFile);
             fileByFileMap.put(file, dataFile);
             dataFiles.addLast(dataFile);
-		} else {
-			throw new IOException("Invalid external append.");
-		}
-	}
+        } else {
+            throw new IOException("Invalid external append.");
+        }
+    }
 
     public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
 
@@ -494,7 +492,7 @@ public class Journal {
                 if (location == null) {
                     DataFile head = dataFiles.getHead();
                     if( head == null ) {
-                    	return null;
+                        return null;
                     }
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
@@ -528,7 +526,7 @@ public class Journal {
             // Load in location size and type.
             DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
             try {
-				reader.readLocationDetails(cur);
+                reader.readLocationDetails(cur);
             } finally {
                 accessorPool.closeDataFileAccessor(reader);
             }
@@ -682,7 +680,7 @@ public class Journal {
 
     /**
      * Get a set of files - only valid after start()
-     * 
+     *
      * @return files currently being used
      */
     public Set<File> getFiles() {
@@ -692,7 +690,7 @@ public class Journal {
     public synchronized Map<Integer, DataFile> getFileMap() {
         return new TreeMap<Integer, DataFile>(fileMap);
     }
-    
+
     public long getDiskSize() {
         long tailLength=0;
         synchronized( this ) {
@@ -700,9 +698,9 @@ public class Journal {
                 tailLength = dataFiles.getTail().getLength();
             }
         }
-        
+
         long rc = totalLength.get();
-        
+
         // The last file is actually at a minimum preferedFileLength big.
         if( tailLength < preferedFileLength ) {
             rc -= tailLength;
@@ -711,12 +709,12 @@ public class Journal {
         return rc;
     }
 
-	public void setReplicationTarget(ReplicationTarget replicationTarget) {
-		this.replicationTarget = replicationTarget;
-	}
-	public ReplicationTarget getReplicationTarget() {
-		return replicationTarget;
-	}
+    public void setReplicationTarget(ReplicationTarget replicationTarget) {
+        this.replicationTarget = replicationTarget;
+    }
+    public ReplicationTarget getReplicationTarget() {
+        return replicationTarget;
+    }
 
     public String getFileSuffix() {
         return fileSuffix;
@@ -726,13 +724,13 @@ public class Journal {
         this.fileSuffix = fileSuffix;
     }
 
-	public boolean isChecksum() {
-		return checksum;
-	}
-
-	public void setChecksum(boolean checksumWrites) {
-		this.checksum = checksumWrites;
-	}
+    public boolean isChecksum() {
+        return checksum;
+    }
+
+    public void setChecksum(boolean checksumWrites) {
+        this.checksum = checksumWrites;
+    }
 
     public boolean isCheckForCorruptionOnStartup() {
         return checkForCorruptionOnStartup;
@@ -745,7 +743,7 @@ public class Journal {
     public void setWriteBatchSize(int writeBatchSize) {
         this.writeBatchSize = writeBatchSize;
     }
-    
+
     public int getWriteBatchSize() {
         return writeBatchSize;
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Page.java Mon Feb  6 22:24:58 2012
@@ -19,19 +19,9 @@ package org.apache.kahadb.page;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
 
 /**
  * A Page within a file.
- * 
- * 
  */
 public class Page<T> {
 
@@ -48,7 +38,7 @@ public class Page<T> {
     long txId;
     // A field reserved to hold checksums..  Not in use (yet)
     int checksum;
-    
+
     // Points to the next page in the chunk stream
     long next;
     T data;
@@ -60,18 +50,17 @@ public class Page<T> {
         this.pageId=pageId;
     }
 
-    public void copy(Page<T> other) {
+    public Page<T> copy(Page<T> other) {
         this.pageId = other.pageId;
         this.txId = other.txId;
         this.type = other.type;
         this.next = other.next;
         this.data = other.data;
+        return this;
     }
 
     Page<T> copy() {
-        Page<T> rc = new Page<T>();
-        rc.copy(this);
-        return rc;
+        return new Page<T>().copy(this);
     }
 
     void makeFree(long txId) {
@@ -80,13 +69,13 @@ public class Page<T> {
         this.data = null;
         this.next = 0;
     }
-    
+
     public void makePagePart(long next, long txId) {
         this.type = Page.PAGE_PART_TYPE;
         this.next = next;
         this.txId = txId;
     }
-    
+
     public void makePageEnd(long size, long txId) {
         this.type = Page.PAGE_END_TYPE;
         this.next = size;
@@ -142,6 +131,4 @@ public class Page<T> {
     public void setChecksum(int checksum) {
         this.checksum = checksum;
     }
-
-
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Feb  6 22:24:58 2012
@@ -60,10 +60,10 @@ public class PageFile {
     private static final String FREE_FILE_SUFFIX = ".free";
 
     // 4k Default page size.
-    public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "" + 1024 * 4));
-    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", "" + 1000));
-    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.parseInt(System.getProperty("defaultPageCacheSize", "" + 100));
-    ;
+    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
+    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
+    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);;
+
     private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
     private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;
 
@@ -87,14 +87,14 @@ public class PageFile {
 
     // The minimum number of space allocated to the recovery file in number of pages.
     private int recoveryFileMinPageCount = 1000;
-    // The max size that we let the recovery file grow to.. ma exceed the max, but the file will get resize 
+    // The max size that we let the recovery file grow to.. may exceed the max, but the file will get resized
     // to this max size as soon as  possible.
     private int recoveryFileMaxPageCount = 10000;
     // The number of pages in the current recovery buffer
     private int recoveryPageCount;
 
     private AtomicBoolean loaded = new AtomicBoolean();
-    // The number of pages we are aiming to write every time we 
+    // The number of pages we are aiming to write every time we
     // write to disk.
     int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
 
@@ -113,7 +113,7 @@ public class PageFile {
     // Will writes be done in an async thread?
     private boolean enabledWriteThread = false;
 
-    // These are used if enableAsyncWrites==true 
+    // These are used if enableAsyncWrites==true
     private AtomicBoolean stopWriter = new AtomicBoolean();
     private Thread writerThread;
     private CountDownLatch checkpointLatch;
@@ -127,7 +127,7 @@ public class PageFile {
 
     private AtomicLong nextTxid = new AtomicLong();
 
-    // Persistent settings stored in the page file. 
+    // Persistent settings stored in the page file.
     private MetaData metaData;
 
     private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
@@ -198,13 +198,11 @@ public class PageFile {
         void begin() {
             if (currentLocation != -1) {
                 diskBoundLocation = currentLocation;
-                currentLocation = -1;
-                current = null;
             } else {
                 diskBound = current;
-                current = null;
-                currentLocation = -1;
             }
+            current = null;
+            currentLocation = -1;
         }
 
         /**
@@ -219,7 +217,6 @@ public class PageFile {
         boolean isDone() {
             return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
         }
-
     }
 
     /**
@@ -336,10 +333,8 @@ public class PageFile {
      * @throws IOException
      */
     private void delete(File file) throws IOException {
-        if (file.exists()) {
-            if (!file.delete()) {
-                throw new IOException("Could not delete: " + file.getPath());
-            }
+        if (file.exists() && !file.delete()) {
+            throw new IOException("Could not delete: " + file.getPath());
         }
     }
 
@@ -407,13 +402,12 @@ public class PageFile {
 
                 // Scan all to find the free pages.
                 freeList = new SequenceSet();
-                for (Iterator i = tx().iterator(true); i.hasNext(); ) {
-                    Page page = (Page) i.next();
+                for (Iterator<Page> i = tx().iterator(true); i.hasNext(); ) {
+                    Page page = i.next();
                     if (page.getType() == Page.PAGE_FREE_TYPE) {
                         freeList.add(page.getPageId());
                     }
                 }
-
             }
 
             metaData.setCleanShutdown(false);
@@ -427,7 +421,7 @@ public class PageFile {
             startWriter();
 
         } else {
-            throw new IllegalStateException("Cannot load the page file when it is allready loaded.");
+            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
         }
     }
 
@@ -516,7 +510,9 @@ public class PageFile {
         try {
             checkpointLatch.await();
         } catch (InterruptedException e) {
-            throw new InterruptedIOException();
+            InterruptedIOException ioe = new InterruptedIOException();
+            ioe.initCause(e);
+            throw ioe;
         }
     }
 
@@ -597,7 +593,7 @@ public class PageFile {
         ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
         p.store(os, "");
         if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
-            throw new IOException("Configuation is to larger than: " + PAGE_FILE_HEADER_SIZE / 2);
+            throw new IOException("Configuation is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
         }
         // Fill the rest with space...
         byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
@@ -632,7 +628,7 @@ public class PageFile {
     }
 
     ///////////////////////////////////////////////////////////////////
-    // Property Accessors 
+    // Property Accessors
     ///////////////////////////////////////////////////////////////////
 
     /**
@@ -773,7 +769,6 @@ public class PageFile {
     }
 
     public void setWriteBatchSize(int writeBatchSize) {
-        assertNotLoaded();
         this.writeBatchSize = writeBatchSize;
     }
 
@@ -833,9 +828,14 @@ public class PageFile {
 
             Page<T> first = null;
             int c = count;
-            while (c > 0) {
-                Page<T> page = new Page<T>(nextFreePageId.getAndIncrement());
-                page.makeFree(getNextWriteTransactionId());
+
+            // Reserve the page ids and transaction ids for the whole batch only once....
+            long pageId = nextFreePageId.getAndAdd(count);
+            long writeTxnId = nextTxid.getAndAdd(count);
+
+            while (c-- > 0) {
+                Page<T> page = new Page<T>(pageId++);
+                page.makeFree(writeTxnId++);
 
                 if (first == null) {
                     first = page;
@@ -847,7 +847,6 @@ public class PageFile {
                 write(page, out.getData());
 
                 // LOG.debug("allocate writing: "+page.getPageId());
-                c--;
             }
 
             return first;
@@ -985,9 +984,6 @@ public class PageFile {
     // Internal Double write implementation follows...
     ///////////////////////////////////////////////////////////////////
 
-    /**
-     *
-     */
     private void pollWrites() {
         try {
             while (!stopWriter.get()) {
@@ -1007,7 +1003,7 @@ public class PageFile {
                 writeBatch();
             }
         } catch (Throwable e) {
-            e.printStackTrace();
+            LOG.info("An exception was raised while performing poll writes", e);
         } finally {
             releaseCheckpointWaiter();
         }
@@ -1165,7 +1161,7 @@ public class PageFile {
                 batch.put(offset, data);
             }
         } catch (Exception e) {
-            // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it. 
+            // If an error occurred it was because the redo buffer was not fully written out correctly.. so don't redo it.
             // as the pages should still be consistent.
             LOG.debug("Redo buffer was not fully intact: ", e);
             return nextTxId;

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Mon Feb  6 22:24:58 2012
@@ -39,6 +39,8 @@ public class Transaction implements Iter
      * and it's data is larger than what would fit into a single page.
      */
     public class PageOverflowIOException extends IOException {
+        private static final long serialVersionUID = 1L;
+
         public PageOverflowIOException(String message) {
             super(message);
         }
@@ -49,6 +51,8 @@ public class Transaction implements Iter
      * with an invalid page id.
      */
     public class InvalidPageIOException extends IOException {
+        private static final long serialVersionUID = 1L;
+
         private final long page;
 
         public InvalidPageIOException(String message, long page) {
@@ -92,7 +96,7 @@ public class Transaction implements Iter
     // List of pages freed in this transaction
     private final SequenceSet freeList = new SequenceSet();
 
-    private long maxTransactionSize = Long.parseLong(System.getProperty("maxKahaDBTxSize", "" + 10485760));
+    private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);
 
     private long size = 0;
 
@@ -178,12 +182,14 @@ public class Transaction implements Iter
     public <T> void free(Page<T> page, int count) throws IOException {
         pageFile.assertLoaded();
         long offsetPage = page.getPageId();
-        for (int i = 0; i < count; i++) {
+        while (count-- > 0) {
             if (page == null) {
-                page = load(offsetPage + i, null);
+                page = load(offsetPage, null);
             }
             free(page);
             page = null;
+            // Advance to the next page id; the following page is loaded by this id on the next pass.
+            offsetPage++;
         }
     }
 
@@ -318,7 +324,6 @@ public class Transaction implements Iter
 
             }
 
-            @SuppressWarnings("unchecked")
             @Override
             public void close() throws IOException {
                 super.close();
@@ -551,7 +556,6 @@ public class Transaction implements Iter
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    @SuppressWarnings("unchecked")
     public Iterator<Page> iterator() {
         return (Iterator<Page>)iterator(false);
     }
@@ -569,6 +573,7 @@ public class Transaction implements Iter
         pageFile.assertLoaded();
 
         return new Iterator<Page>() {
+
             long nextId;
             Page nextPage;
             Page lastPage;
@@ -699,7 +704,6 @@ public class Transaction implements Iter
     /**
      * Queues up a page write that should get done when commit() gets called.
      */
-    @SuppressWarnings("unchecked")
     private void write(final Page page, byte[] data) throws IOException {
         Long key = page.getPageId();
 
@@ -707,7 +711,7 @@ public class Transaction implements Iter
         size = writes.size() * pageFile.getPageSize();
 
         PageWrite write;
-        
+
         if (size > maxTransactionSize) {
             if (tmpFile == null) {
                 tmpFile = new RandomAccessFile(getTempFile(), "rw");
@@ -796,5 +800,4 @@ public class Transaction implements Iter
             }
         }
     }
-
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/IOHelper.java Mon Feb  6 22:24:58 2012
@@ -18,19 +18,18 @@ package org.apache.kahadb.util;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Stack;
 
-/**
- * 
- */
 public final class IOHelper {
+
     protected static final int MAX_DIR_NAME_LENGTH;
     protected static final int MAX_FILE_NAME_LENGTH;
     private static final int DEFAULT_BUFFER_SIZE = 4096;
+
     private IOHelper() {
     }
 
@@ -65,18 +64,18 @@ public final class IOHelper {
     public static String toFileSystemDirectorySafeName(String name) {
         return toFileSystemSafeName(name, true, MAX_DIR_NAME_LENGTH);
     }
-    
+
     public static String toFileSystemSafeName(String name) {
         return toFileSystemSafeName(name, false, MAX_FILE_NAME_LENGTH);
     }
-    
+
     /**
      * Converts any string into a string that is safe to use as a file name.
      * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
      *
      * @param name
-     * @param dirSeparators 
-     * @param maxFileLength 
+     * @param dirSeparators
+     * @param maxFileLength
      * @return
      */
     public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
@@ -104,8 +103,45 @@ public final class IOHelper {
         }
         return result;
     }
-    
-    public static boolean deleteFile(File fileToDelete) {
+
+    /**
+     * Deletes the given file or, if it is a directory, the whole tree below it.
+     * Each directory is listed exactly once, so an entry that cannot be removed
+     * produces a false result instead of being retried forever.
+     *
+     * @param top the file or directory to remove
+     * @return true only if every delete attempt succeeded
+     */
+    public static boolean delete(File top) {
+        boolean result = true;
+        Stack<File> pending = new Stack<File>();
+        // Directories in discovery order: a parent is always pushed before its
+        // children, so popping this stack removes children first.
+        Stack<File> directories = new Stack<File>();
+        pending.push(top);
+        while (!pending.isEmpty()) {
+            File file = pending.pop();
+            if (file.isDirectory()) {
+                directories.push(file);
+                File[] list = file.listFiles();
+                // listFiles() yields null when the directory cannot be read.
+                if (list != null) {
+                    for (File child : list) {
+                        pending.push(child);
+                    }
+                }
+            } else {
+                // This is a simple file, delete it...
+                result &= file.delete();
+            }
+        }
+        while (!directories.isEmpty()) {
+            result &= directories.pop().delete();
+        }
+        return result;
+    }
+
+    private static boolean deleteFile(File fileToDelete) {
         if (fileToDelete == null || !fileToDelete.exists()) {
             return true;
         }
@@ -113,8 +149,8 @@ public final class IOHelper {
         result &= fileToDelete.delete();
         return result;
     }
-    
-    public static boolean deleteChildren(File parent) {
+
+    private static boolean deleteChildren(File parent) {
         if (parent == null || !parent.exists()) {
             return false;
         }
@@ -138,23 +174,22 @@ public final class IOHelper {
                 }
             }
         }
-       
+
         return result;
     }
-    
-    
+
     public static void moveFile(File src, File targetDirectory) throws IOException {
         if (!src.renameTo(new File(targetDirectory, src.getName()))) {
             throw new IOException("Failed to move " + src + " to " + targetDirectory);
         }
     }
-    
+
     public static void copyFile(File src, File dest) throws IOException {
         FileInputStream fileSrc = new FileInputStream(src);
         FileOutputStream fileDest = new FileOutputStream(dest);
         copyInputStream(fileSrc, fileDest);
     }
-    
+
     public static void copyInputStream(InputStream in, OutputStream out) throws IOException {
         byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
         int len = in.read(buffer);
@@ -165,19 +200,18 @@ public final class IOHelper {
         in.close();
         out.close();
     }
-    
+
     static {
-        MAX_DIR_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumDirNameLength","200")).intValue();  
-        MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","64")).intValue();             
+        MAX_DIR_NAME_LENGTH = Integer.getInteger("MaximumDirNameLength",200);
+        MAX_FILE_NAME_LENGTH = Integer.getInteger("MaximumFileNameLength",64);
     }
 
-    
     public static void mkdirs(File dir) throws IOException {
         if (dir.exists()) {
             if (!dir.isDirectory()) {
                 throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
             }
-            
+
         } else {
             if (!dir.mkdirs()) {
                 throw new IOException("Failed to create directory '" + dir+"'");

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java Mon Feb  6 22:24:58 2012
@@ -25,19 +25,19 @@ import java.util.Date;
 
 /**
  * Used to lock a File.
- * 
+ *
  * @author chirino
  */
 public class LockFile {
-    
-    private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+
+    private static final boolean DISABLE_FILE_LOCK = Boolean.getBoolean("java.nio.channels.FileLock.broken");
     final private File file;
-    
+
     private FileLock lock;
     private RandomAccessFile readFile;
     private int lockCounter;
     private final boolean deleteOnUnlock;
-    
+
     public LockFile(File file, boolean deleteOnUnlock) {
         this.file = file;
         this.deleteOnUnlock = deleteOnUnlock;
@@ -54,7 +54,7 @@ public class LockFile {
         if( lockCounter>0 ) {
             return;
         }
-        
+
         IOHelper.mkdirs(file.getParentFile());
         if (System.getProperty(getVmLockKey()) != null) {
             throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm.");
@@ -80,7 +80,7 @@ public class LockFile {
                 }
                 throw new IOException("File '" + file + "' could not be locked.");
             }
-              
+
         }
     }
 
@@ -90,12 +90,12 @@ public class LockFile {
         if (DISABLE_FILE_LOCK) {
             return;
         }
-        
+
         lockCounter--;
         if( lockCounter!=0 ) {
             return;
         }
-        
+
         // release the lock..
         if (lock != null) {
             try {
@@ -106,7 +106,7 @@ public class LockFile {
             lock = null;
         }
         closeReadFile();
-        
+
         if( deleteOnUnlock ) {
             file.delete();
         }
@@ -125,7 +125,7 @@ public class LockFile {
             }
             readFile = null;
         }
-        
+
     }
 
 }

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java Mon Feb  6 22:24:58 2012
@@ -49,16 +49,15 @@ public abstract class IndexBenchmark ext
 
     public void setUp() throws Exception {
         ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
-        IOHelper.mkdirs(ROOT_DIR);
-        IOHelper.deleteChildren(ROOT_DIR);
-        
+        IOHelper.delete(ROOT_DIR);
+
         pf = new PageFile(ROOT_DIR, getClass().getName());
         pf.load();
     }
 
     protected void tearDown() throws Exception {
         Transaction tx = pf.tx();
-        for (Index i : indexes.values()) {
+        for (Index<?, ?> i : indexes.values()) {
             try {
                 i.unload(tx);
             } catch (Throwable ignore) {
@@ -99,7 +98,7 @@ public abstract class IndexBenchmark ext
             try {
 
                 Transaction tx = pf.tx();
-                
+
                 Index<String,Long> index = openIndex(name);
                 long counter = 0;
                 while (!shutdown.get()) {
@@ -109,7 +108,7 @@ public abstract class IndexBenchmark ext
                     index.put(tx, key, c);
                     tx.commit();
                     Thread.yield(); // This avoids consumer starvation..
-                    
+
                     onProduced(counter++);
                 }
 
@@ -121,7 +120,7 @@ public abstract class IndexBenchmark ext
         public void onProduced(long counter) {
         }
     }
-    
+
     protected String key(long c) {
         return "a-long-message-id-like-key-" + c;
     }
@@ -150,7 +149,7 @@ public abstract class IndexBenchmark ext
                 while (!shutdown.get()) {
                     long c = counter;
                     String key = key(c);
-                    
+
                     Long record = index.get(tx, key);
                     if (record != null) {
                         if( index.remove(tx, key) == null ) {

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java Mon Feb  6 22:24:58 2012
@@ -43,9 +43,7 @@ public abstract class IndexTestSupport e
     protected void setUp() throws Exception {
         super.setUp();
         directory = new File(IOHelper.getDefaultDataDirectory());
-        IOHelper.mkdirs(directory);
-        IOHelper.deleteChildren(directory);
-        
+        IOHelper.delete(directory);
     }
 
     protected void tearDown() throws Exception {
@@ -54,7 +52,7 @@ public abstract class IndexTestSupport e
             pf.delete();
         }
     }
-    
+
     protected void createPageFileAndIndex(int pageSize) throws Exception {
         pf = new PageFile(directory, getClass().getName());
         pf.setPageSize(pageSize);

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java?rev=1241221&r1=1241220&r2=1241221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java Mon Feb  6 22:24:58 2012
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit;
 import junit.framework.TestCase;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOHelper;
 
 public class JournalTest extends TestCase {
     protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
 
     Journal dataManager;
     File dir;
-    
+
     @Override
     public void setUp() throws Exception {
         dir = new File("target/tests/DataFileAppenderTest");
@@ -39,28 +40,16 @@ public class JournalTest extends TestCas
         configure(dataManager);
         dataManager.start();
     }
-    
+
     protected void configure(Journal dataManager) {
     }
 
     @Override
     public void tearDown() throws Exception {
         dataManager.close();
-        deleteFilesInDirectory(dir);
-        dir.delete();
+        IOHelper.delete(dir);
     }
 
-    private void deleteFilesInDirectory(File directory) {
-        File[] files = directory.listFiles();
-        for (int i=0; i<files.length; i++) {
-            File f = files[i];
-            if (f.isDirectory()) {
-                deleteFilesInDirectory(f);
-            }   
-            f.delete();
-        }  
-    }  
-
     public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception {
         final int iterations = 10;
         final CountDownLatch latch = new CountDownLatch(iterations);
@@ -68,7 +57,7 @@ public class JournalTest extends TestCas
         for (int i=0; i < iterations; i++) {
             dataManager.write(data, new Runnable() {
                 public void run() {
-                    latch.countDown();                 
+                    latch.countDown();
                 }
             });
         }
@@ -84,7 +73,7 @@ public class JournalTest extends TestCas
         for (int i=0; i<iterations; i++) {
             dataManager.write(data, new Runnable() {
                 public void run() {
-                    latch.countDown();                 
+                    latch.countDown();
                 }
             });
         }
@@ -92,7 +81,7 @@ public class JournalTest extends TestCas
         assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty());
         assertEquals("none written", 0, latch.getCount());
     }
-    
+
     public void testBatchWriteCompleteAfterClose() throws Exception {
         ByteSequence data = new ByteSequence("DATA".getBytes());
         final int iterations = 10;
@@ -102,27 +91,27 @@ public class JournalTest extends TestCas
         dataManager.close();
         assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty());
     }
-    
+
     public void testBatchWriteToMaxMessageSize() throws Exception {
         final int iterations = 4;
         final CountDownLatch latch = new CountDownLatch(iterations);
         Runnable done = new Runnable() {
             public void run() {
-                latch.countDown();                 
+                latch.countDown();
             }
         };
         int messageSize = DEFAULT_MAX_BATCH_SIZE / iterations;
         byte[] message = new byte[messageSize];
         ByteSequence data = new ByteSequence(message);
-        
+
         for (int i=0; i< iterations; i++) {
             dataManager.write(data, done);
         }
-        
+
         // write may take some time
         assertTrue("all callbacks complete", latch.await(10, TimeUnit.SECONDS));
     }
-    
+
     public void testNoBatchWriteWithSync() throws Exception {
         ByteSequence data = new ByteSequence("DATA".getBytes());
         final int iterations = 10;