Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/17 10:53:49 UTC

svn commit: r881221 - /activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Author: dejanb
Date: Tue Nov 17 09:53:49 2009
New Revision: 881221

URL: http://svn.apache.org/viewvc?rev=881221&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - kahadb cleaning up after io exception
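
A minimal sketch of the pattern the patch below applies (hypothetical class and
field names, not the actual PageFile internals): the batch write is wrapped in
try/finally so that pruning the in-memory write cache and releasing a waiting
checkpoint latch happen even if the underlying I/O throws.

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;

    // Simplified illustration only: the cache is a plain pageId -> page-bytes map
    // and the whole batch is dropped in finally, whereas PageFile only removes
    // entries whose pending writes are fully done.
    public class BatchWriteSketch {
        private final Map<Long, byte[]> writes = new HashMap<Long, byte[]>();
        private final int pageSize = 4096;
        private CountDownLatch checkpointLatch;

        void writeBatch(RandomAccessFile writeFile, boolean enableDiskSyncs) throws IOException {
            try {
                synchronized (writes) {
                    for (Map.Entry<Long, byte[]> entry : writes.entrySet()) {
                        writeFile.seek(entry.getKey() * pageSize);    // page offset = pageId * pageSize
                        writeFile.write(entry.getValue(), 0, pageSize);
                    }
                }
                if (enableDiskSyncs) {
                    writeFile.getFD().sync();                         // force the pages to disk
                }
            } finally {
                // Runs whether or not the I/O above threw: clean up the cache and
                // wake up anyone blocked waiting for this checkpoint.
                synchronized (writes) {
                    writes.clear();
                }
                if (checkpointLatch != null) {
                    checkpointLatch.countDown();
                    checkpointLatch = null;
                }
            }
        }
    }

Before this change the cache cleanup and countDown() sat after the writes, so an
IOException part-way through a batch could leave stale entries in the write cache
and leave a caller blocked on the checkpoint latch; the diff below moves that
bookkeeping into a finally block (and adds isDone() to test a PageWrite without
mutating it, since done() is now called as each page is written).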

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=881221&r1=881220&r2=881221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Nov 17 09:53:49 2009
@@ -177,6 +177,10 @@
             diskBound=null;
             return current == null;
         }
+        
+        boolean isDone() {
+            return diskBound == null && current == null;
+        }
 
     }
     
@@ -939,10 +943,7 @@
 
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache.
-            Iterator<Long> it = writes.keySet().iterator();
-            while (it.hasNext()) {
-                Long key = it.next();
-                PageWrite write = writes.get(key);
+            for (PageWrite write : writes.values()) {
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the 
                 // page again without blocking for this write.
@@ -958,75 +959,82 @@
             this.checkpointLatch=null;
         }
         
- 
-       if (enableRecoveryFile) {
-           
-           // Using Adler-32 instead of CRC-32 because it's much faster, and its 
-           // weakness on short messages of a few hundred bytes is not a factor in this case, since we know 
-           // our write batches are going to be much larger.
-           Checksum checksum = new Adler32();
-           for (PageWrite w : batch) {
-               try {
-                   checksum.update(w.diskBound, 0, pageSize);
-               } catch (Throwable t) {
-                   throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
-               }
-           }
-           
-           // Can we shrink the recovery buffer??
-           if( recoveryPageCount > recoveryFileMaxPageCount ) {
-               int t = Math.max(recoveryFileMinPageCount, batch.size());
-               recoveryFile.setLength(recoveryFileSizeForPages(t));
-           }
-           
-            // Record the page writes in the recovery buffer.
-            recoveryFile.seek(0);
-            // Store the next tx id...
-            recoveryFile.writeLong(nextTxid.get());
-            // Store the checksum for the write batch so that on recovery we know if we have a consistent 
-            // write batch on disk.
-            recoveryFile.writeLong(checksum.getValue());
-            // Write the # of pages that will follow
-            recoveryFile.writeInt(batch.size());
-            
-            
-            // Write the pages.
-            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+       try {
+            if (enableRecoveryFile) {
+
+                // Using Adler-32 instead of CRC-32 because it's much
+                // faster, and its weakness on short messages of a few
+                // hundred bytes is not a factor in this case, since we
+                // know our write batches are going to be much larger
+                // than that.
+                Checksum checksum = new Adler32();
+                for (PageWrite w : batch) {
+                    try {
+                        checksum.update(w.diskBound, 0, pageSize);
+                    } catch (Throwable t) {
+                        throw IOExceptionSupport.create(
+                                "Cannot create recovery file. Reason: " + t, t);
+                    }
+                }
+
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for the write batch so that on
+                // recovery we know if we have a consistent write
+                // batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+
+                // Write the pages.
+                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+
+                for (PageWrite w : batch) {
+                    recoveryFile.writeLong(w.page.getPageId());
+                    recoveryFile.write(w.diskBound, 0, pageSize);
+                }
+
+                if (enableDiskSyncs) {
+                    // Sync to make sure recovery buffer writes land on disk..
+                    recoveryFile.getFD().sync();
+                }
+
+                recoveryPageCount = batch.size();
+            }
+
             for (PageWrite w : batch) {
-                recoveryFile.writeLong(w.page.getPageId());
-                recoveryFile.write(w.diskBound, 0, pageSize);
+                writeFile.seek(toOffset(w.page.getPageId()));
+                writeFile.write(w.diskBound, 0, pageSize);
+                w.done();
             }
-            
+
+            // Sync again
             if (enableDiskSyncs) {
-                // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                writeFile.getFD().sync();
             }
-            
-            recoveryPageCount = batch.size();
-        }
-       
-        
-        for (PageWrite w : batch) {
-            writeFile.seek(toOffset(w.page.getPageId()));
-            writeFile.write(w.diskBound, 0, pageSize);
-        }
-        
-        // Sync again
-        if( enableDiskSyncs ) {
-            writeFile.getFD().sync();
-        }
-        
-        synchronized( writes ) {
-            for (PageWrite w : batch) {
-                // If there are no more pending writes, then remove it from the write cache.
-                if( w.done() ) {
-                    writes.remove(w.page.getPageId());
+
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
+                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                    }
                 }
             }
-        }
-        
-        if( checkpointLatch!=null ) {
-            checkpointLatch.countDown();
+            
+            if( checkpointLatch!=null ) {
+                checkpointLatch.countDown();
+            }
         }
     }
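
For context on the recovery-file checksum in the hunk above, here is a tiny,
self-contained sketch (hypothetical names, not part of the patch) of the
java.util.zip.Adler32 usage the comment reasons about: Adler-32 is cheaper to
compute than CRC-32, and its weaker error detection on short inputs does not
matter when every batch covers whole pages rather than a few hundred bytes.

    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    // Hypothetical example: checksum a batch of pages the same way the recovery
    // path does, accumulating one Adler-32 value over every page buffer.
    public class RecoveryChecksumSketch {
        static long checksumBatch(byte[][] pages, int pageSize) {
            Checksum checksum = new Adler32();       // swap in new java.util.zip.CRC32() to compare cost
            for (byte[] page : pages) {
                checksum.update(page, 0, pageSize);  // feed each page's first pageSize bytes
            }
            return checksum.getValue();              // stored in the recovery file header
        }

        public static void main(String[] args) {
            byte[][] batch = new byte[8][4096];      // a fake batch of eight 4 KiB pages
            System.out.println("batch checksum = " + checksumBatch(batch, 4096));
        }
    }

On recovery, the same value can be recomputed over the pages read back from the
recovery file; a mismatch indicates an inconsistent (partially written) batch.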