You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/11 22:24:16 UTC
svn commit: r684905 - in /activemq/sandbox/kahadb/src:
main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
Author: chirino
Date: Mon Aug 11 13:24:15 2008
New Revision: 684905
URL: http://svn.apache.org/viewvc?rev=684905&view=rev
Log:
Delaying the write thread a bit so that it does work in larger batches.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java Mon Aug 11 13:24:15 2008
@@ -58,7 +58,7 @@
public static final int MAXIMUM_CAPACITY;
public static final int DEFAULT_LOAD_FACTOR;
private static final int LOW_WATER_MARK=1024*16;
- private static final int MAX_PAGES_IN_RECOVERY_BUFFER=100;
+ private static final int MAX_PAGES_IN_RECOVERY_BUFFER=1000;
// Recovery header is (long offset) + (int data_size)
private static final int RECOVERY_HEADER_SIZE=12;
@@ -83,6 +83,8 @@
private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
private AtomicBoolean loaded = new AtomicBoolean();
private LRUCache<Long, HashPage> pageCache;
+ private boolean enableRecoveryBuffer=true;
+ private boolean enableSyncedWrites=true;
private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
private int pageCacheSize = 10;
@@ -711,10 +713,24 @@
} else {
write.setCurrent(data);
}
- writes.notify();
+
+ // Once we start approaching capacity, notify the writer to start writing
+ if( canStartWriteBatch() ) {
+ writes.notify();
+ }
}
}
+ private boolean canStartWriteBatch() {
+ int capacityUsed = ((writes.size() * 100)/MAXIMUM_CAPACITY);
+ // capacityUsed: size of the write cache as an integer percentage of MAXIMUM_CAPACITY.
+ // The constant 10 here controls how soon write batches start going to disk.
+ // It would be nice to auto-tune that value: make it too small and we reduce
+ // throughput because we lock the write mutex too often doing writes.
+ // A non-null checkpointLatch lets a batch start regardless of the percentage.
+ return capacityUsed >= 10 || checkpointLatch!=null;
+ }
+
/**
*
@@ -725,13 +741,13 @@
* @throws IOException
*/
public boolean doWrites(long timeout, TimeUnit unit) throws IOException {
-
+
int batchLength=8+4; // Account for the: lastTxid + recovery record counter
ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
synchronized( writes ) {
- // If there is nothing to write, wait for a notification...
- if( writes.isEmpty() ) {
+ // If there is not enough to write, wait for a notification...
+ if( !canStartWriteBatch() ) {
releaseCheckpointWaiter();
try {
writes.wait(unit.toMillis(timeout));
@@ -744,7 +760,6 @@
return false;
}
-
// build a write batch from the current write cache.
for (PageWrite write : writes.values()) {
@@ -765,22 +780,28 @@
}
long txId = nextTxid.get();
- StringBuilder pageOffsets = new StringBuilder();
-
- // Now the batch array has all the writes, write the batch to the recovery buffer.
- writeIndexFile.seek(0);
- writeIndexFile.writeLong(txId); // write txid of the batch
- writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
- for (PageWrite w : batch) {
- writeIndexFile.writeLong(w.page.getOffset());
- writeIndexFile.writeInt(w.diskBound.length);
- writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+
+ if (enableRecoveryBuffer) {
+ // Now the batch array has all the writes, write the batch to the
+ // recovery buffer.
+ writeIndexFile.seek(0);
+ writeIndexFile.writeLong(txId); // write txid of the batch
+ writeIndexFile.writeInt(batch.size()); // write the recovery record
+ // counter.
+ for (PageWrite w : batch) {
+ writeIndexFile.writeLong(w.page.getOffset());
+ writeIndexFile.writeInt(w.diskBound.length);
+ writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+ }
+ if( enableSyncedWrites ) {
+ // Sync to make sure recovery buffer writes land on disk..
+ writeIndexFile.getFD().sync();
+ }
}
-
- // Sync to make sure recovery buffer writes land on disk..
- writeIndexFile.getFD().sync();
+
// Now update the actual index locations
+ StringBuilder pageOffsets = new StringBuilder();
for (PageWrite w : batch) {
if( pageOffsets.length()!=0 ) {
pageOffsets.append(", ");
@@ -791,10 +812,13 @@
}
// Sync again
- writeIndexFile.getFD().sync();
- LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
-
+ if( enableSyncedWrites ) {
+ writeIndexFile.getFD().sync();
+ LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
+ }
+
nextTxid.incrementAndGet();
+
synchronized( writes ) {
for (PageWrite w : batch) {
// If there are no more pending writes, then remove it from the write cache.
@@ -806,6 +830,7 @@
releaseCheckpointWaiter();
}
}
+
return true;
}
@@ -839,6 +864,7 @@
this.checkpointLatch = new CountDownLatch(1);
}
checkpointLatch = this.checkpointLatch;
+ writes.notify();
}
try {
checkpointLatch.await();
@@ -963,7 +989,7 @@
public void run() {
try {
while( !stopWriter.get() ) {
- doWrites(500, TimeUnit.MILLISECONDS);
+ doWrites(1000, TimeUnit.MILLISECONDS);
}
} catch (IOException e) {
e.printStackTrace();
@@ -980,4 +1006,20 @@
writerThread.join();
}
+ public boolean isEnableRecoveryBuffer() { // flag doWrites() checks before copying a batch into the recovery buffer
+ return enableRecoveryBuffer;
+ }
+
+ public void setEnableRecoveryBuffer(boolean doubleBuffer) { // when false, doWrites() skips the recovery-buffer copy
+ this.enableRecoveryBuffer = doubleBuffer;
+ }
+
+ public boolean isEnableSyncedWrites() { // flag doWrites() checks before each writeIndexFile.getFD().sync()
+ return enableSyncedWrites;
+ }
+
+ public void setEnableSyncedWrites(boolean syncWrites) { // when false, doWrites() skips its sync() calls and its final debug log
+ this.enableSyncedWrites = syncWrites;
+ }
+
}
Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java Mon Aug 11 13:24:15 2008
@@ -28,9 +28,9 @@
@Override
protected Index createIndex(File root, String name) throws Exception {
HashIndex index = new HashIndex(root, name, indexManager);
- //index.setNumberOfBins(12);
- //index.setPageSize(32 * 1024);
index.setKeyMarshaller(Store.STRING_MARSHALLER);
+// index.setEnableRecoveryBuffer(false);
+// index.setEnableSyncedWrites(false);
return index;
}