You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/07/30 22:18:08 UTC

svn commit: r1508602 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/transaction/ activemq-broker/src/main/java/org/apache/activemq/util/ activemq-client/src/main/java/or...

Author: gtully
Date: Tue Jul 30 20:18:07 2013
New Revision: 1508602

URL: http://svn.apache.org/r1508602
Log:
https://issues.apache.org/jira/browse/AMQ-4643 - ensure handled IOExceptions are not propagated back to the client when the broker or transports are shut down. Additional tests added; existing tests refactored to reflect the new determinism. The IOException handler now throws SuppressReplyException, which is trapped by the transport connector. If a store exception needs to be processed by the client, for an immediate response rather than a failover reconnect, then the IOException handler should be configured with ignoreAllErrors

Added:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java   (with props)
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
    activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/LockableServiceSupport.java Tue Jul 30 20:18:07 2013
@@ -122,7 +122,7 @@ public abstract class LockableServiceSup
                 }
             }
         } catch (IOException e) {
-            LOG.warn("locker keepalive resulted in: " + e, e);
+            LOG.warn("locker keepAlive resulted in: " + e, e);
         }
         if (stop) {
             stopBroker();

Added: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java?rev=1508602&view=auto
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java (added)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java Tue Jul 30 20:18:07 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+
+/**
+ * An exception thrown when the broker or transport will be shutdown in response
+ * to an error, eg. from IOExceptionHandler.
+ * The transport will die (socket.close()) so we don't want to propagate exceptions
+ * to the client; failover transport will retry the operation.
+ *
+ */
+public class SuppressReplyException extends RuntimeException {
+    public SuppressReplyException(String reason, IOException cause) {
+        super(reason, cause);
+    }
+}

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/SuppressReplyException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Jul 30 20:18:07 2013
@@ -301,6 +301,11 @@ public class TransportConnection impleme
                         + " command: " + command + ", exception: " + e, e);
             }
 
