You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/09 23:22:06 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4947

Updated Branches:
  refs/heads/trunk efb988655 -> ef619b6a9


https://issues.apache.org/jira/browse/AMQ-4947

Updated fix for this issue: file syncs can now skip forcing metadata updates
by calling FileChannel#force(false) instead of FileDescriptor#sync().

Enable this by defining the JVM system property
"org.apache.activemq.kahaDB.files.skipMetadataUpdate=true"

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ef619b6a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ef619b6a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ef619b6a

Branch: refs/heads/trunk
Commit: ef619b6a9bb9fd215a3996c8dc49f21e04380547
Parents: efb9886
Author: Timothy Bish <ta...@gmai.com>
Authored: Thu Jan 9 17:21:56 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Thu Jan 9 17:21:56 2014 -0500

----------------------------------------------------------------------
 .../CallerBufferingDataFileAppender.java        |  23 ++--
 .../kahadb/disk/journal/DataFileAccessor.java   |  17 +--
 .../kahadb/disk/journal/DataFileAppender.java   |  11 +-
 .../store/kahadb/disk/page/PageFile.java        |  34 +++--
 .../store/kahadb/disk/util/DiskBenchmark.java   | 133 ++++++++-----------
 .../util/RecoverableRandomAccessFile.java       |  28 +++-
 6 files changed, 134 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
index ff11848..d7c4a28 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
@@ -17,11 +17,11 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
-import org.apache.activemq.util.ByteSequence;
+
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 
 /**
@@ -30,7 +30,7 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
  * does.
  * The thread calling enqueue does the file open and buffering of the data, which
  * reduces the round trip of the write thread.
- * 
+ *
  */
 class CallerBufferingDataFileAppender extends DataFileAppender {
 
@@ -49,6 +49,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
             append(write);
         }
 
