You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/09 23:22:06 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4947
Updated Branches:
refs/heads/trunk efb988655 -> ef619b6a9
https://issues.apache.org/jira/browse/AMQ-4947
Updated fix for this issue: allows enabling non-forced metadata
updates to the file channel via FileChannel#force(false).
Enable this by defining the system property
"org.apache.activemq.kahaDB.files.skipMetadataUpdate=true"
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ef619b6a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ef619b6a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ef619b6a
Branch: refs/heads/trunk
Commit: ef619b6a9bb9fd215a3996c8dc49f21e04380547
Parents: efb9886
Author: Timothy Bish <ta...@gmai.com>
Authored: Thu Jan 9 17:21:56 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Thu Jan 9 17:21:56 2014 -0500
----------------------------------------------------------------------
.../CallerBufferingDataFileAppender.java | 23 ++--
.../kahadb/disk/journal/DataFileAccessor.java | 17 +--
.../kahadb/disk/journal/DataFileAppender.java | 11 +-
.../store/kahadb/disk/page/PageFile.java | 34 +++--
.../store/kahadb/disk/util/DiskBenchmark.java | 133 ++++++++-----------
.../util/RecoverableRandomAccessFile.java | 28 +++-
6 files changed, 134 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
index ff11848..d7c4a28 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
@@ -17,11 +17,11 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-import org.apache.activemq.util.ByteSequence;
+
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
/**
@@ -30,7 +30,7 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
* does.
* The thread calling enqueue does the file open and buffering of the data, which
* reduces the round trip of the write thread.
- *
+ *
*/
class CallerBufferingDataFileAppender extends DataFileAppender {
@@ -49,6 +49,7 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
append(write);
}
+ @Override
public void append(Journal.WriteCommand write) throws IOException {
super.append(write);
forceToDisk |= appendToBuffer(write, buff);
@@ -124,15 +125,15 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
final boolean forceToDisk = wb.forceToDisk;
ByteSequence sequence = buff.toByteSequence();
-
- // Now we can fill in the batch control record properly.
+
+ // Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
if( journal.isChecksum() ) {
- Checksum checksum = new Adler32();
- checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
- buff.writeLong(checksum.getValue());
+ Checksum checksum = new Adler32();
+ checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeLong(checksum.getValue());
}
// Now do the 1 big write.
@@ -151,11 +152,11 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();
if( replicationTarget!=null ) {
- replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+ replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
}
-
+
if (forceToDisk) {
- file.getFD().sync();
+ file.sync();
}
Journal.WriteCommand lastWrite = wb.writes.getTail();
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 2896196..2046031 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -17,7 +17,6 @@
package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
-import java.io.RandomAccessFile;
import java.util.Map;
import org.apache.activemq.util.ByteSequence;
@@ -26,8 +25,8 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
/**
* Optimized Store reader and updater. Single threaded and synchronous. Use in
* conjunction with the DataFileAccessorPool of concurrent use.
- *
- *
+ *
+ *
*/
final class DataFileAccessor {
@@ -38,7 +37,7 @@ final class DataFileAccessor {
/**
* Construct a Store reader
- *
+ *
* @param fileId
* @throws IOException
*/
@@ -70,7 +69,7 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location);
}
- Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
+ Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
return asyncWrite.data;
}
@@ -93,7 +92,7 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location + ", : " + e, e);
}
}
-
+
public void readFully(long offset, byte data[]) throws IOException {
file.seek(offset);
file.readFully(data);
@@ -105,7 +104,7 @@ final class DataFileAccessor {
}
public void readLocationDetails(Location location) throws IOException {
- Journal.WriteCommand asyncWrite = (Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
+ Journal.WriteCommand asyncWrite = inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
@@ -155,9 +154,7 @@ final class DataFileAccessor {
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if (sync) {
- file.getFD().sync();
+ file.sync();
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 095db52..969584e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -18,16 +18,15 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
+import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,10 +66,12 @@ class DataFileAppender implements FileAppender {
hash = (int)(file ^ offset);
}
+ @Override
public int hashCode() {
return hash;
}
+ @Override
public boolean equals(Object obj) {
if (obj instanceof WriteKey) {
WriteKey di = (WriteKey)obj;
@@ -132,6 +133,7 @@ class DataFileAppender implements FileAppender {
this.syncOnComplete = this.journal.isEnableAsyncDiskSync();
}
+ @Override
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
// Write the packet our internal buffer.
@@ -160,6 +162,7 @@ class DataFileAppender implements FileAppender {
return location;
}
+ @Override
public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@@ -185,6 +188,7 @@ class DataFileAppender implements FileAppender {
if (!running) {
running = true;
thread = new Thread() {
+ @Override
public void run() {
processQueue();
}
@@ -246,6 +250,7 @@ class DataFileAppender implements FileAppender {
return new WriteBatch(file, file.getLength(), write);
}
+ @Override
public void close() throws IOException {
synchronized (enqueueMutex) {
if (!shutdown) {
@@ -365,7 +370,7 @@ class DataFileAppender implements FileAppender {
}
if (forceToDisk) {
- file.getFD().sync();
+ file.sync();
}
Journal.WriteCommand lastWrite = wb.writes.getTail();
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index 3f107a6..17d6a54 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -42,9 +42,15 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-import org.apache.activemq.util.*;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.LFUCache;
+import org.apache.activemq.util.LRUCache;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +81,7 @@ public class PageFile {
private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);
// A PageFile will use a couple of files in this directory
- private File directory;
+ private final File directory;
// And the file names in that directory will be based on this name.
private final String name;
@@ -97,7 +103,7 @@ public class PageFile {
// The number of pages in the current recovery buffer
private int recoveryPageCount;
- private AtomicBoolean loaded = new AtomicBoolean();
+ private final AtomicBoolean loaded = new AtomicBoolean();
// The number of pages we are aiming to write every time we
// write to disk.
int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
@@ -118,23 +124,23 @@ public class PageFile {
private boolean enabledWriteThread = false;
// These are used if enableAsyncWrites==true
- private AtomicBoolean stopWriter = new AtomicBoolean();
+ private final AtomicBoolean stopWriter = new AtomicBoolean();
private Thread writerThread;
private CountDownLatch checkpointLatch;
// Keeps track of writes that are being written to disk.
- private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
+ private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
- private AtomicLong nextTxid = new AtomicLong();
+ private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
private MetaData metaData;
- private ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
+ private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f;
@@ -521,6 +527,7 @@ public class PageFile {
}
+ @Override
public String toString() {
return "Page File: " + getMainPageFile();
}
@@ -610,10 +617,10 @@ public class PageFile {
// So we don't loose it.. write it 2 times...
writeFile.seek(0);
writeFile.write(d);
- writeFile.getFD().sync();
+ writeFile.sync();
writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
writeFile.write(d);
- writeFile.getFD().sync();
+ writeFile.sync();
}
private void storeFreeList() throws IOException {
@@ -880,14 +887,17 @@ public class PageFile {
private <T> void write(Page<T> page, byte[] data) throws IOException {
final PageWrite write = new PageWrite(page, data);
Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
+ @Override
public Long getKey() {
return write.getPage().getPageId();
}
+ @Override
public PageWrite getValue() {
return write;
}
+ @Override
public PageWrite setValue(PageWrite value) {
return null;
}
@@ -1081,9 +1091,9 @@ public class PageFile {
if (enableDiskSyncs) {
// Sync to make sure recovery buffer writes land on disk..
if (enableRecoveryFile) {
- recoveryFile.getFD().sync();
+ recoveryFile.sync();
}
- writeFile.getFD().sync();
+ writeFile.sync();
}
} finally {
synchronized (writes) {
@@ -1185,7 +1195,7 @@ public class PageFile {
}
// And sync it to disk
- writeFile.getFD().sync();
+ writeFile.sync();
return nextTxId;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
index 2805f5f..641fe79 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/util/DiskBenchmark.java
@@ -27,13 +27,16 @@ import java.util.Arrays;
*/
public class DiskBenchmark {
+ private static final boolean SKIP_METADATA_UPDATE =
+ Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate") || Boolean.getBoolean("org.apache.activemq.file.skipMetadataUpdate");
+
boolean verbose;
// reads and writes work with 4k of data at a time.
- int bs=1024*4;
+ int bs = 1024 * 4;
// Work with 100 meg file.
- long size=1024*1024*500;
- long sampleInterval = 10*1000;
-
+ long size = 1024 * 1024 * 500;
+ long sampleInterval = 10 * 1000;
+
public static void main(String[] args) {
DiskBenchmark benchmark = new DiskBenchmark();
@@ -67,79 +70,69 @@ public class DiskBenchmark {
}
}
-
+
public static class Report {
public int size;
-
+
public int writes;
public long writeDuration;
-
+
public int syncWrites;
public long syncWriteDuration;
-
+
public int reads;
public long readDuration;
@Override
public String toString() {
- return
- "Writes: \n" +
- " "+writes+" writes of size "+size+" written in "+(writeDuration/1000.0)+" seconds.\n"+
- " "+getWriteRate()+" writes/second.\n"+
- " "+getWriteSizeRate()+" megs/second.\n"+
- "\n"+
- "Sync Writes: \n" +
- " "+syncWrites+" writes of size "+size+" written in "+(syncWriteDuration/1000.0)+" seconds.\n"+
- " "+getSyncWriteRate()+" writes/second.\n"+
- " "+getSyncWriteSizeRate()+" megs/second.\n"+
- "\n"+
- "Reads: \n" +
- " "+reads+" reads of size "+size+" read in "+(readDuration/1000.0)+" seconds.\n"+
- " "+getReadRate()+" writes/second.\n"+
- " "+getReadSizeRate()+" megs/second.\n"+
- "\n"+
- "";
+ return "Writes: \n" + " " + writes + " writes of size " + size + " written in " + (writeDuration / 1000.0) + " seconds.\n" + " " + getWriteRate()
+ + " writes/second.\n" + " " + getWriteSizeRate() + " megs/second.\n" + "\n" + "Sync Writes: \n" + " " + syncWrites + " writes of size "
+ + size + " written in " + (syncWriteDuration / 1000.0) + " seconds.\n" + " " + getSyncWriteRate() + " writes/second.\n" + " "
+ + getSyncWriteSizeRate() + " megs/second.\n" + "\n" + "Reads: \n" + " " + reads + " reads of size " + size + " read in "
+ + (readDuration / 1000.0) + " seconds.\n" + " " + getReadRate() + " reads/second.\n" + " " + getReadSizeRate() + " megs/second.\n" + "\n"
+ + "";
}
private float getWriteSizeRate() {
float rc = writes;
rc *= size;
- rc /= (1024*1024); // put it in megs
- rc /= (writeDuration/1000.0); // get rate.
+ rc /= (1024 * 1024); // put it in megs
+ rc /= (writeDuration / 1000.0); // get rate.
return rc;
}
private float getWriteRate() {
float rc = writes;
- rc /= (writeDuration/1000.0); // get rate.
+ rc /= (writeDuration / 1000.0); // get rate.
return rc;
}
-
+
private float getSyncWriteSizeRate() {
float rc = syncWrites;
rc *= size;
- rc /= (1024*1024); // put it in megs
- rc /= (syncWriteDuration/1000.0); // get rate.
+ rc /= (1024 * 1024); // put it in megs
+ rc /= (syncWriteDuration / 1000.0); // get rate.
return rc;
}
private float getSyncWriteRate() {
float rc = syncWrites;
- rc /= (syncWriteDuration/1000.0); // get rate.
+ rc /= (syncWriteDuration / 1000.0); // get rate.
return rc;
}
+
private float getReadSizeRate() {
float rc = reads;
rc *= size;
- rc /= (1024*1024); // put it in megs
- rc /= (readDuration/1000.0); // get rate.
+ rc /= (1024 * 1024); // put it in megs
+ rc /= (readDuration / 1000.0); // get rate.
return rc;
}
private float getReadRate() {
float rc = reads;
- rc /= (readDuration/1000.0); // get rate.
+ rc /= (readDuration / 1000.0); // get rate.
return rc;
}
@@ -200,64 +193,63 @@ public class DiskBenchmark {
}
}
-
public Report benchmark(File file) throws IOException {
Report rc = new Report();
-
+
// Initialize the block we will be writing to disk.
- byte []data = new byte[bs];
+ byte[] data = new byte[bs];
for (int i = 0; i < data.length; i++) {
- data[i] = (byte)('a'+(i%26));
+ data[i] = (byte) ('a' + (i % 26));
}
-
+
rc.size = data.length;
RandomAccessFile raf = new RandomAccessFile(file, "rw");
raf.setLength(size);
-
+
// Figure out how many writes we can do in the sample interval.
long start = System.currentTimeMillis();
long now = System.currentTimeMillis();
- int ioCount=0;
- while( true ) {
- if( (now-start)>sampleInterval ) {
+ int ioCount = 0;
+ while (true) {
+ if ((now - start) > sampleInterval) {
break;
}
raf.seek(0);
- for( long i=0; i+data.length < size; i+=data.length) {
+ for (long i = 0; i + data.length < size; i += data.length) {
raf.write(data);
ioCount++;
now = System.currentTimeMillis();
- if( (now-start)>sampleInterval ) {
+ if ((now - start) > sampleInterval) {
break;
}
}
- // Sync to disk so that the we actually write the data to disk.. otherwise
- // OS buffering might not really do the write.
- raf.getFD().sync();
+ // Sync to disk so that we actually write the data to disk..
+ // otherwise OS buffering might not really do the write.
+ raf.getChannel().force(!SKIP_METADATA_UPDATE);
}
- raf.getFD().sync();
+ raf.getChannel().force(!SKIP_METADATA_UPDATE);
raf.close();
now = System.currentTimeMillis();
-
+
rc.size = data.length;
rc.writes = ioCount;
- rc.writeDuration = (now-start);
+ rc.writeDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
- ioCount=0;
- while( true ) {
- if( (now-start)>sampleInterval ) {
+ ioCount = 0;
+ while (true) {
+ if ((now - start) > sampleInterval) {
break;
}
- for( long i=0; i+data.length < size; i+=data.length) {
+ for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.write(data);
- raf.getFD().sync();
+ raf.getChannel().force(!SKIP_METADATA_UPDATE);
ioCount++;
now = System.currentTimeMillis();
- if( (now-start)>sampleInterval ) {
+ if ((now - start) > sampleInterval) {
break;
}
}
@@ -265,72 +257,63 @@ public class DiskBenchmark {
raf.close();
now = System.currentTimeMillis();
rc.syncWrites = ioCount;
- rc.syncWriteDuration = (now-start);
+ rc.syncWriteDuration = (now - start);
raf = new RandomAccessFile(file, "rw");
start = System.currentTimeMillis();
now = System.currentTimeMillis();
- ioCount=0;
- while( true ) {
- if( (now-start)>sampleInterval ) {
+ ioCount = 0;
+ while (true) {
+ if ((now - start) > sampleInterval) {
break;
}
raf.seek(0);
- for( long i=0; i+data.length < size; i+=data.length) {
+ for (long i = 0; i + data.length < size; i += data.length) {
raf.seek(i);
raf.readFully(data);
ioCount++;
now = System.currentTimeMillis();
- if( (now-start)>sampleInterval ) {
+ if ((now - start) > sampleInterval) {
break;
}
}
}
raf.close();
-
+
rc.reads = ioCount;
- rc.readDuration = (now-start);
+ rc.readDuration = (now - start);
return rc;
}
-
public boolean isVerbose() {
return verbose;
}
-
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
-
public int getBs() {
return bs;
}
-
public void setBs(int bs) {
this.bs = bs;
}
-
public long getSize() {
return size;
}
-
public void setSize(long size) {
this.size = size;
}
-
public long getSampleInterval() {
return sampleInterval;
}
-
public void setSampleInterval(long sampleInterval) {
this.sampleInterval = sampleInterval;
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ef619b6a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index 35c1586..1b0cb4c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -16,10 +16,18 @@
*/
package org.apache.activemq.util;
-import java.io.*;
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.DataInput, java.io.Closeable {
+ private static final boolean SKIP_METADATA_UPDATE =
+ Boolean.getBoolean("org.apache.activemq.kahaDB.files.skipMetadataUpdate");
+
RandomAccessFile raf;
File file;
String mode;
@@ -389,6 +397,24 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
}
}
+ public void sync() throws IOException {
+ try {
+ getRaf().getChannel().force(!SKIP_METADATA_UPDATE);
+ } catch (IOException ioe) {
+ handleException();
+ throw ioe;
+ }
+ }
+
+ public FileChannel getChannel() throws IOException {
+ try {
+ return getRaf().getChannel();
+ } catch (IOException ioe) {
+ handleException();
+ throw ioe;
+ }
+ }
+
public int read(byte[] b, int off, int len) throws IOException {
try {
return getRaf().read(b, off, len);