+            if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) {
+                LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause());
+                responseRequired = false;
+            }
+
             if (responseRequired) {
                 response = new ExceptionResponse(e);
             } else {

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/LocalTransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/LocalTransaction.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/LocalTransaction.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/LocalTransaction.java Tue Jul 30 20:18:07 2013
@@ -58,7 +58,7 @@ public class LocalTransaction extends Tr
             LOG.warn("COMMIT FAILED: ", e);
             rollback();
             // Let them know we rolled back.
-            XAException xae = new XAException("COMMIT FAILED: Transaction rolled back.");
+            XAException xae = new XAException("COMMIT FAILED: Transaction rolled back");
             xae.errorCode = XAException.XA_RBOTHER;
             xae.initCause(e);
             throw xae;
@@ -66,15 +66,13 @@ public class LocalTransaction extends Tr
 
         setState(Transaction.FINISHED_STATE);
         context.getTransactions().remove(xid);
-        // Sync on transaction store to avoid out of order messages in the cursor
-        // https://issues.apache.org/activemq/browse/AMQ-2594
         try {
-            transactionStore.commit(getTransactionId(), false,preCommitTask, postCommitTask);
+            transactionStore.commit(getTransactionId(), false, preCommitTask, postCommitTask);
             this.waitPostCommitDone(postCommitTask);
         } catch (Throwable t) {
             LOG.warn("Store COMMIT FAILED: ", t);
             rollback();
-            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
+            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back");
             xae.errorCode = XAException.XA_RBOTHER;
             xae.initCause(t);
             throw xae;
@@ -109,7 +107,7 @@ public class LocalTransaction extends Tr
 
     @Override
     public int prepare() throws XAException {
-        XAException xae = new XAException("Prepare not implemented on Local Transactions.");
+        XAException xae = new XAException("Prepare not implemented on Local Transactions");
         xae.errorCode = XAException.XAER_RMERR;
         throw xae;
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/XATransaction.java Tue Jul 30 20:18:07 2013
@@ -89,7 +89,7 @@ public class XATransaction extends Trans
         } catch (Throwable t) {
             LOG.warn("Store COMMIT FAILED: ", t);
             rollback();
-            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back.");
+            XAException xae = new XAException("STORE COMMIT FAILED: Transaction rolled back");
             xae.errorCode = XAException.XA_RBOTHER;
             xae.initCause(t);
             throw xae;
@@ -104,7 +104,7 @@ public class XATransaction extends Trans
 
     private void checkForPreparedState(boolean onePhase) throws XAException {
         if (!onePhase) {
-            XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared.");
+            XAException xae = new XAException("Cannot do 2 phase commit if the transaction has not been prepared");
             xae.errorCode = XAException.XAER_PROTO;
             throw xae;
         }
@@ -118,7 +118,7 @@ public class XATransaction extends Trans
         } catch (Throwable e) {
             LOG.warn("PRE-PREPARE FAILED: ", e);
             rollback();
-            XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back.");
+            XAException xae = new XAException("PRE-PREPARE FAILED: Transaction rolled back");
             xae.errorCode = XAException.XA_RBOTHER;
             xae.initCause(e);
             throw xae;

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Tue Jul 30 20:18:07 2013
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SuppressReplyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
     private String noSpaceMessage = "space";
     private String sqlExceptionMessage = ""; // match all
     private long resumeCheckSleepPeriod = 5*1000;
-    private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
+    private AtomicBoolean handlingException = new AtomicBoolean(false);
 
     public void handle(IOException exception) {
         if (ignoreAllErrors) {
@@ -73,59 +74,69 @@ import org.slf4j.LoggerFactory;
         }
 
         if (stopStartConnectors) {
-            if (!stopStartInProgress.compareAndSet(false, true)) {
-                // we are already working on it
-                return;
-            }
-            LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
+            if (handlingException.compareAndSet(false, true)) {
+                LOG.info("Initiating stop/restart of transports on " + broker + " due to IO exception, " + exception, exception);
+
+                new Thread("IOExceptionHandler: stop transports") {
+                    public void run() {
+                        try {
+                            ServiceStopper stopper = new ServiceStopper();
+                            broker.stopAllConnectors(stopper);
+                            LOG.info("Successfully stopped transports on " + broker);
+                        } catch (Exception e) {
+                            LOG.warn("Failure occurred while stopping broker connectors", e);
+                        } finally {
+                            // resume again
+                            new Thread("IOExceptionHandler: restart transports") {
+                                public void run() {
+                                    try {
+                                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
+                                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
+                                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
+                                        }
+                                        broker.startAllConnectors();
+                                        LOG.info("Successfully restarted transports on " + broker);
+                                    } catch (Exception e) {
+                                        LOG.warn("Stopping " + broker + " due to failure while restarting transports", e);
+                                        stopBroker(e);
+                                    } finally {
+                                        handlingException.compareAndSet(true, false);
+                                    }
+                                }
+
+                                private boolean isPersistenceAdapterDown() {
+                                    boolean checkpointSuccess = false;
+                                    try {
+                                        broker.getPersistenceAdapter().checkpoint(true);
+                                        checkpointSuccess = true;
+                                    } catch (Throwable ignored) {
+                                    }
+                                    return !checkpointSuccess;
+                                }
+                            }.start();
 
-            new Thread("stop transport connectors on IO exception") {
-                public void run() {
-                    try {
-                        ServiceStopper stopper = new ServiceStopper();
-                        broker.stopAllConnectors(stopper);
-                    } catch (Exception e) {
-                        LOG.warn("Failure occurred while stopping broker connectors", e);
-                    }
-                }
-            }.start();
 
-            // resume again
-            new Thread("restart transport connectors post IO exception") {
-                public void run() {
-                    try {
-                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
-                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
-                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                         }
-                        broker.startAllConnectors();
-                    } catch (Exception e) {
-                        LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
-                        stopBroker(e);
-                    } finally {
-                        stopStartInProgress.compareAndSet(true, false);
                     }
-                }
+                }.start();
+            }
 
-                private boolean isPersistenceAdapterDown() {
-                    boolean checkpointSuccess = false;
-                    try {
-                        broker.getPersistenceAdapter().checkpoint(true);
-                        checkpointSuccess = true;
-                    } catch (Throwable ignored) {}
-                    return !checkpointSuccess;
-                }
-            }.start();
+            throw new SuppressReplyException("Stop/RestartTransportsInitiated", exception);
+        }
 
-            return;
+        if (handlingException.compareAndSet(false, true)) {
+            stopBroker(exception);
         }
 
-        stopBroker(exception);
+        // we don't want to propagate the exception back to the client
+        // They will see a delay till they see a disconnect via socket.close
+        // at which point failover: can kick in.
+        throw new SuppressReplyException("ShutdownBrokerInitiated", exception);
     }
 
     private void stopBroker(Exception exception) {
-        LOG.info("Stopping the broker due to exception, " + exception, exception);
-        new Thread("Stopping the broker due to IO exception") {
+        LOG.info("Stopping " + broker + " due to exception, " + exception, exception);
+        new Thread("IOExceptionHandler: stopping " + broker) {
             public void run() {
                 try {
                     if( broker.isRestartAllowed() ) {

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Jul 30 20:18:07 2013
@@ -1162,6 +1162,7 @@ public class ActiveMQMessageConsumer imp
     }
 
     public void rollback() throws JMSException {
+        clearDispatchList();
         synchronized (unconsumedMessages.getMutex()) {
             if (optimizeAcknowledge) {
                 // remove messages read but not acked at the broker yet through

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Tue Jul 30 20:18:07 2013
@@ -102,12 +102,12 @@ public class DefaultDatabaseLocker exten
                         try {
                             connection.rollback();
                         } catch (SQLException e1) {
-                            LOG.error("Caught exception during rollback on connection: " + e1, e1);
+                            LOG.debug("Caught exception during rollback on connection: " + e1, e1);
                         }
                         try {
                             connection.close();
                         } catch (SQLException e1) {
-                            LOG.error("Caught exception while closing connection: " + e1, e1);
+                            LOG.debug("Caught exception while closing connection: " + e1, e1);
                         }
                         
                         connection = null;

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java Tue Jul 30 20:18:07 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.store.jdbc;
 import java.io.IOException;
 
 import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.SuppressReplyException;
 import org.apache.activemq.util.DefaultIOExceptionHandler;
 
 /**
@@ -31,6 +32,7 @@ public class JDBCIOExceptionHandler exte
         setStopStartConnectors(true);
     }
 
+    // fail only when we get an authoritative answer from the db w/o exceptions
     @Override
     protected boolean hasLockOwnership() throws IOException {
         boolean hasLock = true;
@@ -42,11 +44,12 @@ public class JDBCIOExceptionHandler exte
                     if (!locker.keepAlive()) {
                         hasLock = false;
                     }
+                } catch (SuppressReplyException ignoreWhileHandlingInProgress) {
                 } catch (IOException ignored) {
                 }
 
                 if (!hasLock) {
-                    throw new IOException("PersistenceAdapter lock no longer valid using: " + locker);
+                    throw new IOException("Lock keepAlive failed, no longer lock owner with: " + locker);
                 }
             }
         }

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Tue Jul 30 20:18:07 2013
@@ -369,7 +369,7 @@ public class JDBCPersistenceAdapter exte
             clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
                 @Override
                 public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, "ActiveMQ Cleanup Timer");
+                    Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task");
                     thread.setDaemon(true);
                     return thread;
                 }

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Tue Jul 30 20:18:07 2013
@@ -74,6 +74,7 @@ public class LeaseDatabaseLocker extends
         String sql = statements.getLeaseObtainStatement();
         LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
 
+        long now = 0l;
         while (!stopping) {
             Connection connection = null;
             PreparedStatement statement = null;
@@ -84,7 +85,7 @@ public class LeaseDatabaseLocker extends
                 statement = connection.prepareStatement(sql);
                 setQueryTimeout(statement);
 
-                final long now = System.currentTimeMillis() + diffFromCurrentTime;
+                now = System.currentTimeMillis() + diffFromCurrentTime;
                 statement.setString(1, getLeaseHolderId());
                 statement.setLong(2, now + lockAcquireSleepInterval);
                 statement.setLong(3, now);
@@ -113,7 +114,7 @@ public class LeaseDatabaseLocker extends
             throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
         }
 
-        LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
+        LOG.info(getLeaseHolderId() + ", becoming master with lease expiry " + new Date(now) + " on dataSource: " + dataSource);
     }
 
     private void setQueryTimeout(PreparedStatement statement) throws SQLException {
@@ -187,8 +188,12 @@ public class LeaseDatabaseLocker extends
     }
 
     public void doStop(ServiceStopper stopper) throws Exception {
-        releaseLease();
         stopping = true;
+        if (persistenceAdapter.getBrokerService() != null && persistenceAdapter.getBrokerService().isRestartRequested()) {
+            // keep our lease for restart
+            return;
+        }
+        releaseLease();
     }
 
     private void releaseLease() {
@@ -232,6 +237,10 @@ public class LeaseDatabaseLocker extends
             statement.setString(3, getLeaseHolderId());
 
             result = (statement.executeUpdate() == 1);
+
+            if (!result) {
+                reportLeasOwnerShipAndDuration(connection);
+            }
         } catch (Exception e) {
             LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
             IOException ioe = IOExceptionSupport.create(e);
@@ -280,4 +289,9 @@ public class LeaseDatabaseLocker extends
     public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
         this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
     }
+
+    @Override
+    public String toString() {
+        return "LeaseDatabaseLocker owner:" + leaseHolderId + ",duration:" + lockAcquireSleepInterval + ",renew:" + lockAcquireSleepInterval;
+    }
 }

Modified: activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java (original)
+++ activemq/trunk/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java Tue Jul 30 20:18:07 2013
@@ -138,7 +138,9 @@ public class TransactionContext {
 
             } catch (SQLException e) {
                 JDBCPersistenceAdapter.log("Error while closing connection: ", e);
-                throw IOExceptionSupport.create(e);
+                IOException ioe = IOExceptionSupport.create(e);
+                persistenceAdapter.getBrokerService().handleIOException(ioe);
+                throw ioe;
             } finally {
                 try {
                     if (connection != null) {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java Tue Jul 30 20:18:07 2013
@@ -30,7 +30,9 @@ public class DbRestartJDBCQueueMasterSla
 
     @Override
     protected void configureBroker(BrokerService brokerService) {
-        brokerService.setIoExceptionHandler(new JDBCIOExceptionHandler());
+        // master and slave survive db restart and retain master/slave status
+        JDBCIOExceptionHandler stopConnectors = new JDBCIOExceptionHandler();
+        brokerService.setIoExceptionHandler(stopConnectors);
     }
 
     @Override
@@ -51,6 +53,9 @@ public class DbRestartJDBCQueueMasterSla
     protected void verifyExpectedBroker(int inflightMessageCount) {
         if (inflightMessageCount == 0  || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
             assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        } else {
+            // lease expired while DB was offline, either master or slave can grab it so assert is not deterministic
+            // but we still need to validate sent == received
         }
     }
 

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Tue Jul 30 20:18:07 2013
@@ -20,8 +20,11 @@ import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +39,17 @@ public class DbRestartJDBCQueueMasterSla
         persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
     }
 
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        //let the brokers die on exception and master should have lease on restart
+        // which will delay slave start till it expires
+        JDBCIOExceptionHandler trapSQLExceptions = new JDBCIOExceptionHandler();
+        trapSQLExceptions.setIgnoreSQLExceptions(false);
+        trapSQLExceptions.setStopStartConnectors(false);
+        trapSQLExceptions.setResumeCheckSleepPeriod(500l);
+        brokerService.setIoExceptionHandler(trapSQLExceptions);
+    }
+
     private long getLockKeepAlivePeriod() {
         return 500;
     }
@@ -43,22 +57,4 @@ public class DbRestartJDBCQueueMasterSla
     private long getLockAcquireSleepInterval() {
         return 2000;
     }
-
-    @Override
-    protected void delayTillRestartRequired() {
-
-        LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
-        try {
-            TimeUnit.SECONDS.sleep(1);
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void verifyExpectedBroker(int inflightMessageCount) {
-        if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
-            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
-        }
-    }
 }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Tue Jul 30 20:18:07 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.List;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -25,6 +28,9 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TransactionRolledBackException;
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +62,7 @@ public class DbRestartJDBCQueueMasterSla
         if (inflightMessageCount == 0) {
             assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
         } else if (inflightMessageCount == failureCount + 10) {
-            assertEquals("connected to slave", slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+            assertEquals("connected to slave, count:" + inflightMessageCount, slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
         }
     }
 
@@ -67,23 +73,7 @@ public class DbRestartJDBCQueueMasterSla
 
     protected void sendToProducer(MessageProducer producer,
             Destination producerDestination, Message message) throws JMSException {
-        {   
-            // do some retries as db failures filter back to the client until broker sees
-            // db lock failure and shuts down
-            boolean sent = false;
-            do {
-                try { 
-                    producer.send(producerDestination, message);
-                    sent = true;
-                } catch (JMSException e) {
-                    LOG.info("Exception on producer send for: " + message, e);
-                    try { 
-                        Thread.sleep(2000);
-                    } catch (InterruptedException ignored) {
-                    }
-                }
-            } while(!sent);
-        }
+        producer.send(producerDestination, message);
     }
 
     @Override
@@ -100,25 +90,58 @@ public class DbRestartJDBCQueueMasterSla
             LOG.info("Failed to commit message receipt: " + message, e);
             try {
                 receiveSession.rollback();
-            } catch (JMSException ignored) {}
+            } catch (JMSException ignored) {
+            }
 
-            if (e.getCause() instanceof TransactionRolledBackException) {
-                TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException)e.getCause();
+            if (e instanceof TransactionRolledBackException) {
+                TransactionRolledBackException transactionRolledBackException = (TransactionRolledBackException) e;
                 if (transactionRolledBackException.getMessage().indexOf("in doubt") != -1) {
-                    // failover chucked bc there is a missing reply to a commit. the ack may have got there and the reply
-                    // was lost or the ack may be lost.
-                    // so we may not get a resend.
+                    // failover threw b/c there is a missing reply to a commit.
+                    // failover is involved b/c the store exception is handled broker side and the client just
+                    // sees a disconnect (socket.close()).
+                    // If the client needs to be aware of the failure then it should not use IOExceptionHandler
+                    // so that the exception will propagate back
+
+                    // for this test case:
+                    // the commit may have got there and the reply is lost "or" the commit may be lost.
+                    // so we may or may not get a resend.
                     //
-                    // REVISIT: A JDBC store IO exception should not cause the connection to drop, so it needs to be wrapped
-                    // possibly by the IOExceptionHandler
-                    // The commit/close wrappers in jdbc TransactionContext need to delegate to the IOExceptionHandler
-
-                    // this would leave the application aware of the store failure, and possible aware of whether the commit
-                    // was a success, rather than going into failover-retries as it does now.
-
+                    // At the application level we need to determine if the message is there or not which is not trivial
+                    // for this test we assert received == sent
+                    // so we need to know whether the message will be replayed.
+                    // we can ask the store b/c we know it is jdbc - guess we could go through a destination
+                    // message store interface also or use jmx
+                    java.sql.Connection dbConnection = null;
+                    try {
+                        ActiveMQMessage mqMessage = (ActiveMQMessage) message;
+                        dbConnection = sharedDs.getConnection();
+                        PreparedStatement s = dbConnection.prepareStatement(((JDBCPersistenceAdapter) connectedToBroker().getPersistenceAdapter()).getStatements().getFindMessageStatement());
+                        s.setString(1, mqMessage.getMessageId().getProducerId().toString());
+                        s.setLong(2, mqMessage.getMessageId().getProducerSequenceId());
+                        ResultSet rs = s.executeQuery();
+
+                        if (!rs.next()) {
+                            // message is gone, so lets count it as consumed
+                            LOG.info("On TransactionRolledBackException we know that the ack/commit got there b/c message is gone so we  count it: " + mqMessage);
+                            super.consumeMessage(message, messageList);
+                        } else {
+                            LOG.info("On TransactionRolledBackException we know that the ack/commit was lost so we expect a replay of: " + mqMessage);
+                        }
+                    } catch (Exception dbe) {
+                        dbe.printStackTrace();
+                    } finally {
+                        try {
+                            dbConnection.close();
+                        } catch (SQLException e1) {
+                            e1.printStackTrace();
+                        }
+                    }
                 }
-
             }
         }
     }
+
+    private BrokerService connectedToBroker() {
+        return ((ActiveMQConnection)receiveConnection).getBrokerInfo().getBrokerName().equals("master") ? master : slave.get();
+    }
 }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java Tue Jul 30 20:18:07 2013
@@ -76,7 +76,7 @@ public class DbRestartJDBCQueueTest exte
 
     protected void tearDown() throws  Exception {
        super.tearDown();
-        broker.stop();
+       broker.stop();
     }
 
 

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Tue Jul 30 20:18:07 2013
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.apache.activemq.util.IOHelper;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
@@ -59,7 +60,11 @@ public class JDBCQueueMasterSlaveTest ex
         master.start();
     }
 
-    protected void configureBroker(BrokerService master) {
+    protected void configureBroker(BrokerService brokerService) {
+        DefaultIOExceptionHandler stopBrokerOnStoreException = new DefaultIOExceptionHandler();
+        // we want any store io exception to stop the broker
+        stopBrokerOnStoreException.setIgnoreSQLExceptions(false);
+        brokerService.setIoExceptionHandler(stopBrokerOnStoreException);
     }
 
     protected void createSlave() throws Exception {

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java?rev=1508602&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java Tue Jul 30 20:18:07 2013
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceServiceSupport;
+import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.TransactionContext;
+import org.apache.activemq.util.IOHelper;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testing how the broker reacts when a SQL Exception is thrown from
+ * org.apache.activemq.store.jdbc.TransactionContext.executeBatch().
+ * <p/>
+ * see https://issues.apache.org/jira/browse/AMQ-4636
+ */
+
+public class AMQ4636Test extends TestCase {
+
+    private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC";
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AMQ4636Test.class);
+    private String transportUrl = "tcp://0.0.0.0:0";
+    private BrokerService broker;
+    private TestTransactionContext testTransactionContext;
+
+    protected BrokerService createBroker(boolean withJMX) throws Exception {
+        BrokerService broker = new BrokerService();
+
+        broker.setUseJmx(withJMX);
+
+        EmbeddedDataSource embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory());
+        embeddedDataSource.setCreateDatabase("create");
+
+        // wire in a TestTransactionContext (subclass of TransactionContext) that overrides executeBatch()
+        // so that it can be configured to throw a SQLException on demand
+        JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter();
+        jdbc.setDataSource(embeddedDataSource);
+        testTransactionContext = new TestTransactionContext(jdbc);
+
+        jdbc.setLockKeepAlivePeriod(1000l);
+        LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker();
+        leaseDatabaseLocker.setLockAcquireSleepInterval(2000l);
+        jdbc.setLocker(leaseDatabaseLocker);
+
+        broker.setPersistenceAdapter(jdbc);
+
+        broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
+
+        transportUrl = broker.addConnector(transportUrl).getPublishableConnectString();
+        return broker;
+    }
+
+    /**
+     * adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered
+     * during TransactionContext.executeBatch() when called in the broker.
+     * <p/>
+     * Expectation: SQLException triggers a connection shutdown and failover should kick in and try to redeliver the
+     * message. SQLException should NOT be returned to client
+     */
+
+    public void testProducerWithDBShutdown() throws Exception {
+
+        broker = this.createBroker(false);
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+
+        LOG.info("***Broker started...");
+
+        // use failover, but time out after 1 second so the test does not hang
+        String failoverTransportURL = "failover:(" + transportUrl
+                + ")?timeout=1000";
+
+        this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL);
+
+        this.sendMessage(MY_TEST_TOPIC, failoverTransportURL);
+
+    }
+
+    public void createDurableConsumer(String topic,
+                                      String transportURL) throws JMSException {
+        Connection connection = null;
+        LOG.info("*** createDurableConsumer() called ...");
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            connection.setClientID("myconn1");
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(topic);
+
+            TopicSubscriber topicSubscriber = session.createDurableSubscriber(
+                    (Topic) destination, "MySub1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    public void sendMessage(String topic, String transportURL)
+            throws JMSException {
+        Connection connection = null;
+
+        try {
+
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    transportURL);
+
+            connection = factory.createConnection();
+            Session session = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(topic);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            Message m = session.createTextMessage("testMessage");
+            LOG.info("*** send message to broker...");
+
+            // trigger SQL exception in transactionContext
+            testTransactionContext.throwSQLException = true;
+            producer.send(m);
+
+            LOG.info("*** Finished send message to broker");
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+	/*
+     * Mock classes used for testing
+	 */
+
+    public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter {
+
+        public TransactionContext getTransactionContext() throws IOException {
+            return testTransactionContext;
+        }
+    }
+
+    public class TestTransactionContext extends TransactionContext {
+
+        public boolean throwSQLException;
+
+        public TestTransactionContext(
+                JDBCPersistenceAdapter jdbcPersistenceAdapter)
+                throws IOException {
+            super(jdbcPersistenceAdapter);
+        }
+
+        public void executeBatch() throws SQLException {
+            if (throwSQLException) {
+                // only throw exception once
+                throwSQLException = false;
+                throw new SQLException("TEST SQL EXCEPTION");
+            }
+            super.executeBatch();
+        }
+
+    }
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java?rev=1508602&r1=1508601&r2=1508602&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java Tue Jul 30 20:18:07 2013
@@ -68,7 +68,9 @@ public class JDBCIOExceptionHandlerTest 
         jdbc.setLocker(leaseDatabaseLocker);
 
         broker.setPersistenceAdapter(jdbc);
-        broker.setIoExceptionHandler(new JDBCIOExceptionHandler());
+        JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler();
+        jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l);
+        broker.setIoExceptionHandler(jdbcioExceptionHandler);
         String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString();
 
         factory = new ActiveMQConnectionFactory(connectionUri);