Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/16 16:22:19 UTC

svn commit: r880792 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Author: dejanb
Date: Mon Nov 16 15:22:19 2009
New Revision: 880792

URL: http://svn.apache.org/viewvc?rev=880792&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - enable kahadb to recover from 'no space available'

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Nov 16 15:22:19 2009
@@ -232,6 +232,41 @@
         }
 	}
 	
+	private void startCheckpoint() {
+        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+            public void run() {
+                try {
+                    long lastCleanup = System.currentTimeMillis();
+                    long lastCheckpoint = System.currentTimeMillis();
+                    // Sleep for a short time so we can periodically check 
+                    // to see if we need to exit this thread.
+                    long sleepTime = Math.min(checkpointInterval, 500);
+                    while (opened.get()) {
+                        
+                        Thread.sleep(sleepTime);
+                        long now = System.currentTimeMillis();
+                        if( now - lastCleanup >= cleanupInterval ) {
+                            checkpointCleanup(true);
+                            lastCleanup = now;
+                            lastCheckpoint = now;
+                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                            checkpointCleanup(false);
+                            lastCheckpoint = now;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Looks like someone really wants us to exit this thread...
+                } catch (IOException ioe) {
+                    LOG.error("Checkpoint failed", ioe);
+                    brokerService.handleIOException(ioe);
+                }
+            }
+                    
+        };
+        checkpointThread.setDaemon(true);
+        checkpointThread.start();
+	}
+	
 	/**
 	 * @throws IOException
 	 */
@@ -241,37 +276,7 @@
             
 	        loadPageFile();
 	        
-	        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-	            public void run() {
-	                try {
-	                    long lastCleanup = System.currentTimeMillis();
-	                    long lastCheckpoint = System.currentTimeMillis();
-	                    
-	                    // Sleep for a short time so we can periodically check 
-	                    // to see if we need to exit this thread.
-	                    long sleepTime = Math.min(checkpointInterval, 500);
-	                    while (opened.get()) {
-	                        Thread.sleep(sleepTime);
-	                        long now = System.currentTimeMillis();
-	                        if( now - lastCleanup >= cleanupInterval ) {
-	                            checkpointCleanup(true);
-	                            lastCleanup = now;
-	                            lastCheckpoint = now;
-	                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-	                            checkpointCleanup(false);
-	                            lastCheckpoint = now;
-	                        }
-	                    }
-	                } catch (InterruptedException e) {
-	                    // Looks like someone really wants us to exit this thread...
-	                } catch (IOException ioe) {
-	                    LOG.error("Checkpoint failed", ioe);
-	                    brokerService.handleIOException(ioe);
-	                }
-	            }
-	        };
-	        checkpointThread.setDaemon(true);
-	        checkpointThread.start();
+	        startCheckpoint();
             recover();
 		}
 	}
@@ -621,31 +626,40 @@
     }
 
     /**
-     * All updated are are funneled through this method. The updates a converted
+     * All updated are are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
-     * durring a recovery process.
+     * during a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-    	
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-
-        long start = System.currentTimeMillis();
-        Location location = journal.write(os.toByteSequence(), sync);
-        long start2 = System.currentTimeMillis();
-        process(data, location);
-    	long end = System.currentTimeMillis();
-    	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-    		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+    	try {
+            int size = data.serializedSizeFramed();
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+            os.writeByte(data.type().getNumber());
+            data.writeFramed(os);
+    
+            long start = System.currentTimeMillis();
+            Location location = journal.write(os.toByteSequence(), sync);
+            long start2 = System.currentTimeMillis();
+            process(data, location);
+        	long end = System.currentTimeMillis();
+        	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+        		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+        	}
+    
+            synchronized (indexMutex) {
+            	metadata.lastUpdate = location;
+            }
+            if (!checkpointThread.isAlive()) {
+                LOG.info("KahaDB: Recovering checkpoint thread after exception");
+                startCheckpoint();
+            }
+            return location;
+    	} catch (IOException ioe) {
+            LOG.error("KahaDB failed to store to Journal", ioe);
+            brokerService.handleIOException(ioe);
+    	    throw ioe;
     	}
-
-        synchronized (indexMutex) {
-        	metadata.lastUpdate = location;
-        }
-        return location;
     }
 
     /**

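The MessageDatabase change above amounts to a simple recovery pattern: the
checkpoint worker is factored out into startCheckpoint() so that, after an
IOException (such as "no space available") kills the thread, the next
successful store() can notice the dead thread and start a fresh one. A
minimal, self-contained sketch of that pattern follows; the names
(RestartableWorker, writeJournal, checkpoint) are illustrative, not
ActiveMQ API.

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class RestartableWorker {
        private final AtomicBoolean opened = new AtomicBoolean(true);
        private volatile Thread checkpointThread;

        public RestartableWorker() {
            startCheckpoint();
        }

        private void startCheckpoint() {
            checkpointThread = new Thread("Checkpoint Worker") {
                public void run() {
                    try {
                        while (opened.get()) {
                            Thread.sleep(500);   // poll interval
                            checkpoint();        // may throw IOException
                        }
                    } catch (InterruptedException e) {
                        // normal shutdown path
                    } catch (IOException ioe) {
                        // the thread dies here; a later store() revives it
                    }
                }
            };
            checkpointThread.setDaemon(true);
            checkpointThread.start();
        }

        public void store(byte[] data) throws IOException {
            writeJournal(data);                  // may fail on a full disk
            if (!checkpointThread.isAlive()) {   // recover after a failure
                startCheckpoint();
            }
        }

        void checkpoint() throws IOException { /* flush index, clean old logs */ }
        void writeJournal(byte[] d) throws IOException { /* append to journal */ }
    }

In the actual patch the exception is additionally logged and handed to
brokerService.handleIOException(); recovery itself is driven lazily by the
next write rather than by a retry loop inside the worker.
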
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Mon Nov 16 15:22:19 2009
@@ -219,10 +219,7 @@
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
+            
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -234,6 +231,11 @@
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
+                firstAsyncException = null;
+            }
+            
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
             }
 
             while ( true ) {
@@ -430,6 +432,7 @@
             } catch (Throwable ignore) {
             }
             shutdownDone.countDown();
+            running = false;
         }
     }
 

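The DataFileAppender change is subtler. Previously, once
firstAsyncException was set by the background writer, every later enqueue
rethrew it forever, and the running flag was never cleared, so the writer
thread could not be restarted. The patch resets running when the writer
exits and clears the stored exception when a new writer is started. A
rough sketch of the resulting control flow, with hypothetical names
(AsyncAppender, drainQueue):

    import java.io.IOException;

    public class AsyncAppender {
        private boolean running;
        private IOException firstAsyncException;
        private Thread writer;

        public synchronized void enqueue(byte[] record) throws IOException {
            if (!running) {
                running = true;
                writer = new Thread(new Runnable() {
                    public void run() { processQueue(); }
                }, "Data File Writer");
                writer.setDaemon(true);
                writer.start();
                firstAsyncException = null;  // restarting: forget the old failure
            }
            if (firstAsyncException != null) {
                throw firstAsyncException;   // the current writer has since failed
            }
            // ... append record to the write queue ...
        }

        private void processQueue() {
            try {
                drainQueue();                // write queued batches to disk
            } catch (IOException ioe) {
                synchronized (this) { firstAsyncException = ioe; }
            } finally {
                synchronized (this) { running = false; }  // permit a restart
            }
        }

        private void drainQueue() throws IOException {
            // ... take records off the queue, append them to the data file ...
        }
    }

The ordering matters: the firstAsyncException check now comes after the
restart branch, so a stale exception from a dead writer is cleared rather
than rethrown, while a failure in the current writer still surfaces to
callers.
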
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=880792&r1=880791&r2=880792&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Nov 16 15:22:19 2009
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
         }
         
         void begin() {
-            diskBound = current;
-            current = null;
+           diskBound = current;
+           current = null;
         }
         
         /**
@@ -937,12 +938,18 @@
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
-            // build a write batch from the current write cache. 
-            for (PageWrite write : writes.values()) {
+            // build a write batch from the current write cache.
+            Iterator<Long> it = writes.keySet().iterator();
+            while (it.hasNext()) {
+                Long key = it.next();
+                PageWrite write = writes.get(key);
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the 
                 // page again without blocking for this write.
                 write.begin();
+                if (write.diskBound == null) {
+                    batch.remove(write);
+                }
             }
 
             // Grab on to the existing checkpoint latch cause once we do this write we can 
@@ -959,7 +966,11 @@
            // our write batches are going to much larger.
            Checksum checksum = new Adler32();
            for (PageWrite w : batch) {
-               checksum.update(w.diskBound, 0, pageSize);
+               try {
+                   checksum.update(w.diskBound, 0, pageSize);
+               } catch (Throwable t) {
+                   throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+               }
            }
            
            // Can we shrink the recovery buffer??
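The PageFile changes harden the batch writer in two places: a PageWrite
whose diskBound buffer is null after begin() is dropped from the batch
instead of later breaking the checksum loop, and any Throwable thrown by
Checksum.update() is converted into an IOException so the failure
propagates as a storage error instead of silently killing the writer
thread. A condensed sketch of both guards; BatchWriter is an illustrative
name, and the patch's add-then-remove is folded here into a single
add-if-non-null check:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    public class BatchWriter {

        static class PageWrite {
            byte[] current;
            byte[] diskBound;
            void begin() { diskBound = current; current = null; }
        }

        List<PageWrite> buildBatch(Map<Long, PageWrite> writes, int pageSize)
                throws IOException {
            List<PageWrite> batch = new ArrayList<PageWrite>(writes.size());
            for (PageWrite write : writes.values()) {
                // Hand the page over to the disk-bound side so callers can
                // keep updating it without blocking on this write.
                write.begin();
                if (write.diskBound != null) {   // skip writes with no data
                    batch.add(write);
                }
            }
            Checksum checksum = new Adler32();
            for (PageWrite w : batch) {
                try {
                    checksum.update(w.diskBound, 0, pageSize);
                } catch (Throwable t) {
                    throw new IOException("Cannot create recovery file: " + t, t);
                }
            }
            return batch;
        }
    }

Converting the Throwable into an IOException keeps the failure on the same
path that the broker's existing I/O exception handling already covers.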