You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/20 12:36:47 UTC

svn commit: r882511 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/store/amq/ activemq-core/src/main/java/org/apache/activemq/store/journal/ activemq-core/sr...

Author: dejanb
Date: Fri Nov 20 11:36:45 2009
New Revision: 882511

URL: http://svn.apache.org/viewvc?rev=882511&view=rev
Log:
merging https://issues.apache.org/activemq/browse/AMQ-2042 - 834922,835373,835412,835833,835888,880792,881221,882144

Added:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
      - copied, changed from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
      - copied unchanged from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Nov 20 11:36:45 2009
@@ -84,6 +84,8 @@
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.JMXSupport;
@@ -178,7 +180,9 @@
     private int systemExitOnShutdownExitCode;
     private SslContext sslContext;
     private boolean forceStart = false;
-    static {
+    private IOExceptionHandler ioExceptionHandler;
+
+	static {
         String localHostName = "localhost";
         try {
             localHostName = java.net.InetAddress.getLocalHost().getHostName();
@@ -481,6 +485,9 @@
                 }
             }
             brokerId = broker.getBrokerId();
+            if (ioExceptionHandler == null) {
+            	setIoExceptionHandler(new DefaultIOExceptionHandler());
+            }
             LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
             getBroker().brokerServiceStarted();
             startedLatch.countDown();
@@ -2008,6 +2015,14 @@
             }
         }
     }
+    
+    public void handleIOException(IOException exception) {
+        if (ioExceptionHandler != null) {
+            ioExceptionHandler.handle(exception);
+         } else {
+            LOG.info("Ignoring IO exception, " + exception, exception);
+         }
+    }
 
     /**
      * Starts all destiantions in persistence store. This includes all inactive
@@ -2111,5 +2126,10 @@
         this.passiveSlave = passiveSlave;
     }
     
+    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
+        ioExceptionHandler.setBrokerService(this);
+        this.ioExceptionHandler = ioExceptionHandler;
+    }
+    
    
 }
\ No newline at end of file

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -695,7 +695,13 @@
     }
     
     public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
-        return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+    	try {
+    		return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+    	} catch (IOException ioe) {
+    		LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
+        	brokerService.handleIOException(ioe);
+        	throw ioe;
+        }
     }
 
     private Location writeTraceMessage(String message, boolean sync) throws IOException {

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -623,7 +623,7 @@
         	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
             } catch (IOException ioe) {
         	    LOG.error("Cannot write to the journal", ioe);
-        	    stopBroker();
+        	    brokerService.handleIOException(ioe);
         	    throw ioe;
             }
         }
@@ -725,17 +725,5 @@
             ((BrokerServiceAware)pa).setBrokerService(brokerService);
         }
     }
-    
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-        	   try {
-    	            brokerService.stop();
-    	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker");
-    	        }    			
-    		}
-    	}.start();
-    }
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -24,6 +24,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -54,7 +56,7 @@
  * @org.apache.xbean.XBean
  * @version $Revision: 1.4 $
  */
