You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/05 02:02:57 UTC

svn commit: r692306 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal: DataFile.java DataFileAppender.java Journal.java NIODataFileAppender.java ReadOnlyJournal.java

Author: chirino
Date: Thu Sep  4 17:02:57 2008
New Revision: 692306

URL: http://svn.apache.org/viewvc?rev=692306&view=rev
Log:
Updated the journal to use the new LinkedNodeList interfaces.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Thu Sep  4 17:02:57 2008
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.LinkedNode;
 
@@ -28,7 +29,7 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class DataFile extends LinkedNode implements Comparable<DataFile> {
+public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
 
     protected final File file;
     protected final Integer dataFileId;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Thu Sep  4 17:02:57 2008
@@ -25,6 +25,7 @@
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.LinkedNodeList;
 
 /**
  * An optimized writer to do batch appends to a data file. This object is thread
@@ -40,7 +41,8 @@
 
     protected final Journal dataManager;
     protected final Map<WriteKey, WriteCommand> inflightWrites;
-    protected final Object enqueueMutex = new Object(){};
+    protected final Object enqueueMutex = new Object() {
+    };
     protected WriteBatch nextWriteBatch;
 
     protected boolean shutdown;
@@ -79,13 +81,14 @@
     public class WriteBatch {
 
         public final DataFile dataFile;
-        public final WriteCommand first;
+
+        public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
         public int size;
 
         public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
             this.dataFile = dataFile;
-            this.first = write;
+            this.writes.addLast(write);
             size += write.location.getSize();
         }
 
@@ -100,12 +103,12 @@
         }
 
         public void append(WriteCommand write) throws IOException {
-            this.first.getTailNode().linkAfter(write);
+            this.writes.addLast(write);
             size += write.location.getSize();
         }
     }
 
-    public static class WriteCommand extends LinkedNode {
+    public static class WriteCommand extends LinkedNode<WriteCommand> {
         public final Location location;
         public final ByteSequence data;
         final boolean sync;
@@ -115,18 +118,17 @@
             this.location = location;
             this.data = data;
             this.sync = sync;
-            this.onComplete=null;
+            this.onComplete = null;
         }
 
         public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
             this.location = location;
             this.data = data;
-			this.onComplete = onComplete;
+            this.onComplete = onComplete;
             this.sync = false;
-		}
+        }
     }
 
-
     /**
      * Construct a Store writer
      * 
@@ -161,13 +163,14 @@
         WriteCommand write = new WriteCommand(location, data, sync);
 
         // Locate datafile and enqueue into the executor in sychronized block so
-        // that writes get equeued onto the executor in order that they were assigned
+        // that writes get enqueued onto the executor in order that they were
+        // assigned
         // by the data manager (which is basically just appending)
 
         synchronized (this) {
             // Find the position where this item will land at.
             DataFile dataFile = dataManager.allocateLocation(location);
-            if( !sync ) {
+            if (!sync) {
                 inflightWrites.put(new WriteKey(location), write);
             }
             batch = enqueue(dataFile, write);
@@ -183,8 +186,8 @@
 
         return location;
     }
-    
-	public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
+
+    public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
 
@@ -196,7 +199,8 @@
         WriteCommand write = new WriteCommand(location, data, onComplete);
 
         // Locate datafile and enqueue into the executor in sychronized block so
-        // that writes get equeued onto the executor in order that they were assigned
+        // that writes get enqueued onto the executor in order that they were
+        // assigned
         // by the data manager (which is basically just appending)
 
         synchronized (this) {
@@ -208,7 +212,7 @@
         location.setLatch(batch.latch);
 
         return location;
-	}
+    }
 
     private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
@@ -287,13 +291,11 @@
 
     /**
      * The async processing loop that writes to the data files and does the
-     * force calls.
-     * 
-     * Since the file sync() call is the slowest of all the operations, this
-     * algorithm tries to 'batch' or group together several file sync() requests
-     * into a single file sync() call. The batching is accomplished attaching
-     * the same CountDownLatch instance to every force request in a group.
-     * 
+     * force calls. Since the file sync() call is the slowest of all the
+     * operations, this algorithm tries to 'batch' or group together several
+     * file sync() requests into a single file sync() call. The batching is
+     * accomplished by attaching the same CountDownLatch instance to every force
+     * request in a group.
      */
     protected void processQueue() {
         DataFile dataFile = null;
@@ -330,21 +332,20 @@
                     file = dataFile.openRandomAccessFile(true);
                 }
 
-                WriteCommand write = wb.first;
+                WriteCommand write = wb.writes.getHead();
 
                 // Write all the data.
                 // Only need to seek to first location.. all others
                 // are in sequence.
                 file.seek(write.location.getOffset());
 
-                
-                boolean forceToDisk=false;
-                
+                boolean forceToDisk = false;
+
                 // 
                 // is it just 1 big write?
                 if (wb.size == write.location.getSize()) {
-                    forceToDisk = write.sync | write.onComplete!=null;
-                    
+                    forceToDisk = write.sync | write.onComplete != null;
+
                     // Just write it directly..
                     file.writeInt(write.location.getSize());
                     file.writeByte(write.location.getType());
@@ -357,7 +358,7 @@
 
                     // Combine the smaller writes into 1 big buffer
                     while (write != null) {
-                        forceToDisk |= write.sync | write.onComplete!=null;
+                        forceToDisk |= write.sync | write.onComplete != null;
 
                         buff.writeInt(write.location.getSize());
                         buff.writeByte(write.location.getType());
@@ -366,7 +367,7 @@
                         buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
                         buff.write(Journal.ITEM_HEAD_EOR);
 
-                        write = (WriteCommand)write.getNext();
+                        write = write.getNext();
                     }
 
                     // Now do the 1 big write.
@@ -375,31 +376,31 @@
                     buff.reset();
                 }
 
-                if( forceToDisk ) {
+                if (forceToDisk) {
                     file.getFD().sync();
                 }
-                
-                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+
+                WriteCommand lastWrite = wb.writes.getTail();
                 dataManager.setLastAppendLocation(lastWrite.location);
 
                 // Now that the data is on disk, remove the writes from the in
                 // flight
                 // cache.
-                write = wb.first;
+                write = wb.writes.getHead();
                 while (write != null) {
                     if (!write.sync) {
                         inflightWrites.remove(new WriteKey(write.location));
                     }
-                    if( write.onComplete !=null ) {
-                    	 try {
-							write.onComplete.run();
-						} catch (Throwable e) {
-							e.printStackTrace();
-						}
+                    if (write.onComplete != null) {
+                        try {
+                            write.onComplete.run();
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        }
                     }
-                    write = (WriteCommand)write.getNext();
+                    write = write.getNext();
                 }
-                
+
                 // Signal any waiting threads that the write is on disk.
                 wb.latch.countDown();
             }
@@ -419,5 +420,4 @@
         }
     }
 
