You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/02 17:28:43 UTC
svn commit: r1498978 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/
Author: chirino
Date: Tue Jul 2 15:28:42 2013
New Revision: 1498978
URL: http://svn.apache.org/r1498978
Log:
When the leveldb replicated master was shutting down the client would get notified of a failure and it would not be hidden from the client app. We now suppress sending failure messages to clients when a broker is shutting down so that the client failover logic can kick in an reconnect the client to another server gracefully.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Jul 2 15:28:42 2013
@@ -172,6 +172,7 @@ public class TransportConnection impleme
this.taskRunnerFactory = taskRunnerFactory;
this.stopTaskRunnerFactory = stopTaskRunnerFactory;
this.transport = transport;
+ final BrokerService brokerService = this.broker.getBrokerService();
this.transport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object o) {
@@ -182,7 +183,7 @@ public class TransportConnection impleme
}
Command command = (Command) o;
Response response = service(command);
- if (response != null) {
+ if (response != null && !brokerService.isStopping() ) {
dispatchSync(response);
}
} finally {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul 2 15:28:42 2013
@@ -696,7 +696,7 @@ public class Queue extends BaseDestinati
}
} catch (Exception e) {
- if (!sendProducerAck && !context.isInRecoveryMode()) {
+ if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
ExceptionResponse response = new ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Tue Jul 2 15:28:42 2013
@@ -508,10 +508,22 @@ class LevelDBClient(store: LevelDBStore)
def might_fail[T](func : =>T):T = {
def handleFailure(e:IOException) = {
- store.stop()
if( store.broker_service !=null ) {
- store.broker_service.handleIOException(e);
+ // This should start stopping the broker but it might block,
+ // so do it on another thread...
+ new Thread("LevelDB IOException handler.") {
+ override def run() {
+ store.broker_service.handleIOException(e);
+ }
+ }.start()
+ // Lets wait until the broker service has started stopping. Once the
+ // stopping flag is raised, errors caused by stopping the store should
+ // not get propagated to the client.
+ while( !store.broker_service.isStopping ) {
+ Thread.sleep(100);
+ }
}
+ store.stop()
throw e;
}
try {