You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/02 17:28:43 UTC

svn commit: r1498978 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/

Author: chirino
Date: Tue Jul  2 15:28:42 2013
New Revision: 1498978

URL: http://svn.apache.org/r1498978
Log:
When the leveldb replicated master was shutting down the client would get notified of a failure and it would not be hidden from the client app.  We now suppress sending failure messages to clients when a broker is shutting down so that the client failover logic can kick in and reconnect the client to another server gracefully.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Jul  2 15:28:42 2013
@@ -172,6 +172,7 @@ public class TransportConnection impleme
         this.taskRunnerFactory = taskRunnerFactory;
         this.stopTaskRunnerFactory = stopTaskRunnerFactory;
         this.transport = transport;
+        final BrokerService brokerService = this.broker.getBrokerService();
         this.transport.setTransportListener(new DefaultTransportListener() {
             @Override
             public void onCommand(Object o) {
@@ -182,7 +183,7 @@ public class TransportConnection impleme
                     }
                     Command command = (Command) o;
                     Response response = service(command);
-                    if (response != null) {
+                    if (response != null && !brokerService.isStopping() ) {
                         dispatchSync(response);
                     }
                 } finally {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jul  2 15:28:42 2013
@@ -696,7 +696,7 @@ public class Queue extends BaseDestinati
                                     }
 
                                 } catch (Exception e) {
-                                    if (!sendProducerAck && !context.isInRecoveryMode()) {
+                                    if (!sendProducerAck && !context.isInRecoveryMode() && !brokerService.isStopping()) {
                                         ExceptionResponse response = new ExceptionResponse(e);
                                         response.setCorrelationId(message.getCommandId());
                                         context.getConnection().dispatchAsync(response);

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1498978&r1=1498977&r2=1498978&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala Tue Jul  2 15:28:42 2013
@@ -508,10 +508,22 @@ class LevelDBClient(store: LevelDBStore)
 
   def might_fail[T](func : =>T):T = {
     def handleFailure(e:IOException) = {
-      store.stop()
       if( store.broker_service !=null ) {
-        store.broker_service.handleIOException(e);
+        // This should start stopping the broker but it might block,
+        // so do it on another thread...
+        new Thread("LevelDB IOException handler.") {
+          override def run() {
+            store.broker_service.handleIOException(e);
+          }
+        }.start()
+        // Lets wait until the broker service has started stopping.  Once the
+        // stopping flag is raised, errors caused by stopping the store should
+        // not get propagated to the client.
+        while( !store.broker_service.isStopping ) {
+          Thread.sleep(100);
+        }
       }
+      store.stop()
       throw e;
     }
     try {