-
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Sep  4 17:02:57 2008
@@ -42,10 +42,9 @@
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.Scheduler;
 
-
-
 /**
  * Manages DataFiles
  * 
@@ -62,8 +61,12 @@
 
     public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
 
-    public static final byte[] ITEM_HEAD_SOR = new byte[] {'S', 'O', 'R'}; // 
-    public static final byte[] ITEM_HEAD_EOR = new byte[] {'E', 'O', 'R'}; // 
+    public static final byte[] ITEM_HEAD_SOR = new byte[] {
+        'S', 'O', 'R'
+    }; // 
+    public static final byte[] ITEM_HEAD_EOR = new byte[] {
+        'E', 'O', 'R'
+    }; // 
 
     public static final byte DATA_ITEM_TYPE = 1;
     public static final byte REDO_ITEM_TYPE = 2;
@@ -79,7 +82,7 @@
     protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
 
     protected File directory = new File(DEFAULT_DIRECTORY);
-    protected File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
+    protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
     protected String filePrefix = DEFAULT_FILE_PREFIX;
     protected ControlFile controlFile;
     protected boolean started;
@@ -93,18 +96,18 @@
 
     protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
     protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
-    protected DataFile currentWriteFile;
+    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
 
     protected Location mark;
     protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
     protected Runnable cleanupTask;
     protected final AtomicLong storeSize;
     protected boolean archiveDataLogs;
-    
+
     public Journal(AtomicLong storeSize) {
-        this.storeSize=storeSize;
+        this.storeSize = storeSize;
     }
-    
+
     public Journal() {
         this(new AtomicLong());
     }
@@ -116,7 +119,7 @@
         }
 
         started = true;
-        preferedFileLength=Math.max(PREFERED_DIFF, getMaxFileLength()-PREFERED_DIFF);
+        preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
         lock();
 
         ByteSequence sequence = controlFile.load();
@@ -134,7 +137,7 @@
                 return dir.equals(directory) && n.startsWith(filePrefix);
             }
         });
-       
+
         if (files != null) {
             for (int i = 0; i < files.length; i++) {
                 try {
@@ -154,33 +157,29 @@
             // right order.
             List<DataFile> l = new ArrayList<DataFile>(fileMap.values());
             Collections.sort(l);
-            currentWriteFile = null;
             for (DataFile df : l) {
-                if (currentWriteFile != null) {
-                    currentWriteFile.linkAfter(df);
-                }
-                currentWriteFile = df;
+                dataFiles.addLast(df);
                 fileByFileMap.put(df.getFile(), df);
             }
         }
 
         // Need to check the current Write File to see if there was a partial
         // write to it.
-        if (currentWriteFile != null) {
+        if (!dataFiles.isEmpty()) {
 
             // See if the lastSyncedLocation is valid..
             Location l = lastAppendLocation.get();
-            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
+            if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId()) {
                 l = null;
             }
 
             // If we know the last location that was ok.. then we can skip lots
             // of checking
-            try{
-            l = recoveryCheck(currentWriteFile, l);
-            lastAppendLocation.set(l);
-            }catch(IOException e){
-            	LOG.warn("recovery check failed", e);
+            try {
+                l = recoveryCheck(dataFiles.getTail(), l);
+                lastAppendLocation.set(l);
+            } catch (IOException e) {
+                LOG.warn("recovery check failed", e);
             }
         }
 
@@ -264,25 +263,24 @@
     }
 
     synchronized DataFile allocateLocation(Location location) throws IOException {
-        if (currentWriteFile == null || ((currentWriteFile.getLength() + location.getSize()) > maxFileLength)) {
-            int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
+        if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
+            int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
 
             String fileName = filePrefix + nextNum;
             File file = new File(directory, fileName);
             DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
-            //actually allocate the disk space
+            // actually allocate the disk space
             nextWriteFile.closeRandomAccessFile(nextWriteFile.openRandomAccessFile(true));
             fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
             fileByFileMap.put(file, nextWriteFile);
-            if (currentWriteFile != null) {
-                currentWriteFile.linkAfter(nextWriteFile);
-                if (currentWriteFile.isUnused()) {
-                    removeDataFile(currentWriteFile);
-                }
+            dataFiles.addLast(nextWriteFile);
+            
+            DataFile previous = dataFiles.getTail().getPrevious();
+            if (previous!=null && previous.isUnused()) {
+                removeDataFile(previous);
             }
-            currentWriteFile = nextWriteFile;
-
         }
+        DataFile currentWriteFile = dataFiles.getTail();
         location.setOffset(currentWriteFile.getLength());
         location.setDataFileId(currentWriteFile.getDataFileId().intValue());
         int size = location.getSize();
@@ -291,9 +289,9 @@
         storeSize.addAndGet(size);
         return currentWriteFile;
     }
-    
-    public synchronized void removeLocation(Location location) throws IOException{
-       
+
+    public synchronized void removeLocation(Location location) throws IOException {
+
         DataFile dataFile = getDataFile(location);
         dataFile.decrement();
     }
@@ -307,19 +305,19 @@
         }
         return dataFile;
     }
-    
+
     synchronized File getFile(Location item) throws IOException {
         Integer key = Integer.valueOf(item.getDataFileId());
         DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + filePrefix  + item.getDataFileId());
+            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
         }
         return dataFile.getFile();
     }
 
     private DataFile getNextDataFile(DataFile dataFile) {
-        return (DataFile)dataFile.getNext();
+        return dataFile.getNext();
     }
 
     public synchronized void close() throws IOException {
@@ -350,8 +348,8 @@
         accessorPool.close();
 
         boolean result = true;
-        for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
-            DataFile dataFile = (DataFile)i.next();
+        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
+            DataFile dataFile = i.next();
             storeSize.addAndGet(-dataFile.getLength());
             result &= dataFile.delete();
         }
@@ -359,7 +357,7 @@
         fileByFileMap.clear();
         lastAppendLocation.set(null);
         mark = null;
-        currentWriteFile = null;
+        dataFiles = new LinkedNodeList<DataFile>();
 
         // reopen open file handles...
         accessorPool = new DataFileAccessorPool(this);
@@ -374,7 +372,7 @@
     public synchronized void addInterestInFile(int file) throws IOException {
         if (file >= 0) {
             Integer key = Integer.valueOf(file);
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             if (dataFile == null) {
                 throw new IOException("That data file does not exist");
             }
@@ -391,10 +389,10 @@
     public synchronized void removeInterestInFile(int file) throws IOException {
         if (file >= 0) {
             Integer key = Integer.valueOf(file);
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             removeInterestInFile(dataFile);
         }
-       
+
     }
 
     synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
@@ -405,18 +403,18 @@
         }
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer>inProgress) throws IOException {
+    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
         unUsed.removeAll(inProgress);
-                
+
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
-            DataFile dataFile = (DataFile)fileMap.get(key);
+            DataFile dataFile = fileMap.get(key);
             purgeList.add(dataFile);
         }
         for (DataFile dataFile : purgeList) {
-            if (dataFile.getDataFileId() != currentWriteFile.getDataFileId()) {
+            if (dataFile != dataFiles.getTail()) {
                 forceRemoveDataFile(dataFile);
             }
         }
@@ -425,19 +423,19 @@
     public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
         Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
         unUsed.removeAll(inUse);
-                
+
         List<DataFile> purgeList = new ArrayList<DataFile>();
         for (Integer key : unUsed) {
-        	// Only add files less than the lastFile..
-        	if( key.intValue() < lastFile.intValue() ) {
-                DataFile dataFile = (DataFile)fileMap.get(key);
+            // Only add files less than the lastFile..
+            if (key.intValue() < lastFile.intValue()) {
+                DataFile dataFile = fileMap.get(key);
                 purgeList.add(dataFile);
-        	}
+            }
         }
         for (DataFile dataFile : purgeList) {
             forceRemoveDataFile(dataFile);
         }
-	}
+    }
 
     public synchronized void consolidateDataFiles() throws IOException {
         List<DataFile> purgeList = new ArrayList<DataFile>();
@@ -454,28 +452,25 @@
     private synchronized void removeDataFile(DataFile dataFile) throws IOException {
 
         // Make sure we don't delete too much data.
-        if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
+        if (dataFile == dataFiles.getTail() || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
             LOG.debug("Won't remove DataFile" + dataFile);
-        	return;
+            return;
         }
         forceRemoveDataFile(dataFile);
     }
-    
-    private synchronized void forceRemoveDataFile(DataFile dataFile)
-            throws IOException {
+
+    private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
         accessorPool.disposeDataFileAccessors(dataFile);
         fileByFileMap.remove(dataFile.getFile());
-        DataFile removed = fileMap.remove(dataFile.getDataFileId());
+        fileMap.remove(dataFile.getDataFileId());
         storeSize.addAndGet(-dataFile.getLength());
         dataFile.unlink();
         if (archiveDataLogs) {
             dataFile.move(getDirectoryArchive());
-            LOG.info("moved data file " + dataFile + " to "
-                    + getDirectoryArchive());
+            LOG.info("moved data file " + dataFile + " to " + getDirectoryArchive());
         } else {
             boolean result = dataFile.delete();
-            LOG.info("discarding data file " + dataFile
-                    + (result ? "successful " : "failed"));
+            LOG.info("discarding data file " + dataFile + (result ? "successful " : "failed"));
         }
     }
 
@@ -507,18 +502,18 @@
         while (true) {
             if (cur == null) {
                 if (location == null) {
-                    DataFile head = (DataFile)currentWriteFile.getHeadNode();
+                    DataFile head = dataFiles.getHead();
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);
                 } else {
                     // Set to the next offset..
-                	if( location.getSize() == -1 ) {
-                		cur = new Location(location);
-                	}  else {
-	            		cur = new Location(location);
-	            		cur.setOffset(location.getOffset()+location.getSize());
-                	}
+                    if (location.getSize() == -1) {
+                        cur = new Location(location);
+                    } else {
+                        cur = new Location(location);
+                        cur.setOffset(location.getOffset() + location.getSize());
+                    }
                 }
             } else {
                 cur.setOffset(cur.getOffset() + cur.getSize());
@@ -553,20 +548,19 @@
             }
         }
     }
-    
-    public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+
+    public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
         DataFile df = fileByFileMap.get(file);
-        return getNextLocation(df, lastLocation,thisFileOnly);
+        return getNextLocation(df, lastLocation, thisFileOnly);
     }
-    
-    public synchronized Location getNextLocation(DataFile dataFile,
-            Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+    public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
 
         Location cur = null;
         while (true) {
             if (cur == null) {
                 if (lastLocation == null) {
-                    DataFile head = (DataFile)dataFile.getHeadNode();
+                    DataFile head = dataFile.getHeadNode();
                     cur = new Location();
                     cur.setDataFileId(head.getDataFileId());
                     cur.setOffset(0);
@@ -579,19 +573,18 @@
                 cur.setOffset(cur.getOffset() + cur.getSize());
             }
 
-            
             // Did it go into the next file??
             if (dataFile.getLength() <= cur.getOffset()) {
                 if (thisFileOnly) {
                     return null;
-                }else {
-                dataFile = getNextDataFile(dataFile);
-                if (dataFile == null) {
-                    return null;
                 } else {
-                    cur.setDataFileId(dataFile.getDataFileId().intValue());
-                    cur.setOffset(0);
-                }
+                    dataFile = getNextDataFile(dataFile);
+                    if (dataFile == null) {
+                        return null;
+                    } else {
+                        cur.setDataFileId(dataFile.getDataFileId().intValue());
+                        cur.setOffset(0);
+                    }
                 }
             }
 
@@ -641,7 +634,7 @@
         Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
         return loc;
     }
-    
+
     public synchronized Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
         Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
         return loc;
@@ -689,22 +682,22 @@
         this.lastAppendLocation.set(lastSyncedLocation);
     }
 
-	public boolean isUseNio() {
-		return useNio;
-	}
-
-	public void setUseNio(boolean useNio) {
-		this.useNio = useNio;
-	}
-	
-	public File getDirectoryArchive() {
+    public boolean isUseNio() {
+        return useNio;
+    }
+
+    public void setUseNio(boolean useNio) {
+        this.useNio = useNio;
+    }
+
+    public File getDirectoryArchive() {
         return directoryArchive;
     }
 
     public void setDirectoryArchive(File directoryArchive) {
         this.directoryArchive = directoryArchive;
     }
-    
+
     public boolean isArchiveDataLogs() {
         return archiveDataLogs;
     }
@@ -714,40 +707,41 @@
     }
 
     synchronized public Integer getCurrentDataFileId() {
-        if( currentWriteFile==null )
+        if (dataFiles.isEmpty())
             return null;
-        return currentWriteFile.getDataFileId();
+        return dataFiles.getTail().getDataFileId();
     }
-    
+
     /**
      * Get a set of files - only valid after start()
+     * 
      * @return files currently being used
      */
