You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/16 16:22:19 UTC
svn commit: r880792 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Author: dejanb
Date: Mon Nov 16 15:22:19 2009
New Revision: 880792
URL: http://svn.apache.org/viewvc?rev=880792&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - enable kahadb to recover from 'no space available'
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Nov 16 15:22:19 2009
@@ -232,6 +232,41 @@
}
}
+ private void startCheckpoint() {
+ checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ public void run() {
+ try {
+ long lastCleanup = System.currentTimeMillis();
+ long lastCheckpoint = System.currentTimeMillis();
+ // Sleep for a short time so we can periodically check
+ // to see if we need to exit this thread.
+ long sleepTime = Math.min(checkpointInterval, 500);
+ while (opened.get()) {
+
+ Thread.sleep(sleepTime);
+ long now = System.currentTimeMillis();
+ if( now - lastCleanup >= cleanupInterval ) {
+ checkpointCleanup(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if( now - lastCheckpoint >= checkpointInterval ) {
+ checkpointCleanup(false);
+ lastCheckpoint = now;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Looks like someone really wants us to exit this thread...
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ }
+ }
+
+ };
+ checkpointThread.setDaemon(true);
+ checkpointThread.start();
+ }
+
/**
* @throws IOException
*/
@@ -241,37 +276,7 @@
loadPageFile();
- checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
- public void run() {
- try {
- long lastCleanup = System.currentTimeMillis();
- long lastCheckpoint = System.currentTimeMillis();
-
- // Sleep for a short time so we can periodically check
- // to see if we need to exit this thread.
- long sleepTime = Math.min(checkpointInterval, 500);
- while (opened.get()) {
- Thread.sleep(sleepTime);
- long now = System.currentTimeMillis();
- if( now - lastCleanup >= cleanupInterval ) {
- checkpointCleanup(true);
- lastCleanup = now;
- lastCheckpoint = now;
- } else if( now - lastCheckpoint >= checkpointInterval ) {
- checkpointCleanup(false);
- lastCheckpoint = now;
- }
- }
- } catch (InterruptedException e) {
- // Looks like someone really wants us to exit this thread...
- } catch (IOException ioe) {
- LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
- }
- }
- };
- checkpointThread.setDaemon(true);
- checkpointThread.start();
+ startCheckpoint();
recover();
}
}
@@ -621,31 +626,40 @@
}
/**
- * All updated are are funneled through this method. The updates a converted
+ * All updates are funneled through this method. The updates are converted
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
- * durring a recovery process.
+ * during a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
-
- int size = data.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(data.type().getNumber());
- data.writeFramed(os);
-
- long start = System.currentTimeMillis();
- Location location = journal.write(os.toByteSequence(), sync);
- long start2 = System.currentTimeMillis();
- process(data, location);
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ try {
+ int size = data.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(data.type().getNumber());
+ data.writeFramed(os);
+
+ long start = System.currentTimeMillis();
+ Location location = journal.write(os.toByteSequence(), sync);
+ long start2 = System.currentTimeMillis();
+ process(data, location);
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ }
+
+ synchronized (indexMutex) {
+ metadata.lastUpdate = location;
+ }
+ if (!checkpointThread.isAlive()) {
+ LOG.info("KahaDB: Recovering checkpoint thread after exception");
+ startCheckpoint();
+ }
+ return location;
+ } catch (IOException ioe) {
+ LOG.error("KahaDB failed to store to Journal", ioe);
+ brokerService.handleIOException(ioe);
+ throw ioe;
}
-
- synchronized (indexMutex) {
- metadata.lastUpdate = location;
- }
- return location;
}
/**
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Nov 16 15:22:19 2009
@@ -219,10 +219,7 @@
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
- if (firstAsyncException != null) {
- throw firstAsyncException;
- }
-
+
if (!running) {
running = true;
thread = new Thread() {
@@ -234,6 +231,11 @@
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
+ firstAsyncException = null;
+ }
+
+ if (firstAsyncException != null) {
+ throw firstAsyncException;
}
while ( true ) {
@@ -430,6 +432,7 @@
} catch (Throwable ignore) {
}
shutdownDone.countDown();
+ running = false;
}
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Nov 16 15:22:19 2009
@@ -44,6 +44,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
}
void begin() {
- diskBound = current;
- current = null;
+ diskBound = current;
+ current = null;
}
/**
@@ -937,12 +938,18 @@
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
- // build a write batch from the current write cache.
- for (PageWrite write : writes.values()) {
+ // build a write batch from the current write cache.
+ Iterator<Long> it = writes.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = it.next();
+ PageWrite write = writes.get(key);
batch.add(write);
// Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
write.begin();
+ if (write.diskBound == null) {
+ batch.remove(write);
+ }
}
// Grab on to the existing checkpoint latch cause once we do this write we can
@@ -959,7 +966,11 @@
// our write batches are going to much larger.
Checksum checksum = new Adler32();
for (PageWrite w : batch) {
- checksum.update(w.diskBound, 0, pageSize);
+ try {
+ checksum.update(w.diskBound, 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+ }
}
// Can we shrink the recovery buffer??