+        @Override
         public void append(Journal.WriteCommand write) throws IOException {
             super.append(write);
             forceToDisk |= appendToBuffer(write, buff);
@@ -124,15 +125,15 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
                 final boolean forceToDisk = wb.forceToDisk;
 
                 ByteSequence sequence = buff.toByteSequence();
-                
-                // Now we can fill in the batch control record properly. 
+
+                // Now we can fill in the batch control record properly.
                 buff.reset();
                 buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
                 buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
                 if( journal.isChecksum() ) {
-	                Checksum checksum = new Adler32();
-	                checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
-	                buff.writeLong(checksum.getValue());
+                    Checksum checksum = new Adler32();
+                    checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                    buff.writeLong(checksum.getValue());
                 }
 
                 // Now do the 1 big write.
@@ -151,11 +152,11 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
                 file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
                 ReplicationTarget replicationTarget = journal.getReplicationTarget();
                 if( replicationTarget!=null ) {
-                	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                    replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
                 }
-                
+
                 if (forceToDisk) {
-                    file.getFD().sync();
+                    file.sync();
                 }
 
                 Journal.WriteCommand lastWrite = wb.writes.getTail();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 2896196..2046031 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -17,7 +17,6 @@
 package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteSequence;
@@ -26,8 +25,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
 /**
  * Optimized Store reader and updater. Single threaded and synchronous. Use in
  * conjunction with the DataFileAccessorPool of concurrent use.
- * 
- * 
+ *
+ *
  */
 final class DataFileAccessor {
 
@@ -38,7 +37,7 @@ final class DataFileAccessor {
 
     /**
      * Construct a Store reader
-     * 
+     *
      * @param fileId
      * @throws IOException
      */
@@ -70,7 +69,7 @@ final class DataFileAccessor {
             throw new IOException("Invalid location: " + location);
         }
 
-        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
+        Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
             return asyncWrite.data;
         }
@@ -93,7 +92,7 @@ final class DataFileAccessor {
             throw new IOException("Invalid location: " + location + ", : " + e, e);
         }
     }
-    
+
     public void readFully(long offset, byte data[]) throws IOException {
        file.seek(offset);
        file.readFully(data);
@@ -105,7 +104,7 @@ final class DataFileAccessor {
     }
 
     public void readLocationDetails(Location location) throws IOException {
-        Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
+        Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
         if (asyncWrite != null) {
             location.setSize(asyncWrite.location.getSize());
             location.setType(asyncWrite.location.getType());
@@ -155,9 +154,7 @@ final class DataFileAccessor {
         int size = Math.min(data.getLength(), location.getSize());
         file.write(data.getData(), data.getOffset(), size);
         if (sync) {
-            file.getFD().sync();
+            file.sync();
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 095db52..969584e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -18,16 +18,15 @@ package org.apache.activemq.store.kahadb.disk.journal;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
-import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
 import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,10 +66,12 @@ class DataFileAppender implements FileAppender {
             hash = (int)(file ^ offset);
         }
 
+        @Override
         public int hashCode() {
             return hash;
         }
 
+        @Override
         public boolean equals(Object obj) {
             if (obj instanceof WriteKey) {
                 WriteKey di = (WriteKey)obj;
@@ -132,6 +133,7 @@ class DataFileAppender implements FileAppender {
         this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
     }
 
+    @Override
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
 
         // Write the packet our internal buffer.
@@ -160,6 +162,7 @@ class DataFileAppender implements FileAppender {
         return location;
     }
 
+    @Override
     public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@@ -185,6 +188,7 @@ class DataFileAppender implements FileAppender {
             if (!running) {
                 running = true;
                 thread = new Thread() {
+                    @Override
                     public void run() {
                         processQueue();
                     }
@@ -246,6 +250,7 @@ class DataFileAppender implements FileAppender {
         return new WriteBatch(file, file.getLength(), write);
     }
 
+    @Override
     public void close() throws IOException {
         synchronized (enqueueMutex) {
             if (!shutdown) {
@@ -365,7 +370,7 @@ class DataFileAppender implements FileAppender {
                 }
 
                 if (forceToDisk) {
-                    file.getFD().sync();
+                    file.sync();
                 }
 
                 Journal.WriteCommand lastWrite = wb.writes.getTail();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index 3f107a6..17d6a54 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -42,9 +42,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
-import org.apache.activemq.util.*;
 import org.apache.activemq.store.kahadb.disk.util.Sequence;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LFUCache;
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +81,7 @@ public class PageFile {
     private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
 
     // A PageFile will use a couple of files in this directory
-    private File directory;
+    private final File directory;
     // And the file names in that directory will be based on this name.
     private final String name;
 
@@ -97,7 +103,7 @@ public class PageFile {
     // The number of pages in the current recovery buffer
     private int recoveryPageCount;
 
-    private AtomicBoolean loaded = new AtomicBoolean();
+    private final AtomicBoolean loaded = new AtomicBoolean();
     // The number of pages we are aiming to write every time we
     // write to disk.
     int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
@@ -118,23 +124,23 @@ public class PageFile {
     private boolean enabledWriteThread = false;
 
     // These are used if enableAsyncWrites==true
-    private AtomicBoolean stopWriter = new AtomicBoolean();
+    private final AtomicBoolean stopWriter = new AtomicBoolean();
     private Thread writerThread;
     private CountDownLatch checkpointLatch;
 
     // Keeps track of writes that are being written to disk.
-    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
+    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
 
     // Keeps track of free pages.
     private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
 
-    private AtomicLong nextTxid = new AtomicLong();
+    private final AtomicLong nextTxid = new AtomicLong();
 
     // Persistent settings stored in the page file.
     private MetaData metaData;
 
-    private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
+    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
 
     private boolean useLFRUEviction = false;
     private float LFUEvictionFactor = 0.2f;
@@ -521,6 +527,7 @@ public class PageFile {
     }
 
 
+    @Override
     public String toString() {
         return "Page File: " + getMainPageFile();
     }
@@ -610,10 +617,10 @@ public class PageFile {
         // So we don't loose it.. write it 2 times...
         writeFile.seek(0);
         writeFile.write(d);
-        writeFile.getFD().sync();
+        writeFile.sync();
         writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
         writeFile.write(d);
-        writeFile.getFD().sync();
+        writeFile.sync();
     }
 
     private void storeFreeList() throws IOException {
@@ -880,14 +887,17 @@ public class PageFile {
     private <T> void write(Page<T> page, byte[] data) throws IOException {
         final PageWrite write = new PageWrite(page, data);
         Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
+            @Override
             public Long getKey() {
                 return write.getPage().getPageId();
             }
 
+            @Override
             public PageWrite getValue() {
                 return write;
             }
 
+            @Override
             public PageWrite setValue(PageWrite value) {
                 return null;
             }
@@ -1081,9 +1091,9 @@ public class PageFile {
             if (enableDiskSyncs) {
                 // Sync to make sure recovery buffer writes land on disk..
                 if (enableRecoveryFile) {
-                    recoveryFile.getFD().sync();
+                    recoveryFile.sync();
                 }
-                writeFile.getFD().sync();
+                writeFile.sync();
             }
         } finally {
             synchronized (writes) {
@@ -1185,7 +1195,7 @@ public class PageFile {
         }
 
         // And sync it to disk
-        writeFile.getFD().sync();
+        writeFile.sync();
         return nextTxId;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
index 2805f5f..641fe79 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
@@ -27,13 +27,16 @@ import java.util.Arrays;
  */
 public class DiskBenchmark {
 
+    private static final boolean SKIP_METADATA_UPDATE =
+        Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate"); // same property RecoverableRandomAccessFile reads
+
     boolean verbose;
     // reads and writes work with 4k of data at a time.
-    int bs=1024*4; 
+    int bs = 1024 * 4;
     // Work with 100 meg file.
-    long size=1024*1024*500; 
-    long sampleInterval = 10*1000; 
-    
+    long size = 1024 * 1024 * 500;
+    long sampleInterval = 10 * 1000;
+
     public static void main(String[] args) {
 
         DiskBenchmark benchmark = new DiskBenchmark();
@@ -67,79 +70,69 @@ public class DiskBenchmark {
         }
 
     }
-    
+
     public static class Report {
 
         public int size;
-        
+
         public int writes;
         public long writeDuration;
-        
+
         public int syncWrites;
         public long syncWriteDuration;
-        
+
         public int reads;
         public long readDuration;
 
         @Override
         public String toString() {
-            return 
-            "Writes: \n" +
-            "  "+writes+" writes of size "+size+" written in "+(writeDuration/1000.0)+" seconds.\n"+
-            "  "+getWriteRate()+" writes/second.\n"+
-            "  "+getWriteSizeRate()+" megs/second.\n"+
-            "\n"+
-            "Sync Writes: \n" +
-            "  "+syncWrites+" writes of size "+size+" written in "+(syncWriteDuration/1000.0)+" seconds.\n"+
-            "  "+getSyncWriteRate()+" writes/second.\n"+
-            "  "+getSyncWriteSizeRate()+" megs/second.\n"+
-            "\n"+
-            "Reads: \n" +
-            "  "+reads+" reads of size "+size+" read in "+(readDuration/1000.0)+" seconds.\n"+
-            "  "+getReadRate()+" writes/second.\n"+
-            "  "+getReadSizeRate()+" megs/second.\n"+
-            "\n"+
-            "";
+            return "Writes: \n" + "  " + writes + " writes of size " + size + " written in " + (writeDuration / 1000.0) + " seconds.\n" + "  " + getWriteRate()
+                + " writes/second.\n" + "  " + getWriteSizeRate() + " megs/second.\n" + "\n" + "Sync Writes: \n" + "  " + syncWrites + " writes of size "
+                + size + " written in " + (syncWriteDuration / 1000.0) + " seconds.\n" + "  " + getSyncWriteRate() + " writes/second.\n" + "  "
+                + getSyncWriteSizeRate() + " megs/second.\n" + "\n" + "Reads: \n" + "  " + reads + " reads of size " + size + " read in "
+                + (readDuration / 1000.0) + " seconds.\n" + "  " + getReadRate() + " reads/second.\n" + "  " + getReadSizeRate() + " megs/second.\n" + "\n"
+                + "";
         }
 
         private float getWriteSizeRate() {
             float rc = writes;
             rc *= size;
-            rc /= (1024*1024); // put it in megs
-            rc /= (writeDuration/1000.0); // get rate. 
+            rc /= (1024 * 1024); // put it in megs
+            rc /= (writeDuration / 1000.0); // get rate.
             return rc;
         }
 
         private float getWriteRate() {
             float rc = writes;
-            rc /= (writeDuration/1000.0); // get rate. 
+            rc /= (writeDuration / 1000.0); // get rate.
             return rc;
         }
-        
+
         private float getSyncWriteSizeRate() {
             float rc = syncWrites;
             rc *= size;
-            rc /= (1024*1024); // put it in megs
-            rc /= (syncWriteDuration/1000.0); // get rate. 
+            rc /= (1024 * 1024); // put it in megs
+            rc /= (syncWriteDuration / 1000.0); // get rate.
             return rc;
         }
 
         private float getSyncWriteRate() {
             float rc = syncWrites;
-            rc /= (syncWriteDuration/1000.0); // get rate. 
+            rc /= (syncWriteDuration / 1000.0); // get rate.
             return rc;
         }
+
         private float getReadSizeRate() {
             float rc = reads;
             rc *= size;
-            rc /= (1024*1024); // put it in megs
-            rc /= (readDuration/1000.0); // get rate. 
+            rc /= (1024 * 1024); // put it in megs
+            rc /= (readDuration / 1000.0); // get rate.
             return rc;
         }
 
         private float getReadRate() {
             float rc = reads;
-            rc /= (readDuration/1000.0); // get rate. 
+            rc /= (readDuration / 1000.0); // get rate.
             return rc;
         }
 
@@ -200,64 +193,63 @@ public class DiskBenchmark {
         }
     }
 
-
     public Report benchmark(File file) throws IOException {
         Report rc = new Report();
-        
+
         // Initialize the block we will be writing to disk.
-        byte []data = new byte[bs];
+        byte[] data = new byte[bs];
         for (int i = 0; i < data.length; i++) {
-            data[i] = (byte)('a'+(i%26));
+            data[i] = (byte) ('a' + (i % 26));
         }
-        
+
         rc.size = data.length;
         RandomAccessFile raf = new RandomAccessFile(file, "rw");
         raf.setLength(size);
-        
+
         // Figure out how many writes we can do in the sample interval.
         long start = System.currentTimeMillis();
         long now = System.currentTimeMillis();
-        int ioCount=0;
-        while( true ) {
-            if( (now-start)>sampleInterval ) {
+        int ioCount = 0;
+        while (true) {
+            if ((now - start) > sampleInterval) {
                 break;
             }
             raf.seek(0);
-            for( long i=0; i+data.length < size; i+=data.length) {
+            for (long i = 0; i + data.length < size; i += data.length) {
                 raf.write(data);
                 ioCount++;
                 now = System.currentTimeMillis();
-                if( (now-start)>sampleInterval ) {
+                if ((now - start) > sampleInterval) {
                     break;
                 }
             }
-            // Sync to disk so that the we actually write the data to disk.. otherwise 
-            // OS buffering might not really do the write.
-            raf.getFD().sync();
+            // Sync to disk so that the we actually write the data to disk..
+            // otherwise OS buffering might not really do the write.
+            raf.getChannel().force(!SKIP_METADATA_UPDATE);
         }
-        raf.getFD().sync();
+        raf.getChannel().force(!SKIP_METADATA_UPDATE);
         raf.close();
         now = System.currentTimeMillis();
-        
+
         rc.size = data.length;
         rc.writes = ioCount;
-        rc.writeDuration = (now-start);
+        rc.writeDuration = (now - start);
 
         raf = new RandomAccessFile(file, "rw");
         start = System.currentTimeMillis();
         now = System.currentTimeMillis();
-        ioCount=0;
-        while( true ) {
-            if( (now-start)>sampleInterval ) {
+        ioCount = 0;
+        while (true) {
+            if ((now - start) > sampleInterval) {
                 break;
             }
-            for( long i=0; i+data.length < size; i+=data.length) {
+            for (long i = 0; i + data.length < size; i += data.length) {
                 raf.seek(i);
                 raf.write(data);
-                raf.getFD().sync();
+                raf.getChannel().force(!SKIP_METADATA_UPDATE); // honour the switch, as the buffered-write phase does
                 ioCount++;
                 now = System.currentTimeMillis();
-                if( (now-start)>sampleInterval ) {
+                if ((now - start) > sampleInterval) {
                     break;
                 }
             }
@@ -265,72 +257,63 @@ public class DiskBenchmark {
         raf.close();
         now = System.currentTimeMillis();
         rc.syncWrites = ioCount;
-        rc.syncWriteDuration = (now-start);
+        rc.syncWriteDuration = (now - start);
 
         raf = new RandomAccessFile(file, "rw");
         start = System.currentTimeMillis();
         now = System.currentTimeMillis();
-        ioCount=0;
-        while( true ) {
-            if( (now-start)>sampleInterval ) {
+        ioCount = 0;
+        while (true) {
+            if ((now - start) > sampleInterval) {
                 break;
             }
             raf.seek(0);
-            for( long i=0; i+data.length < size; i+=data.length) {
+            for (long i = 0; i + data.length < size; i += data.length) {
                 raf.seek(i);
                 raf.readFully(data);
                 ioCount++;
                 now = System.currentTimeMillis();
-                if( (now-start)>sampleInterval ) {
+                if ((now - start) > sampleInterval) {
                     break;
                 }
             }
         }
         raf.close();
-        
+
         rc.reads = ioCount;
-        rc.readDuration = (now-start);
+        rc.readDuration = (now - start);
         return rc;
     }
 
-
     public boolean isVerbose() {
         return verbose;
     }
 
-
     public void setVerbose(boolean verbose) {
         this.verbose = verbose;
     }
 
-
     public int getBs() {
         return bs;
     }
 
-
     public void setBs(int bs) {
         this.bs = bs;
     }
 
-
     public long getSize() {
         return size;
     }
 
-
     public void setSize(long size) {
         this.size = size;
     }
 
-
     public long getSampleInterval() {
         return sampleInterval;
     }
 
-
     public void setSampleInterval(long sampleInterval) {
         this.sampleInterval = sampleInterval;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index 35c1586..1b0cb4c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -16,10 +16,18 @@
  */
 package org.apache.activemq.util;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
 
 public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput, java.io.Closeable {
 
+    private static final boolean SKIP_METADATA_UPDATE =
+        Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate");
+
     RandomAccessFile raf;
     File file;
     String mode;
@@ -389,6 +397,24 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
         }
     }
 
+    public void sync() throws IOException {
+        try {
+            getRaf().getChannel().force(!SKIP_METADATA_UPDATE); // metadata is forced too unless the property opts out
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
+    public FileChannel getChannel() throws IOException {
+        try {
+            return getRaf().getChannel();
+        } catch (IOException ioe) {
+            handleException();
+            throw ioe;
+        }
+    }
+
     public int read(byte[] b, int off, int len) throws IOException {
         try {
             return getRaf().read(b, off, len);