-    public Set<File> getFiles(){
+    public Set<File> getFiles() {
         return fileByFileMap.keySet();
     }
 
-	synchronized public long getDiskSize() {
-		long rc=0;
-        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
-        while( cur !=null ) {
-        	rc += cur.getLength();
-        	cur = (DataFile) cur.getNext();
-        }
-		return rc;
-	}
-
-	synchronized public long getDiskSizeUntil(Location startPosition) {
-		long rc=0;
-        DataFile cur = (DataFile)currentWriteFile.getHeadNode();
-        while( cur !=null ) {
-        	if( cur.getDataFileId().intValue() >= startPosition.getDataFileId() ) {
-        		return rc + startPosition.getOffset();
-        	}
-        	rc += cur.getLength();
-        	cur = (DataFile) cur.getNext();
+    synchronized public long getDiskSize() {
+        long rc = 0;
+        DataFile cur = dataFiles.getHead();
+        while (cur != null) {
+            rc += cur.getLength();
+            cur = cur.getNext();
         }
-		return rc;
-	}
+        return rc;
+    }
+
+    synchronized public long getDiskSizeUntil(Location startPosition) {
+        long rc = 0;
+        DataFile cur = dataFiles.getHead();
+        while (cur != null) {
+            if (cur.getDataFileId().intValue() >= startPosition.getDataFileId()) {
+                return rc + startPosition.getOffset();
+            }
+            rc += cur.getLength();
+            cur = cur.getNext();
+        }
+        return rc;
+    }
 
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java Thu Sep  4 17:02:57 2008
@@ -91,7 +91,7 @@
                     channel = file.getChannel();
                 }
 
-                WriteCommand write = wb.first;
+                WriteCommand write = wb.writes.getHead();
 
                 // Write all the data.
                 // Only need to seek to first location.. all others
@@ -139,7 +139,7 @@
                         copy(footer, buffer);
                         assert !footer.hasRemaining();
 
-                        write = (WriteCommand)write.getNext();
+                        write = write.getNext();
                     }
 
                     // Fully write out the buffer..
