You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/20 12:36:47 UTC
svn commit: r882511 - in /activemq/branches/activemq-5.3:
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/store/amq/
activemq-core/src/main/java/org/apache/activemq/store/journal/
activemq-core/sr...
Author: dejanb
Date: Fri Nov 20 11:36:45 2009
New Revision: 882511
URL: http://svn.apache.org/viewvc?rev=882511&view=rev
Log:
merging https://issues.apache.org/activemq/browse/AMQ-2042 - 834922,835373,835412,835833,835888,880792,881221,882144
Added:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
- copied, changed from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
- copied unchanged from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/IOExceptionHandler.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Nov 20 11:36:45 2009
@@ -84,6 +84,8 @@
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.JMXSupport;
@@ -178,7 +180,9 @@
private int systemExitOnShutdownExitCode;
private SslContext sslContext;
private boolean forceStart = false;
- static {
+ private IOExceptionHandler ioExceptionHandler;
+
+ static {
String localHostName = "localhost";
try {
localHostName = java.net.InetAddress.getLocalHost().getHostName();
@@ -481,6 +485,9 @@
}
}
brokerId = broker.getBrokerId();
+ if (ioExceptionHandler == null) {
+ setIoExceptionHandler(new DefaultIOExceptionHandler());
+ }
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " + brokerId + ") started");
getBroker().brokerServiceStarted();
startedLatch.countDown();
@@ -2008,6 +2015,14 @@
}
}
}
+
+ public void handleIOException(IOException exception) {
+ if (ioExceptionHandler != null) {
+ ioExceptionHandler.handle(exception);
+ } else {
+ LOG.info("Ignoring IO exception, " + exception, exception);
+ }
+ }
/**
* Starts all destiantions in persistence store. This includes all inactive
@@ -2111,5 +2126,10 @@
this.passiveSlave = passiveSlave;
}
+ public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
+ ioExceptionHandler.setBrokerService(this);
+ this.ioExceptionHandler = ioExceptionHandler;
+ }
+
}
\ No newline at end of file
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -695,7 +695,13 @@
}
public Location writeCommand(DataStructure command, boolean syncHint,boolean forceSync) throws IOException {
- return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+ try {
+ return asyncDataManager.write(wireFormat.marshal(command), (forceSync||(syncHint && syncOnWrite)));
+ } catch (IOException ioe) {
+ LOG.error("Failed to write command: " + command + ". Reason: " + ioe, ioe);
+ brokerService.handleIOException(ioe);
+ throw ioe;
+ }
}
private Location writeTraceMessage(String message, boolean sync) throws IOException {
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -623,7 +623,7 @@
return journal.write(toPacket(wireFormat.marshal(command)), sync);
} catch (IOException ioe) {
LOG.error("Cannot write to the journal", ioe);
- stopBroker();
+ brokerService.handleIOException(ioe);
throw ioe;
}
}
@@ -725,17 +725,5 @@
((BrokerServiceAware)pa).setBrokerService(brokerService);
}
}
-
- protected void stopBroker() {
- new Thread() {
- public void run() {
- try {
- brokerService.stop();
- } catch (Exception e) {
- LOG.warn("Failure occured while stopping broker");
- }
- }
- }.start();
- }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -24,6 +24,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -54,7 +56,7 @@
* @org.apache.xbean.XBean
* @version $Revision: 1.4 $
*/
-public class KahaPersistenceAdapter implements PersistenceAdapter {
+public class KahaPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
@@ -73,6 +75,7 @@
private boolean initialized;
private final AtomicLong storeSize;
private boolean persistentIndex = true;
+ private BrokerService brokerService;
public KahaPersistenceAdapter(AtomicLong size) {
@@ -175,6 +178,7 @@
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
container.load();
transactionStore = new KahaTransactionStore(this, container);
+ transactionStore.setBrokerService(brokerService);
break;
} catch (StoreLockedExcpetion e) {
LOG.info("Store is locked... waiting " + (STORE_LOCKED_WAIT_DELAY / 1000)
@@ -361,6 +365,10 @@
wireFormat.setTightEncodingEnabled(true);
}
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java Fri Nov 20 11:36:45 2009
@@ -24,17 +24,23 @@
import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
@@ -42,10 +48,14 @@
*
* @version $Revision: 1.4 $
*/
-public class KahaTransactionStore implements TransactionStore {
+public class KahaTransactionStore implements TransactionStore, BrokerServiceAware {
+ private static final Log LOG = LogFactory.getLog(KahaTransactionStore.class);
+
private Map transactions = new ConcurrentHashMap();
private Map prepared;
private KahaPersistenceAdapter adaptor;
+
+ private BrokerService brokerService;
KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
this.adaptor = adaptor;
@@ -130,12 +140,19 @@
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message) throws IOException {
- if (message.isInTransaction()) {
- KahaTransaction tx = getOrCreateTx(message.getTransactionId());
- tx.add((KahaMessageStore)destination, message);
- } else {
- destination.addMessage(null, message);
- }
+ try {
+ if (message.isInTransaction()) {
+ KahaTransaction tx = getOrCreateTx(message.getTransactionId());
+ tx.add((KahaMessageStore)destination, message);
+ } else {
+ destination.addMessage(null, message);
+ }
+ } catch (RuntimeStoreException rse) {
+ if (rse.getCause() instanceof IOException) {
+ brokerService.handleIOException((IOException)rse.getCause());
+ }
+ throw rse;
+ }
}
/**
@@ -143,12 +160,19 @@
* @throws IOException
*/
final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException {
- if (ack.isInTransaction()) {
- KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
- tx.add((KahaMessageStore)destination, ack);
- } else {
- destination.removeMessage(null, ack);
- }
+ try {
+ if (ack.isInTransaction()) {
+ KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
+ tx.add((KahaMessageStore)destination, ack);
+ } else {
+ destination.removeMessage(null, ack);
+ }
+ } catch (RuntimeStoreException rse) {
+ if (rse.getCause() instanceof IOException) {
+ brokerService.handleIOException((IOException)rse.getCause());
+ }
+ throw rse;
+ }
}
protected synchronized KahaTransaction getTx(TransactionId key) {
@@ -181,4 +205,8 @@
protected MessageStore getStoreById(Object id) {
return adaptor.retrieveMessageStore(id);
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Nov 20 11:36:45 2009
@@ -17,6 +17,8 @@
package org.apache.activemq.store.kahadb;
import org.apache.activeio.journal.Journal;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -37,7 +39,7 @@
* @org.apache.xbean.XBean element="kahaDB"
* @version $Revision: 1.17 $
*/
-public class KahaDBPersistenceAdapter implements PersistenceAdapter {
+public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
private KahaDBStore letter = new KahaDBStore();
@@ -364,4 +366,8 @@
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ letter.setBrokerService(brokerService);
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Nov 20 11:36:45 2009
@@ -36,6 +36,8 @@
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -75,8 +77,11 @@
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
+import org.springframework.core.enums.LetterCodedLabeledEnum;
-public class MessageDatabase {
+public class MessageDatabase implements BrokerServiceAware {
+
+ private BrokerService brokerService;
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
public static final int LOG_SLOW_ACCESS_TIME = Integer.parseInt(System.getProperty(PROPERTY_LOG_SLOW_ACCESS_TIME, "500"));
@@ -227,6 +232,41 @@
}
}
+ private void startCheckpoint() {
+ checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ public void run() {
+ try {
+ long lastCleanup = System.currentTimeMillis();
+ long lastCheckpoint = System.currentTimeMillis();
+ // Sleep for a short time so we can periodically check
+ // to see if we need to exit this thread.
+ long sleepTime = Math.min(checkpointInterval, 500);
+ while (opened.get()) {
+
+ Thread.sleep(sleepTime);
+ long now = System.currentTimeMillis();
+ if( now - lastCleanup >= cleanupInterval ) {
+ checkpointCleanup(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if( now - lastCheckpoint >= checkpointInterval ) {
+ checkpointCleanup(false);
+ lastCheckpoint = now;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Looks like someone really wants us to exit this thread...
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ }
+ }
+
+ };
+ checkpointThread.setDaemon(true);
+ checkpointThread.start();
+ }
+
/**
* @throws IOException
*/
@@ -236,34 +276,7 @@
loadPageFile();
- checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
- public void run() {
- try {
- long lastCleanup = System.currentTimeMillis();
- long lastCheckpoint = System.currentTimeMillis();
-
- // Sleep for a short time so we can periodically check
- // to see if we need to exit this thread.
- long sleepTime = Math.min(checkpointInterval, 500);
- while (opened.get()) {
- Thread.sleep(sleepTime);
- long now = System.currentTimeMillis();
- if( now - lastCleanup >= cleanupInterval ) {
- checkpointCleanup(true);
- lastCleanup = now;
- lastCheckpoint = now;
- } else if( now - lastCheckpoint >= checkpointInterval ) {
- checkpointCleanup(false);
- lastCheckpoint = now;
- }
- }
- } catch (InterruptedException e) {
- // Looks like someone really wants us to exit this thread...
- }
- }
- };
- checkpointThread.setDaemon(true);
- checkpointThread.start();
+ startCheckpoint();
recover();
}
}
@@ -575,26 +588,22 @@
return journal.getNextLocation(null);
}
- protected void checkpointCleanup(final boolean cleanup) {
- try {
- long start = System.currentTimeMillis();
- synchronized (indexMutex) {
- if( !opened.get() ) {
- return;
- }
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, cleanup);
- }
- });
- }
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+ protected void checkpointCleanup(final boolean cleanup) throws IOException {
+ long start = System.currentTimeMillis();
+ synchronized (indexMutex) {
+ if( !opened.get() ) {
+ return;
}
- } catch (IOException e) {
- e.printStackTrace();
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, cleanup);
+ }
+ });
}
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+ }
}
@@ -617,32 +626,40 @@
}
/**
- * All updated are are funneled through this method. The updates a converted
+ * All updates are funneled through this method. The updates are converted
* to a JournalMessage which is logged to the journal and then the data from
* the JournalMessage is used to update the index just like it would be done
- * durring a recovery process.
+ * during a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
-
-
- int size = data.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(data.type().getNumber());
- data.writeFramed(os);
-
- long start = System.currentTimeMillis();
- Location location = journal.write(os.toByteSequence(), sync);
- long start2 = System.currentTimeMillis();
- process(data, location);
- long end = System.currentTimeMillis();
- if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ try {
+ int size = data.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(data.type().getNumber());
+ data.writeFramed(os);
+
+ long start = System.currentTimeMillis();
+ Location location = journal.write(os.toByteSequence(), sync);
+ long start2 = System.currentTimeMillis();
+ process(data, location);
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+ }
+
+ synchronized (indexMutex) {
+ metadata.lastUpdate = location;
+ }
+ if (!checkpointThread.isAlive()) {
+ LOG.info("KahaDB: Recovering checkpoint thread after exception");
+ startCheckpoint();
+ }
+ return location;
+ } catch (IOException ioe) {
+ LOG.error("KahaDB failed to store to Journal", ioe);
+ brokerService.handleIOException(ioe);
+ throw ioe;
}
-
- synchronized (indexMutex) {
- metadata.lastUpdate = location;
- }
- return location;
}
/**
@@ -1530,4 +1547,8 @@
public void setChecksumJournalFiles(boolean checksumJournalFiles) {
this.checksumJournalFiles = checksumJournalFiles;
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
Copied: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (from r835888, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?p2=activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java&r1=835888&r2=882511&rev=882511&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Fri Nov 20 11:36:45 2009
@@ -28,12 +28,25 @@
.getLog(DefaultIOExceptionHandler.class);
private BrokerService broker;
private boolean ignoreAllErrors = false;
+ private boolean ignoreNoSpaceErrors = true;
+ private String noSpaceMessage = "space";
public void handle(IOException exception) {
if (ignoreAllErrors) {
LOG.info("Ignoring IO exception, " + exception, exception);
return;
}
+
+ if (ignoreNoSpaceErrors) {
+ Throwable cause = exception;
+ while (cause != null && cause instanceof IOException) {
+ if (cause.getMessage().contains(noSpaceMessage)) {
+ LOG.info("Ignoring no space left exception, " + exception, exception);
+ return;
+ }
+ cause = cause.getCause();
+ }
+ }
LOG.info("Stopping the broker due to IO exception, " + exception, exception);
new Thread() {
@@ -59,4 +72,20 @@
this.ignoreAllErrors = ignoreAllErrors;
}
+ public boolean isIgnoreNoSpaceErrors() {
+ return ignoreNoSpaceErrors;
+ }
+
+ public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
+ this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
+ }
+
+ public String getNoSpaceMessage() {
+ return noSpaceMessage;
+ }
+
+ public void setNoSpaceMessage(String noSpaceMessage) {
+ this.noSpaceMessage = noSpaceMessage;
+ }
+
}
Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Nov 20 11:36:45 2009
@@ -21,6 +21,7 @@
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
@@ -85,6 +86,7 @@
public final CountDownLatch latch = new CountDownLatch(1);
private final int offset;
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
+ public AtomicReference<IOException> exception = new AtomicReference<IOException>();
public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
this.dataFile = dataFile;
@@ -158,7 +160,7 @@
* @throws
*/
public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
-
+
// Write the packet our internal buffer.
int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
@@ -184,6 +186,10 @@
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
+ IOException exception = batch.exception.get();
+ if (exception != null) {
+ throw exception;
+ }
}
return location;
@@ -213,10 +219,7 @@
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
- if (firstAsyncException != null) {
- throw firstAsyncException;
- }
-
+
if (!running) {
running = true;
thread = new Thread() {
@@ -228,6 +231,11 @@
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
+ firstAsyncException = null;
+ }
+
+ if (firstAsyncException != null) {
+ throw firstAsyncException;
}
while ( true ) {
@@ -298,6 +306,7 @@
protected void processQueue() {
DataFile dataFile = null;
RandomAccessFile file = null;
+ WriteBatch wb = null;
try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +330,7 @@
enqueueMutex.notify();
}
- WriteBatch wb = (WriteBatch)o;
+ wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
file.setLength(dataFile.getLength());
@@ -405,6 +414,14 @@
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
+ if (wb != null) {
+ wb.latch.countDown();
+ wb.exception.set(e);
+ }
+ if (nextWriteBatch != null) {
+ nextWriteBatch.latch.countDown();
+ nextWriteBatch.exception.set(e);
+ }
}
} catch (InterruptedException e) {
} finally {
@@ -415,6 +432,7 @@
} catch (Throwable ignore) {
}
shutdownDone.countDown();
+ running = false;
}
}
Modified: activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=882511&r1=882510&r2=882511&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/branches/activemq-5.3/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Nov 20 11:36:45 2009
@@ -44,6 +44,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.IntrospectionSupport;
import org.apache.kahadb.util.LRUCache;
@@ -165,8 +166,8 @@
}
void begin() {
- diskBound = current;
- current = null;
+ diskBound = current;
+ current = null;
}
/**
@@ -176,6 +177,10 @@
diskBound=null;
return current == null;
}
+
+ boolean isDone() {
+ return diskBound == null && current == null;
+ }
}
@@ -937,12 +942,15 @@
// If there is not enough to write, wait for a notification...
batch = new ArrayList<PageWrite>(writes.size());
- // build a write batch from the current write cache.
+ // build a write batch from the current write cache.
for (PageWrite write : writes.values()) {
batch.add(write);
// Move the current write to the diskBound write, this lets folks update the
// page again without blocking for this write.
write.begin();
+ if (write.diskBound == null) {
+ batch.remove(write);
+ }
}
// Grab on to the existing checkpoint latch cause once we do this write we can
@@ -951,71 +959,82 @@
this.checkpointLatch=null;
}
-
- if (enableRecoveryFile) {
-
- // Using Adler-32 instead of CRC-32 because it's much faster and it's
- // weakness for short messages with few hundred bytes is not a factor in this case since we know
- // our write batches are going to much larger.
- Checksum checksum = new Adler32();
- for (PageWrite w : batch) {
- checksum.update(w.diskBound, 0, pageSize);
- }
-
- // Can we shrink the recovery buffer??
- if( recoveryPageCount > recoveryFileMaxPageCount ) {
- int t = Math.max(recoveryFileMinPageCount, batch.size());
- recoveryFile.setLength(recoveryFileSizeForPages(t));
- }
-
- // Record the page writes in the recovery buffer.
- recoveryFile.seek(0);
- // Store the next tx id...
- recoveryFile.writeLong(nextTxid.get());
- // Store the checksum for thw write batch so that on recovery we know if we have a consistent
- // write batch on disk.
- recoveryFile.writeLong(checksum.getValue());
- // Write the # of pages that will follow
- recoveryFile.writeInt(batch.size());
-
-
- // Write the pages.
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ try {
+ if (enableRecoveryFile) {
+
+ // Using Adler-32 instead of CRC-32 because it's much faster and
+ // its
+ // weakness for short messages with few hundred bytes is not a
+ // factor in this case since we know
+ // our write batches are going to be much larger.
+ Checksum checksum = new Adler32();
+ for (PageWrite w : batch) {
+ try {
+ checksum.update(w.diskBound, 0, pageSize);
+ } catch (Throwable t) {
+ throw IOExceptionSupport.create(
+ "Cannot create recovery file. Reason: " + t, t);
+ }
+ }
+
+ // Can we shrink the recovery buffer??
+ if (recoveryPageCount > recoveryFileMaxPageCount) {
+ int t = Math.max(recoveryFileMinPageCount, batch.size());
+ recoveryFile.setLength(recoveryFileSizeForPages(t));
+ }
+
+ // Record the page writes in the recovery buffer.
+ recoveryFile.seek(0);
+ // Store the next tx id...
+ recoveryFile.writeLong(nextTxid.get());
+ // Store the checksum for the write batch so that on recovery we
+ // know if we have a consistent
+ // write batch on disk.
+ recoveryFile.writeLong(checksum.getValue());
+ // Write the # of pages that will follow
+ recoveryFile.writeInt(batch.size());
+
+ // Write the pages.
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+
+ for (PageWrite w : batch) {
+ recoveryFile.writeLong(w.page.getPageId());
+ recoveryFile.write(w.diskBound, 0, pageSize);
+ }
+
+ if (enableDiskSyncs) {
+ // Sync to make sure recovery buffer writes land on disk..
+ recoveryFile.getFD().sync();
+ }
+
+ recoveryPageCount = batch.size();
+ }
+
for (PageWrite w : batch) {
- recoveryFile.writeLong(w.page.getPageId());
- recoveryFile.write(w.diskBound, 0, pageSize);
+ writeFile.seek(toOffset(w.page.getPageId()));
+ writeFile.write(w.diskBound, 0, pageSize);
+ w.done();
}
-
+
+ // Sync again
if (enableDiskSyncs) {
- // Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
+ writeFile.getFD().sync();
}
-
- recoveryPageCount = batch.size();
- }
-
-
- for (PageWrite w : batch) {
- writeFile.seek(toOffset(w.page.getPageId()));
- writeFile.write(w.diskBound, 0, pageSize);
- }
-
- // Sync again
- if( enableDiskSyncs ) {
- writeFile.getFD().sync();
- }
-
- synchronized( writes ) {
- for (PageWrite w : batch) {
- // If there are no more pending writes, then remove it from the write cache.
- if( w.done() ) {
- writes.remove(w.page.getPageId());
+
+ } finally {
+ synchronized (writes) {
+ for (PageWrite w : batch) {
+ // If there are no more pending writes, then remove it from
+ // the write cache.
+ if (w.isDone()) {
+ writes.remove(w.page.getPageId());
+ }
}
}
- }
-
- if( checkpointLatch!=null ) {
- checkpointLatch.countDown();
+
+ if( checkpointLatch!=null ) {
+ checkpointLatch.countDown();
+ }
}
}