You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/11 22:24:16 UTC

svn commit: r684905 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java

Author: chirino
Date: Mon Aug 11 13:24:15 2008
New Revision: 684905

URL: http://svn.apache.org/viewvc?rev=684905&view=rev
Log:
Delaying the write thread a bit so that it does work in larger batches.  

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java Mon Aug 11 13:24:15 2008
@@ -58,7 +58,7 @@
     public static final int MAXIMUM_CAPACITY;
     public static final int DEFAULT_LOAD_FACTOR;
     private static final int LOW_WATER_MARK=1024*16;
-    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=100;
+    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=1000;
     // Recovery header is (long offset) + (int data_size) 
     private static final int RECOVERY_HEADER_SIZE=12;  
     
@@ -83,6 +83,8 @@
     private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
     private AtomicBoolean loaded = new AtomicBoolean();
     private LRUCache<Long, HashPage> pageCache;
+    private boolean enableRecoveryBuffer=true;
+    private boolean enableSyncedWrites=true;
     
     private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
     private int pageCacheSize = 10;
@@ -711,10 +713,24 @@
             } else {
                 write.setCurrent(data);
             }
-            writes.notify();
+            
+            // Once we start approaching capacity, notify the writer to start writing
+            if( canStartWriteBatch() ) {
+                writes.notify();
+            }
         }
     }
 
+    private boolean canStartWriteBatch() {
+        int capacityUsed = ((writes.size() * 100)/MAXIMUM_CAPACITY);
+        
+        // The constant 10 here controls how soon write batches start going to disk..
+        // would be nice to figure out how to auto tune that value.  Make to small and
+        // we reduce through put because we are locking the write mutex too offen doing writes
+    
+        return capacityUsed >= 10 || checkpointLatch!=null;
+    }
+
     
     /**
      * 
@@ -725,13 +741,13 @@
      * @throws IOException 
      */
     public boolean doWrites(long timeout, TimeUnit unit) throws IOException {
-        
+                
         int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
         ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
         
         synchronized( writes ) {            
-            // If there is nothing to write, wait for a notification...
-            if( writes.isEmpty() ) {
+            // If there is not enough to write, wait for a notification...
+            if( !canStartWriteBatch() ) {
                 releaseCheckpointWaiter();
                 try {
                     writes.wait(unit.toMillis(timeout));
@@ -744,7 +760,6 @@
                 return false;
             }
             
-              
             // build a write batch from the current write cache. 
             for (PageWrite write : writes.values()) {
                 
@@ -765,22 +780,28 @@
         }
        long txId = nextTxid.get();
         
-       StringBuilder pageOffsets = new StringBuilder();
-       
-        // Now the batch array has all the writes, write the batch to the recovery buffer.
-        writeIndexFile.seek(0);
-        writeIndexFile.writeLong(txId); // write txid of the batch
-        writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
-        for (PageWrite w : batch) {
-            writeIndexFile.writeLong(w.page.getOffset());
-            writeIndexFile.writeInt(w.diskBound.length);
-            writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+ 
+       if (enableRecoveryBuffer) {
+            // Now the batch array has all the writes, write the batch to the
+            // recovery buffer.
+            writeIndexFile.seek(0);
+            writeIndexFile.writeLong(txId); // write txid of the batch
+            writeIndexFile.writeInt(batch.size()); // write the recovery record
+                                                    // counter.
+            for (PageWrite w : batch) {
+                writeIndexFile.writeLong(w.page.getOffset());
+                writeIndexFile.writeInt(w.diskBound.length);
+                writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+            }
+            if( enableSyncedWrites ) {
+                // Sync to make sure recovery buffer writes land on disk..
+                writeIndexFile.getFD().sync();
+            }
         }
-        
-        // Sync to make sure recovery buffer writes land on disk..
-        writeIndexFile.getFD().sync(); 
+       
         
         // Now update the actual index locations
+        StringBuilder pageOffsets = new StringBuilder();
         for (PageWrite w : batch) {
             if( pageOffsets.length()!=0 ) {
                 pageOffsets.append(", ");
@@ -791,10 +812,13 @@
         }
         
         // Sync again
-        writeIndexFile.getFD().sync();
-        LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
-
+        if( enableSyncedWrites ) {
+            writeIndexFile.getFD().sync();
+            LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
+        }
+        
         nextTxid.incrementAndGet();
+
         synchronized( writes ) {
             for (PageWrite w : batch) {
                 // If there are no more pending writes, then remove it from the write cache.
@@ -806,6 +830,7 @@
                 releaseCheckpointWaiter();
             }
         }
+        
         return true;
     }
 
@@ -839,6 +864,7 @@
                 this.checkpointLatch = new CountDownLatch(1);
             }
             checkpointLatch = this.checkpointLatch;
+            writes.notify();
         }        
         try {
             checkpointLatch.await();        
@@ -963,7 +989,7 @@
             public void run() {
                 try {
                     while( !stopWriter.get() ) {
-                        doWrites(500, TimeUnit.MILLISECONDS);
+                        doWrites(1000, TimeUnit.MILLISECONDS);
                     }
                 } catch (IOException e) {
                     e.printStackTrace();
@@ -980,4 +1006,20 @@
         writerThread.join();
     }
 
+    public boolean isEnableRecoveryBuffer() {
+        return enableRecoveryBuffer;
+    }
+
+    public void setEnableRecoveryBuffer(boolean doubleBuffer) {
+        this.enableRecoveryBuffer = doubleBuffer;
+    }
+
+    public boolean isEnableSyncedWrites() {
+        return enableSyncedWrites;
+    }
+
+    public void setEnableSyncedWrites(boolean syncWrites) {
+        this.enableSyncedWrites = syncWrites;
+    }
+
 }

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java Mon Aug 11 13:24:15 2008
@@ -28,9 +28,9 @@
     @Override
     protected Index createIndex(File root, String name) throws Exception {
         HashIndex index = new HashIndex(root, name, indexManager);
-        //index.setNumberOfBins(12);
-        //index.setPageSize(32 * 1024);
         index.setKeyMarshaller(Store.STRING_MARSHALLER);
+//        index.setEnableRecoveryBuffer(false);
+//        index.setEnableSyncedWrites(false);
         return index;
     }