-public class KahaPersistenceAdapter implements PersistenceAdapter {
+public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
 
     private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
@@ -73,6 +75,7 @@
     private boolean initialized;
     private final AtomicLong storeSize;
     private boolean persistentIndex = true;
+    private BrokerService brokerService;
 
     
     public KahaPersistenceAdapter(AtomicLong size) {
@@ -175,6 +178,7 @@
                     container.setValueMarshaller(new TransactionMarshaller(wireFormat));
                     container.load();
                     transactionStore = new KahaTransactionStore(this, container);
+                    transactionStore.setBrokerService(brokerService);
                     break;
                 } catch (StoreLockedExcpetion e) {
                     LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
@@ -361,6 +365,10 @@
             wireFormat.setTightEncodingEnabled(true);
         }
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
   
 
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Fri Nov 20 11:36:45 2009
@@ -24,17 +24,23 @@
 
 import javax.transaction.xa.XAException;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.RuntimeStoreException;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ProxyMessageStore;
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Provides a TransactionStore implementation that can create transaction aware
@@ -42,10 +48,14 @@
  * 
  * @version $Revision: 1.4 $
  */
-public class KahaTransactionStore implements TransactionStore {
+public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {	
+    private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
+	
     private Map transactions = new ConcurrentHashMap();
     private Map prepared;
     private KahaPersistenceAdapter adaptor;
+    
+    private BrokerService brokerService;
 
     KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
         this.adaptor = adaptor;
@@ -130,12 +140,19 @@
      * @throws IOException
      */
     void addMessage(final MessageStore destination, final Message message) throws IOException {
-        if (message.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
-            tx.add((KahaMessageStore)destination, message);
-        } else {
-            destination.addMessage(null, message);
-        }
+    	try {
+    		if (message.isInTransaction()) {
+    			KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+    			tx.add((KahaMessageStore)destination, message);
+    		} else {
+    			destination.addMessage(null, message);
+    		}
+    	} catch (RuntimeStoreException rse) {
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
+    	}
     }
 
     /**
@@ -143,12 +160,19 @@
      * @throws IOException
      */
     final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
-        if (ack.isInTransaction()) {
-            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
-            tx.add((KahaMessageStore)destination, ack);
-        } else {
-            destination.removeMessage(null, ack);
-        }
+    	try {
+    		if (ack.isInTransaction()) {
+    			KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+    			tx.add((KahaMessageStore)destination, ack);
+    		} else {
+    			destination.removeMessage(null, ack);
+    		}
+    	} catch (RuntimeStoreException rse) {
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
+    	}
     }
 
     protected synchronized KahaTransaction getTx(TransactionId key) {
@@ -181,4 +205,8 @@
     protected MessageStore getStoreById(Object id) {
         return adaptor.retrieveMessageStore(id);
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -17,6 +17,8 @@
 package org.apache.activemq.store.kahadb;
 
 import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
  * @org.apache.xbean.XBean element="kahaDB"
  * @version $Revision: 1.17 $
  */
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
     private KahaDBStore letter = new KahaDBStore();
     
 
@@ -364,4 +366,8 @@
     public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		letter.setBrokerService(brokerService);
+	}
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Nov 20 11:36:45 2009
@@ -36,6 +36,8 @@
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
 import org.apache.kahadb.util.SequenceSet;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
 
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+	
+	private BrokerService brokerService;
 
     public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
     public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -227,6 +232,41 @@
         }
 	}
 	
+	private void startCheckpoint() {
+        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+            public void run() {
+                try {
+                    long lastCleanup = System.currentTimeMillis();
+                    long lastCheckpoint = System.currentTimeMillis();
+                    // Sleep for a short time so we can periodically check 
+                    // to see if we need to exit this thread.
+                    long sleepTime = Math.min(checkpointInterval, 500);
+                    while (opened.get()) {
+                        
+                        Thread.sleep(sleepTime);
+                        long now = System.currentTimeMillis();
+                        if( now - lastCleanup >= cleanupInterval ) {
+                            checkpointCleanup(true);
+                            lastCleanup = now;
+                            lastCheckpoint = now;
+                        } else if( now - lastCheckpoint >= checkpointInterval ) {
+                            checkpointCleanup(false);
+                            lastCheckpoint = now;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    // Looks like someone really wants us to exit this thread...
+                } catch (IOException ioe) {
+                    LOG.error("Checkpoint failed", ioe);
+                    brokerService.handleIOException(ioe);
+                }
+            }
+                    
+        };
+        checkpointThread.setDaemon(true);
+        checkpointThread.start();
+	}
+	
 	/**
 	 * @throws IOException
 	 */
@@ -236,34 +276,7 @@
             
 	        loadPageFile();
 	        
-	        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
-	            public void run() {
-	                try {
-	                    long lastCleanup = System.currentTimeMillis();
-	                    long lastCheckpoint = System.currentTimeMillis();
-	                    
-	                    // Sleep for a short time so we can periodically check 
-	                    // to see if we need to exit this thread.
-	                    long sleepTime = Math.min(checkpointInterval, 500);
-	                    while (opened.get()) {
-	                        Thread.sleep(sleepTime);
-	                        long now = System.currentTimeMillis();
-	                        if( now - lastCleanup >= cleanupInterval ) {
-	                            checkpointCleanup(true);
-	                            lastCleanup = now;
-	                            lastCheckpoint = now;
-	                        } else if( now - lastCheckpoint >= checkpointInterval ) {
-	                            checkpointCleanup(false);
-	                            lastCheckpoint = now;
-	                        }
-	                    }
-	                } catch (InterruptedException e) {
-	                    // Looks like someone really wants us to exit this thread...
-	                }
-	            }
-	        };
-	        checkpointThread.setDaemon(true);
-	        checkpointThread.start();
+	        startCheckpoint();
             recover();
 		}
 	}
