You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/17 10:53:49 UTC
svn commit: r881221 -
/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Author: dejanb
Date: Tue Nov 17 09:53:49 2009
New Revision: 881221
URL: http://svn.apache.org/viewvc?rev=881221&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - kahadb cleaning up after io exception
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=881221&r1=881220&r2=881221&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Nov 17 09:53:49 2009
@@ -177,6 +177,10 @@
diskBound=null;
return current == null;
}
+
+ boolean isDone() {
+ return diskBound == null && current == null;
+ }
}
@@ -939,10 +943,7 @@
batch = new ArrayList<PageWrite>(writes.size());
// build a write batch from the current write cache.
- Iterator<Long> it = writes.keySet().iterator();
- while (it.hasNext()) {
- Long key = it.next();
- PageWrite write = writes.get(key);
+ for (PageWrite write : writes.values()) {
batch.add(write);
// Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
@@ -958,75 +959,82 @@
this.checkpointLatch=null;
}
-
- if (enableRecoveryFile) {
-
- // Using Adler-32 instead of CRC-32 because it's much faster and it's
- // weakness for short messages with few hundred bytes is not a factor in this case since we know
- // our write batches are going to much larger.
- Checksum checksum = new Adler32();
- for (PageWrite w : batch) {
- try {
- checksum.update(w.diskBound, 0, pageSize);
- } catch (Throwable t) {
- throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
- }
- }
-
- // Can we shrink the recovery buffer??
- if( recoveryPageCount > recoveryFileMaxPageCount ) {
- int t = Math.max(recoveryFileMinPageCount, batch.size());
- recoveryFile.setLength(recoveryFileSizeForPages(t));
- }
-
- // Record the page writes in the recovery buffer.
- recoveryFile.seek(0);
- // Store the next tx id...
- recoveryFile.writeLong(nextTxid.get());
- // Store the checksum for thw write batch so that on recovery we know if we have a consistent
- // write batch on disk.
- recoveryFile.writeLong(checksum.getValue());
- // Write the # of pages that will follow
- recoveryFile.writeInt(batch.size());
-
-
- // Write the pages.
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ try {
+ if (enableRecoveryFile) {
+
+ // Using Adler-32 instead of CRC-32 because it's much faster and
+ // its
+ // weakness for short messages with a few hundred bytes is not a
+ // factor in this case since we know
+ // our write batches are going to be much larger.
+ Checksum checksum = new Adler32();
+ for (PageWrite w : batch) {
+ try {
+ checksum.update(w.diskBound, 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create(
+ "Cannot create recovery file. Reason: " + t, t);
+ }
+ }
+
+ // Can we shrink the recovery buffer??
+ if (recoveryPageCount > recoveryFileMaxPageCount) {
+ int t = Math.max(recoveryFileMinPageCount, batch.size());
+ recoveryFile.setLength(recoveryFileSizeForPages(t));
+ }
+
+ // Record the page writes in the recovery buffer.
+ recoveryFile.seek(0);
+ // Store the next tx id...
+ recoveryFile.writeLong(nextTxid.get());
+ // Store the checksum for the write batch so that on recovery we
+ // know if we have a consistent
+ // write batch on disk.
+ recoveryFile.writeLong(checksum.getValue());
+ // Write the # of pages that will follow
+ recoveryFile.writeInt(batch.size());
+
+ // Write the pages.
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+
+ for (PageWrite w : batch) {
+ recoveryFile.writeLong(w.page.getPageId());
+ recoveryFile.write(w.diskBound, 0, pageSize);
+ }
+
+ if (enableDiskSyncs) {
+ // Sync to make sure recovery buffer writes land on disk..
+ recoveryFile.getFD().sync();
+ }
+
+ recoveryPageCount = batch.size();
+ }
+
for (PageWrite w : batch) {
- recoveryFile.writeLong(w.page.getPageId());
- recoveryFile.write(w.diskBound, 0, pageSize);
+ writeFile.seek(toOffset(w.page.getPageId()));
+ writeFile.write(w.diskBound, 0, pageSize);
+ w.done();
}
-
+
+ // Sync again
if (enableDiskSyncs) {
- // Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
+ writeFile.getFD().sync();
}
-
- recoveryPageCount = batch.size();
- }
-
-
- for (PageWrite w : batch) {
- writeFile.seek(toOffset(w.page.getPageId()));
- writeFile.write(w.diskBound, 0, pageSize);
- }
-
- // Sync again
- if( enableDiskSyncs ) {
- writeFile.getFD().sync();
- }
-
- synchronized( writes ) {
- for (PageWrite w : batch) {
- // If there are no more pending writes, then remove it from the write cache.
- if( w.done() ) {
- writes.remove(w.page.getPageId());
+
+ } finally {
+ synchronized (writes) {
+ for (PageWrite w : batch) {
+ // If there are no more pending writes, then remove it from
+ // the write cache.
+ if (w.isDone()) {
+ writes.remove(w.page.getPageId());
+ }
}
}
- }
-
- if( checkpointLatch!=null ) {
- checkpointLatch.countDown();
+
+ if( checkpointLatch!=null ) {
+ checkpointLatch.countDown();
+ }
}
}