Posted to commits@activemq.apache.org by ch...@apache.org on 2008/08/11 21:38:17 UTC

svn commit: r684888 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash: HashBin.java HashIndex.java HashPage.java HashPageInfo.java

Author: chirino
Date: Mon Aug 11 12:38:16 2008
New Revision: 684888

URL: http://svn.apache.org/viewvc?rev=684888&view=rev
Log:
Added a little better logging and fixed the HashTest

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashBin.java Mon Aug 11 12:38:16 2008
@@ -326,7 +326,7 @@
         while (page != null) {
             page.begin();
             str +=page.dump();
-            page.end();
+            page.end(id);
             page = (HashPageInfo) page.getNext();
         }
         return str;
@@ -334,7 +334,7 @@
     private void end() throws IOException {
         HashPageInfo page = root;
         while (page != null) {
-            page.end();
+            page.end(id);
             page = (HashPageInfo) page.getNext();
         }
     }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java Mon Aug 11 12:38:16 2008
@@ -424,13 +424,14 @@
             return page;
         }
 
-        LOG.debug("Reading page: "+id);
         readIndexFile.seek(id);
         readIndexFile.readFully(readBuffer, 0, pageSize);
         dataIn.restart(readBuffer);
         HashPage page = new HashPage(this);
         page.setOffset(id);
         page.read(keyMarshaller, dataIn);
+        
+        LOG.debug("Read in page: "+id+", contains keys: "+page.getKeys());
         return page;
     }
 
@@ -516,6 +517,7 @@
     }
     
     private void doCompress() throws IOException {
+        LOG.debug("Compress to new index file: start");
         String backFileName = name + "-COMPRESS";
         HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
         backIndex.setKeyMarshaller(keyMarshaller);
@@ -538,13 +540,16 @@
             offset += pageSize;
         }
         backIndex.unload();
-      
+        LOG.debug("Compress to new index file: end");
+
+        LOG.debug("Compress loading new index file: start");
         unload();
         IOHelper.deleteFile(file);
         IOHelper.copyFile(backFile, file);
         IOHelper.deleteFile(backFile);
         openIndexFile();
         doLoad();
+        LOG.debug("Compress loading new index file: end");
     }
     
     private void resize(int newCapacity) throws IOException {
@@ -562,6 +567,7 @@
                 if (newCapacity != numberOfBins) {
                     LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
                     
+                    LOG.debug("Resize create new index file: start");
                     String backFileName = name + "-REISZE";
                     HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
                     backIndex.setKeyMarshaller(keyMarshaller);
@@ -584,7 +590,9 @@
                         offset += pageSize;
                     }
                     backIndex.unload();
+                    LOG.debug("Resize create new index file: end");
                   
+                    LOG.debug("Resize reloading new index file: start");
                     unload();
                     IOHelper.deleteFile(file);
                     IOHelper.copyFile(backFile, file);
@@ -594,6 +602,7 @@
                     threshold = calculateThreashold();
                     openIndexFile();
                     doLoad();
+                    LOG.debug("Resize reloading new index file: end");
                 }
             }
         }else {
@@ -684,11 +693,15 @@
             throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
         }
         
+        if( !page.isActive() && !page.getEntries().isEmpty() ) {
+            throw new IOException("Page is not active but is also not empty");
+        }
+        
 //        ByteSequence data = new ByteSequence(dataOut.getData(), 0, dataOut.size());
         ByteSequence data = new ByteSequence(dataOut.getData(), 0, pageSize);
         Long key = page.getOffset();
 
-        LOG.debug("Page write request at: "+page.getOffset()+", keys: ");
+        LOG.debug("Page write request for offset: "+page.getOffset()+", contains keys: "+page.getKeys());
         synchronized( writes ) {
             // If it's not in the write cache...
             PageWrite write = writes.get(key);
@@ -711,7 +724,7 @@
      * @throws InterruptedException 
      * @throws IOException 
      */
-    public boolean pollWrites(long timeout, TimeUnit unit) throws IOException {
+    public boolean doWrites(long timeout, TimeUnit unit) throws IOException {
         
         int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
         ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
@@ -752,14 +765,13 @@
         }
        long txId = nextTxid.get();
         
-       LOG.debug("Starting write batch transaction id: "+txId);
+       StringBuilder pageOffsets = new StringBuilder();
        
         // Now the batch array has all the writes, write the batch to the recovery buffer.
         writeIndexFile.seek(0);
         writeIndexFile.writeLong(txId); // write txid of the batch
         writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
         for (PageWrite w : batch) {
-            LOG.debug("Journaling write at offset: "+w.page.getOffset());
             writeIndexFile.writeLong(w.page.getOffset());
             writeIndexFile.writeInt(w.diskBound.length);
             writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
@@ -770,14 +782,18 @@
         
         // Now update the actual index locations
         for (PageWrite w : batch) {
-            LOG.debug("Doing write at offset: "+w.page.getOffset());
+            if( pageOffsets.length()!=0 ) {
+                pageOffsets.append(", ");
+            }
+            pageOffsets.append(w.page.getOffset());
             writeIndexFile.seek(w.page.getOffset());
             writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
         }
         
         // Sync again
         writeIndexFile.getFD().sync();
-        
+        LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
+
         nextTxid.incrementAndGet();
         synchronized( writes ) {
             for (PageWrite w : batch) {
@@ -947,7 +963,7 @@
             public void run() {
                 try {
                     while( !stopWriter.get() ) {
-                        pollWrites(500, TimeUnit.MILLISECONDS);
+                        doWrites(500, TimeUnit.MILLISECONDS);
                     }
                 } catch (IOException e) {
                     e.printStackTrace();
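
The logging change in doWrites() above replaces the per-page debug lines with a single summary per batch: page offsets are collected in a StringBuilder while the batch is applied and one line is logged after the sync. Below is a minimal standalone sketch of that pattern; the class name, offsets, and transaction id are illustrative placeholders, not the real HashIndex fields.

    import java.util.Arrays;
    import java.util.List;

    public class BatchLogSketch {
        public static void main(String[] args) {
            long txId = 1;                                       // placeholder batch transaction id
            List<Long> batch = Arrays.asList(0L, 4096L, 8192L);  // placeholder page offsets

            StringBuilder pageOffsets = new StringBuilder();
            for (Long offset : batch) {
                if (pageOffsets.length() != 0) {
                    pageOffsets.append(", ");
                }
                pageOffsets.append(offset);
                // the real doWrites() seeks to this offset and writes the page bytes here
            }
            // one debug line per batch, emitted only after the index file has been synced
            System.out.println("Index write complete tx: " + txId + ", pages: " + pageOffsets);
        }
    }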

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPage.java Mon Aug 11 12:38:16 2008
@@ -210,4 +210,15 @@
     public void setTxId(long txId) {
         this.txId = txId;
     }
+
+    public String getKeys() {
+        StringBuilder sb = new StringBuilder();
+        for (HashEntry e : hashIndexEntries) {
+            if( sb.length() != 0 ) {
+                sb.append(", ");
+            }
+            sb.append(e.getKey());
+        }
+        return sb.toString();
+    }
 }
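
The new getKeys() helper is what feeds the "contains keys" debug lines added in HashIndex. Here is a minimal sketch of the same joining logic, with a plain List<String> standing in for the page's HashEntry list and made-up key values.

    import java.util.Arrays;
    import java.util.List;

    public class GetKeysSketch {
        public static void main(String[] args) {
            // stand-in for the page's hashIndexEntries; the real entries are HashEntry objects
            List<String> keys = Arrays.asList("key-1", "key-2", "key-3");

            StringBuilder sb = new StringBuilder();
            for (String key : keys) {
                if (sb.length() != 0) {
                    sb.append(", ");
                }
                sb.append(key);
            }
            // mirrors LOG.debug("Read in page: "+id+", contains keys: "+page.getKeys())
            System.out.println("Read in page: 4096, contains keys: " + sb);
        }
    }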

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java?rev=684888&r1=684887&r2=684888&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashPageInfo.java Mon Aug 11 12:38:16 2008
@@ -98,9 +98,10 @@
         }
     }
 
-    void end() throws IOException {
+    void end(int id) throws IOException {
         if (page != null) {
             if (dirty) {
+                page.setBinId(id);
                 hashIndex.writeFullPage(page);
             }
         }
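
Together with the HashBin change at the top of this commit, end(int id) now stamps the owning bin's id on each dirty page just before the full-page write. The sketch below shows that call path with simplified stand-in classes; they are not the real KahaDB types and omit marshalling and file I/O.

    // Simplified stand-ins for HashPage / HashPageInfo / HashBin; illustration only.
    class PageSketch {
        private int binId;
        void setBinId(int binId) { this.binId = binId; }
    }

    class PageInfoSketch {
        PageInfoSketch next;
        boolean dirty = true;
        PageSketch page = new PageSketch();

        void end(int id) {
            if (page != null && dirty) {
                page.setBinId(id);          // new in this commit: record the owning bin on the page
                // hashIndex.writeFullPage(page) happens here in the real code
            }
        }
    }

    class BinSketch {
        private final int id;
        private PageInfoSketch root;

        BinSketch(int id) { this.id = id; }

        void end() {
            PageInfoSketch page = root;
            while (page != null) {
                page.end(id);               // was page.end() before r684888
                page = page.next;
            }
        }

        public static void main(String[] args) {
            BinSketch bin = new BinSketch(7);
            bin.root = new PageInfoSketch();
            bin.end();                      // stamps bin id 7 on the dirty page before the write
        }
    }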