@@ -575,26 +588,22 @@
         return journal.getNextLocation(null);
 	}
 
-    protected void checkpointCleanup(final boolean cleanup) {
-        try {
-        	long start = System.currentTimeMillis();
-            synchronized (indexMutex) {
-            	if( !opened.get() ) {
-            		return;
-            	}
-                pageFile.tx().execute(new Transaction.Closure<IOException>() {
-                    public void execute(Transaction tx) throws IOException {
-                        checkpointUpdate(tx, cleanup);
-                    }
-                });
-            }
-        	long end = System.currentTimeMillis();
-        	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-        		LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    protected void checkpointCleanup(final boolean cleanup) throws IOException {
+    	long start = System.currentTimeMillis();
+        synchronized (indexMutex) {
+        	if( !opened.get() ) {
+        		return;
         	}
-        } catch (IOException e) {
-        	e.printStackTrace();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    checkpointUpdate(tx, cleanup);
+                }
+            });
         }
+    	long end = System.currentTimeMillis();
+    	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+    		LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+    	}
     }
 
     
@@ -617,32 +626,40 @@
     }
 
     /**
-     * All updated are are funneled through this method. The updates a converted
+     * All updates are funneled through this method. The updates are converted
      * to a JournalMessage which is logged to the journal and then the data from
      * the JournalMessage is used to update the index just like it would be done
-     * durring a recovery process.
+     * during a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws IOException {
-
-    	
-        int size = data.serializedSizeFramed();
-        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
-        os.writeByte(data.type().getNumber());
-        data.writeFramed(os);
-
-        long start = System.currentTimeMillis();
-        Location location = journal.write(os.toByteSequence(), sync);
-        long start2 = System.currentTimeMillis();
-        process(data, location);
-    	long end = System.currentTimeMillis();
-    	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-    		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+    	try {
+            int size = data.serializedSizeFramed();
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+            os.writeByte(data.type().getNumber());
+            data.writeFramed(os);
+    
+            long start = System.currentTimeMillis();
+            Location location = journal.write(os.toByteSequence(), sync);
+            long start2 = System.currentTimeMillis();
+            process(data, location);
+        	long end = System.currentTimeMillis();
+        	if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+        		LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+        	}
+    
+            synchronized (indexMutex) {
+            	metadata.lastUpdate = location;
+            }
+            if (!checkpointThread.isAlive()) {
+                LOG.info("KahaDB: Recovering checkpoint thread after exception");
+                startCheckpoint();
+            }
+            return location;
+    	} catch (IOException ioe) {
+            LOG.error("KahaDB failed to store to Journal", ioe);
+            brokerService.handleIOException(ioe);
+    	    throw ioe;
     	}
-
-        synchronized (indexMutex) {
-        	metadata.lastUpdate = location;
-        }
-        return location;
     }
 
     /**
@@ -1530,4 +1547,8 @@
     public void setChecksumJournalFiles(boolean checksumJournalFiles) {
         this.checksumJournalFiles = checksumJournalFiles;
     }
+
+	public void setBrokerService(BrokerService brokerService) {
+		this.brokerService = brokerService;
+	}
 }

Copied: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?p2=activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&r1=835888&r2=882511&rev=882511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Fri Nov 20 11:36:45 2009
@@ -28,12 +28,25 @@
             .getLog(DefaultIOExceptionHandler.class);
     private BrokerService broker;
     private boolean ignoreAllErrors = false;
+    private boolean ignoreNoSpaceErrors = true;
+    private String noSpaceMessage = "space";
 
     public void handle(IOException exception) {
         if (ignoreAllErrors) {
             LOG.info("Ignoring IO exception, " + exception, exception);
             return;
         }
+        
+        if (ignoreNoSpaceErrors) {
+            Throwable cause = exception;
+            while (cause != null && cause instanceof IOException) {
+                if (cause.getMessage().contains(noSpaceMessage)) {
+                    LOG.info("Ignoring no space left exception, " + exception, exception);
+                    return;
+                }
+                cause = cause.getCause();
+            }
+        }
 
         LOG.info("Stopping the broker due to IO exception, " + exception, exception);
         new Thread() {
@@ -59,4 +72,20 @@
         this.ignoreAllErrors = ignoreAllErrors;
     }
 
+    public boolean isIgnoreNoSpaceErrors() {
+        return ignoreNoSpaceErrors;
+    }
+
+    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
+        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
+    }
+
+    public String getNoSpaceMessage() {
+        return noSpaceMessage;
+    }
+
+    public void setNoSpaceMessage(String noSpaceMessage) {
+        this.noSpaceMessage = noSpaceMessage;
+    }
+
 }

Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Nov 20 11:36:45 2009
@@ -21,6 +21,7 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
@@ -85,6 +86,7 @@
         public final CountDownLatch latch = new CountDownLatch(1);
 		private final int offset;
         public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
         public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
             this.dataFile = dataFile;
@@ -158,7 +160,7 @@
      * @throws
      */
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+    	
         // Write the packet our internal buffer.
         int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