@@ -152,13 +152,13 @@
                     file.getChannel().force(false);
                 }
 
-                WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
+                WriteCommand lastWrite = wb.writes.getTail();
                 dataManager.setLastAppendLocation(lastWrite.location);
 
                 // Now that the data is on disk, remove the writes from the in
                 // flight
                 // cache.
-                write = wb.first;
+                write = wb.writes.getHead();
                 while (write != null) {
                     if (!write.sync) {
                         inflightWrites.remove(new WriteKey(write.location));
@@ -170,7 +170,7 @@
 							e.printStackTrace();
 						}
 					}
-                    write = (WriteCommand)write.getNext();
+                    write = write.getNext();
                 }
                 
                 // Signal any waiting threads that the write is on disk.

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=692306&r1=692305&r2=692306&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Thu Sep  4 17:02:57 2008
@@ -25,8 +25,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.Scheduler;
 
 /**
  * An AsyncDataManager that works in read only mode against multiple data directories.
@@ -77,27 +75,32 @@
 
         // Sort the list so that we can link the DataFiles together in the
         // right order.
-        List<DataFile> dataFiles = new ArrayList<DataFile>(fileMap.values());
-        Collections.sort(dataFiles);
-        currentWriteFile = null;
-        for (DataFile df : dataFiles) {
-            if (currentWriteFile != null) {
-                currentWriteFile.linkAfter(df);
-            }
-            currentWriteFile = df;
+        List<DataFile> list = new ArrayList<DataFile>(fileMap.values());
+        Collections.sort(list);
+        for (DataFile df : list) {
+            dataFiles.addLast(df);
             fileByFileMap.put(df.getFile(), df);
         }
         
-        // Need to check the current Write File to see if there was a partial
-        // write to it.
-        if (currentWriteFile != null) {
-
-            // See if the lastSyncedLocation is valid..
-            Location l = lastAppendLocation.get();
-            if (l != null && l.getDataFileId() != currentWriteFile.getDataFileId().intValue()) {
-                l = null;
-            }
-        }
+//        // Need to check the current Write File to see if there was a partial
+//        // write to it.
+//        if (!dataFiles.isEmpty()) {
+//
+//            // See if the lastSyncedLocation is valid..
+//            Location l = lastAppendLocation.get();
+//            if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId().intValue()) {
+//                l = null;
+//            }
+//            
+//            // If we know the last location that was ok.. then we can skip lots
+//            // of checking
+//            try {
+//                l = recoveryCheck(dataFiles.getTail(), l);
+//                lastAppendLocation.set(l);
+//            } catch (IOException e) {
+//                LOG.warn("recovery check failed", e);
+//            }
+//        }
     }
     
     public synchronized void close() throws IOException {
@@ -112,11 +115,11 @@
 
     
     public Location getFirstLocation() throws IllegalStateException, IOException {
-        if( currentWriteFile == null ) {
+        if( dataFiles.isEmpty() ) {
             return null;
         }
         
-        DataFile first = (DataFile)currentWriteFile.getHeadNode();
+        DataFile first = dataFiles.getHead();
         Location cur = new Location();
         cur.setDataFileId(first.getDataFileId());
         cur.setOffset(0);