You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/13 16:50:32 UTC

svn commit: r835888 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ store/amq/ store/journal/ store/kahadaptor/ store/kahadb/ util/

Author: dejanb
Date: Fri Nov 13 15:50:32 2009
New Revision: 835888

URL: http://svn.apache.org/viewvc?rev=835888&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - adding configurable io exception handling

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=835888&r1=835887&r2=835888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Nov 13 15:50:32 2009
@@ -84,6 +84,8 @@
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.JMXSupport;
@@ -178,7 +180,9 @@
     private int systemExitOnShutdownExitCode;
     private SslContext sslContext;
     private boolean forceStart = false;
-    static {
+    private IOExceptionHandler ioExceptionHandler;
+
+	static {
         String localHostName = "localhost";
         try {
             localHostName = java.net.InetAddress.getLocalHost().getHostName();
@@ -481,6 +485,9 @@
                 }
             }
             brokerId = broker.getBrokerId();
+            if (ioExceptionHandler == null) {
+            	setIoExceptionHandler(new DefaultIOExceptionHandler());
+            }
             LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
             getBroker().brokerServiceStarted();
             startedLatch.countDown();
@@ -2008,6 +2015,14 @@
             }
         }
     }
+    
+    public void handleIOException(IOException exception) {
+        if (ioExceptionHandler != null) {
+            ioExceptionHandler.handle(exception);
+         } else {
+            LOG.info("Ignoring IO exception, " + exception, exception);
+         }
+    }
 
     /**
      * Starts all destiantions in persistence store. This includes all inactive
@@ -2111,5 +2126,10 @@
         this.passiveSlave = passiveSlave;
     }
     
+    public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
+        ioExceptionHandler.setBrokerService(this);
+        this.ioExceptionHandler = ioExceptionHandler;
+    }
+    
    
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=835888&r1=835887&r2=835888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Nov 13 15:50:32 2009
@@ -699,7 +699,7 @@
     		return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
     	} catch (IOException ioe) {
     		LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
-        	stopBroker();
+        	brokerService.handleIOException(ioe);
         	throw ioe;
         }
     }
@@ -1091,16 +1091,4 @@
 	           + ".DisableLocking",
 	           "false"));
 	}
-	
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-        	   try {
-    	            brokerService.stop();
-    	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker", e);
-    	        }    			
-    		}
-    	}.start();
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=835888&r1=835887&r2=835888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri Nov 13 15:50:32 2009
@@ -623,7 +623,7 @@
         	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
             } catch (IOException ioe) {
         	    LOG.error("Cannot write to the journal", ioe);
-        	    stopBroker();
+        	    brokerService.handleIOException(ioe);
         	    throw ioe;
             }
         }
@@ -725,17 +725,5 @@
             ((BrokerServiceAware)pa).setBrokerService(brokerService);
         }
     }
-    
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-        	   try {
-    	            brokerService.stop();
-    	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker", e);
-    	        }    			
-    		}
-    	}.start();
-    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=835888&r1=835887&r2=835888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Fri Nov 13 15:50:32 2009
@@ -148,8 +148,10 @@
     			destination.addMessage(null, message);
     		}
     	} catch (RuntimeStoreException rse) {
-    	    stopBroker();
-    	    throw rse;
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
     	}
     }
 
@@ -166,8 +168,10 @@
     			destination.removeMessage(null, ack);
     		}
     	} catch (RuntimeStoreException rse) {
-    	    stopBroker();
-    	    throw rse;
+            if (rse.getCause() instanceof IOException) {
+                brokerService.handleIOException((IOException)rse.getCause());
+            }
+            throw rse;
     	}
     }
 
@@ -205,16 +209,4 @@
 	public void setBrokerService(BrokerService brokerService) {
 		this.brokerService = brokerService;
 	}
-	
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-        	   try {
-    	            brokerService.stop();
-    	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker", e);
-    	        }    			
-    		}
-    	}.start();
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=835888&r1=835887&r2=835888&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Nov 13 15:50:32 2009
@@ -265,8 +265,8 @@
 	                } catch (InterruptedException e) {
 	                    // Looks like someone really wants us to exit this thread...
 	                } catch (IOException ioe) {
-	                	LOG.error("Checkpoint failed", ioe);
-	                	stopBroker();
+	                    LOG.error("Checkpoint failed", ioe);
+	                    brokerService.handleIOException(ioe);
 	                }
 	            }
 	        };
@@ -1537,16 +1537,4 @@
 	public void setBrokerService(BrokerService brokerService) {
 		this.brokerService = brokerService;
 	}
-	
-    protected void stopBroker() {
-        new Thread() {
-           public void run() {
-        	   try {
-    	            brokerService.stop();
-    	        } catch (Exception e) {
-    	            LOG.warn("Failure occured while stopping broker", e);
-    	        }    			
-    		}
-    	}.start();
-    }
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=835888&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Fri Nov 13 15:50:32 2009
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DefaultIOExceptionHandler implements IOExceptionHandler {
+
+    private static final Log LOG = LogFactory
+            .getLog(DefaultIOExceptionHandler.class);
+    private BrokerService broker;
+    private boolean ignoreAllErrors = false;
+
+    public void handle(IOException exception) {
+        if (ignoreAllErrors) {
+            LOG.info("Ignoring IO exception, " + exception, exception);
+            return;
+        }
+
+        LOG.info("Stopping the broker due to IO exception, " + exception, exception);
+        new Thread() {
+            public void run() {
+                try {
+                    broker.stop();
+                } catch (Exception e) {
+                    LOG.warn("Failure occured while stopping broker", e);
+                }
+            }
+        }.start();
+    }
+
+    public void setBrokerService(BrokerService broker) {
+        this.broker = broker;
+    }
+
+    public boolean isIgnoreAllErrors() {
+        return ignoreAllErrors;
+    }
+
+    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
+        this.ignoreAllErrors = ignoreAllErrors;
+    }
+
+}

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java?rev=835888&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java Fri Nov 13 15:50:32 2009
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerServiceAware;
+
+public interface IOExceptionHandler extends BrokerServiceAware {
+
+    public void handle(IOException exception);
+
+}