@@ -184,6 +186,10 @@
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+            IOException exception = batch.exception.get(); 
+            if (exception != null) {
+            	throw exception;
+            }
         }	
 
         return location;
@@ -213,10 +219,7 @@
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
+            
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -228,6 +231,11 @@
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
+                firstAsyncException = null;
+            }
+            
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
             }
 
             while ( true ) {
@@ -298,6 +306,7 @@
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
+        WriteBatch wb = null;
         try {
 
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +330,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         file.setLength(dataFile.getLength());
@@ -405,6 +414,14 @@
         } catch (IOException e) {
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
+                if (wb != null) {
+                    wb.latch.countDown();
+                    wb.exception.set(e);
+                }
+                if (nextWriteBatch != null) {
+            	    nextWriteBatch.latch.countDown();
+            	    nextWriteBatch.exception.set(e);
+                }
             }
         } catch (InterruptedException e) {
         } finally {
@@ -415,6 +432,7 @@
             } catch (Throwable ignore) {
             }
             shutdownDone.countDown();
+            running = false;
         }
     }
 

Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Nov 20 11:36:45 2009
@@ -44,6 +44,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
         }
         
         void begin() {
-            diskBound = current;
-            current = null;
+           diskBound = current;
+           current = null;
         }
         
         /**
@@ -176,6 +177,10 @@
             diskBound=null;
             return current == null;
         }
+        
+        boolean isDone() {
+            return diskBound == null && current == null;
+        }
 
     }
     
@@ -937,12 +942,15 @@
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
-            // build a write batch from the current write cache. 
+            // build a write batch from the current write cache.
             for (PageWrite write : writes.values()) {
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the 
                 // page again without blocking for this write.
                 write.begin();
+                if (write.diskBound == null) {
+                    batch.remove(write);
+                }
             }
 
             // Grab on to the existing checkpoint latch cause once we do this write we can 
@@ -951,71 +959,82 @@
             this.checkpointLatch=null;
         }
         
- 
-       if (enableRecoveryFile) {
-           
-           // Using Adler-32 instead of CRC-32 because it's much faster and it's 
-           // weakness for short messages with few hundred bytes is not a factor in this case since we know 
-           // our write batches are going to much larger.
-           Checksum checksum = new Adler32();
-           for (PageWrite w : batch) {
-               checksum.update(w.diskBound, 0, pageSize);
-           }
-           
-           // Can we shrink the recovery buffer??
-           if( recoveryPageCount > recoveryFileMaxPageCount ) {
-               int t = Math.max(recoveryFileMinPageCount, batch.size());
-               recoveryFile.setLength(recoveryFileSizeForPages(t));
-           }
-           
-            // Record the page writes in the recovery buffer.
-            recoveryFile.seek(0);
-            // Store the next tx id...
-            recoveryFile.writeLong(nextTxid.get());
-            // Store the checksum for thw write batch so that on recovery we know if we have a consistent 
-            // write batch on disk.
-            recoveryFile.writeLong(checksum.getValue());
-            // Write the # of pages that will follow
-            recoveryFile.writeInt(batch.size());
-            
-            
-            // Write the pages.
-            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+       try {
+            if (enableRecoveryFile) {
+
+                // Using Adler-32 instead of CRC-32 because it's much faster and
+                // its
+                // weakness for short messages of a few hundred bytes is not a
+                // factor in this case since we know
+                // our write batches are going to be much larger.
+                Checksum checksum = new Adler32();
+                for (PageWrite w : batch) {
+                    try {
+                        checksum.update(w.diskBound, 0, pageSize);
+                    } catch (Throwable t) {
+                        throw IOExceptionSupport.create(
+                                "Cannot create recovery file. Reason: " + t, t);
+                    }
+                }
+
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for the write batch so that on recovery we
+                // know if we have a consistent
+                // write batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+
+                // Write the pages.
+                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+
+                for (PageWrite w : batch) {
+                    recoveryFile.writeLong(w.page.getPageId());
+                    recoveryFile.write(w.diskBound, 0, pageSize);
+                }
+
+                if (enableDiskSyncs) {
+                    // Sync to make sure recovery buffer writes land on disk..
+                    recoveryFile.getFD().sync();
+                }
+
+                recoveryPageCount = batch.size();
+            }
+
             for (PageWrite w : batch) {
-                recoveryFile.writeLong(w.page.getPageId());
-                recoveryFile.write(w.diskBound, 0, pageSize);
+                writeFile.seek(toOffset(w.page.getPageId()));
+                writeFile.write(w.diskBound, 0, pageSize);
+                w.done();
             }
-            
+
+            // Sync again
             if (enableDiskSyncs) {
-                // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
+                writeFile.getFD().sync();
             }
-            
-            recoveryPageCount = batch.size();
-        }
-       
-        
-        for (PageWrite w : batch) {
-            writeFile.seek(toOffset(w.page.getPageId()));
-            writeFile.write(w.diskBound, 0, pageSize);
-        }
-        
-        // Sync again
-        if( enableDiskSyncs ) {
-            writeFile.getFD().sync();
-        }
-        
-        synchronized( writes ) {
-            for (PageWrite w : batch) {
-                // If there are no more pending writes, then remove it from the write cache.
-                if( w.done() ) {
-                    writes.remove(w.page.getPageId());
+
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
+                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                    }
                 }
             }
-        }
-        
-        if( checkpointLatch!=null ) {
-            checkpointLatch.countDown();
+            
+            if( checkpointLatch!=null ) {
+                checkpointLatch.countDown();
+            }
         }
     }