You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/13 22:45:10 UTC

svn commit: r1350006 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/ft/ test/java/org/apache/activemq/store/jdbc/

Author: gtully
Date: Wed Jun 13 20:45:10 2012
New Revision: 1350006

URL: http://svn.apache.org/viewvc?rev=1350006&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3654 - JDBC Master/Slave : Slave cannot acquire lock when the master loose database connection. Adding an leasebasedlocker that can survive db disconnect, and jdbcioexceptionhandler that will pause/resume transports on db outage

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java   (with props)
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jun 13 20:45:10 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.IllegalStateException;
@@ -142,7 +143,7 @@ public class ActiveMQMessageConsumer imp
     private ExecutorService executorService;
     private MessageTransformer transformer;
     private boolean clearDispatchList;
-    boolean inProgressClearRequiredFlag;
+    AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
     private MessageAck pendingAck;
     private long lastDeliveredSequenceId;
@@ -685,15 +686,15 @@ public class ActiveMQMessageConsumer imp
         }    }
 
     void inProgressClearRequired() {
-        inProgressClearRequiredFlag = true;
+        inProgressClearRequiredFlag.incrementAndGet();
         // deal with delivered messages async to avoid lock contention with in progress acks
         clearDispatchList = true;
     }
 
     void clearMessagesInProgress() {
-        if (inProgressClearRequiredFlag) {
+        if (inProgressClearRequiredFlag.get() > 0) {
             synchronized (unconsumedMessages.getMutex()) {
-                if (inProgressClearRequiredFlag) {
+                if (inProgressClearRequiredFlag.get() > 0) {
                     if (LOG.isDebugEnabled()) {
                         LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
                     }
@@ -706,7 +707,7 @@ public class ActiveMQMessageConsumer imp
                     }
                     // allow dispatch on this connection to resume
                     session.connection.transportInterruptionProcessingComplete();
-                    inProgressClearRequiredFlag = false;
+                    inProgressClearRequiredFlag.decrementAndGet();
                 }
             }
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -37,8 +37,9 @@ public interface DatabaseLocker extends 
     /**
      * Used by a timer to keep alive the lock.
      * If the method returns false the broker should be terminated
+     * if an exception is thrown, the lock state cannot be determined
      */
-    boolean keepAlive();
+    boolean keepAlive() throws IOException;
 
     /**
      * set the delay interval in milliseconds between lock acquire attempts

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -170,7 +170,7 @@ public class DefaultDatabaseLocker imple
         }
     }
 
-    public boolean keepAlive() {
+    public boolean keepAlive() throws IOException {
         boolean result = false;
         try {
             lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+
+/**
+ * @org.apache.xbean.XBean
+ */
+public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
+
+    public JDBCIOExceptionHandler() {
+        setIgnoreSQLExceptions(false);
+        setStopStartConnectors(true);
+    }
+
+    @Override
+    protected boolean hasLockOwnership() throws IOException {
+        boolean hasLock = true;
+        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+            JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter) broker.getPersistenceAdapter();
+            DatabaseLocker locker = jdbcPersistenceAdapter.getDatabaseLocker();
+            if (locker != null) {
+                try {
+                    if (!locker.keepAlive()) {
+                        hasLock = false;
+                    }
+                } catch (IOException ignored) {
+                }
+
+                if (!hasLock) {
+                    throw new IOException("PersistenceAdapter lock no longer valid using: " + locker);
+                }
+            }
+        }
+        return hasLock;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Jun 13 20:45:10 2012
@@ -623,7 +623,7 @@ public class JDBCPersistenceAdapter exte
                 }
             }
         } catch (IOException e) {
-            LOG.error("Failed to get database when trying keepalive: " + e, e);
+            LOG.warn("databaselocker keepalive resulted in: " + e, e);
         }
         if (stop) {
             stopBroker();
@@ -632,7 +632,7 @@ public class JDBCPersistenceAdapter exte
 
     protected void stopBroker() {
         // we can no longer keep the lock so lets fail
-        LOG.info("No longer able to keep the exclusive lock so giving up being a master");
+        LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
         try {
             brokerService.stop();
         } catch (Exception e) {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an exclusive lease on a database to avoid multiple brokers running
+ * against the same logical database.
+ * 
+ * @org.apache.xbean.XBean element="lease-database-locker"
+ * 
+ */
+public class LeaseDatabaseLocker implements DatabaseLocker {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
+    public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
+    protected DataSource dataSource;
+    protected Statements statements;
+    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
+
+    protected boolean stopping;
+    protected int maxAllowableDiffFromDBTime = 2000;
+    protected long diffFromCurrentTime = Long.MAX_VALUE;
+    protected String leaseHolderId;
+    protected int queryTimeout = -1;
+    JDBCPersistenceAdapter persistenceAdapter;
+
+
+    public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
+        this.dataSource = adapter.getLockDataSource();
+        this.statements = adapter.getStatements();
+        this.persistenceAdapter = adapter;
+    }
+    
+    public void start() throws Exception {
+        stopping = false;
+
+        LOG.info(getLeaseHolderId() + " attempting to acquire the exclusive lease to become the Master broker");
+        String sql = statements.getLeaseObtainStatement();
+        LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
+
+        while (!stopping) {
+            Connection connection = null;
+            PreparedStatement statement = null;
+            try {
+                connection = getConnection();
+                initTimeDiff(connection);
+
+                statement = connection.prepareStatement(sql);
+                setQueryTimeout(statement);
+
+                final long now = System.currentTimeMillis() + diffFromCurrentTime;
+                statement.setString(1, getLeaseHolderId());
+                statement.setLong(2, now + lockAcquireSleepInterval);
+                statement.setLong(3, now);
+
+                int result = statement.executeUpdate();
+                if (result == 1) {
+                    // we got the lease, verify we still have it
+                    if (keepAlive()) {
+                        break;
+                    }
+                }
+
+                reportLeasOwnerShipAndDuration(connection);
+
+            } catch (Exception e) {
+                LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
+            } finally {
+                close(statement);
+                close(connection);
+            }
+
+            LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
+            TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
+        }
+        if (stopping) {
+            throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
+        }
+
+        LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
+    }
+
+    private void setQueryTimeout(PreparedStatement statement) throws SQLException {
+        if (queryTimeout > 0) {
+            statement.setQueryTimeout(queryTimeout);
+        }
+    }
+
+    private Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    private void close(Connection connection) {
+        if (null != connection) {
+            try {
+                connection.close();
+            } catch (SQLException e1) {
+                LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
+            }
+        }
+    }
+
+    private void close(PreparedStatement statement) {
+        if (null != statement) {
+            try {
+                statement.close();
+            } catch (SQLException e1) {
+                LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
+            }
+        }
+    }
+
+    private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
+        PreparedStatement statement = null;
+        try {
+            statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
+            ResultSet resultSet = statement.executeQuery();
+            while (resultSet.next()) {
+                LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
+            }
+        } finally {
+            close(statement);
+        }
+    }
+
+    protected long initTimeDiff(Connection connection) throws SQLException {
+        if (maxAllowableDiffFromDBTime > 0 && Long.MAX_VALUE == diffFromCurrentTime) {
+            diffFromCurrentTime = determineTimeDifference(connection);
+        }
+        return diffFromCurrentTime;
+    }
+
+    private long determineTimeDifference(Connection connection) throws SQLException {
+        PreparedStatement statement = connection.prepareStatement(statements.getCurrentDateTime());
+        ResultSet resultSet = statement.executeQuery();
+        long result = 0l;
+        if (resultSet.next()) {
+            Timestamp timestamp = resultSet.getTimestamp(1);
+            long diff = System.currentTimeMillis() - timestamp.getTime();
+            LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
+            if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
+                // off by more than maxAllowableDiffFromDBTime so lets adjust
+                result = diff;
+            }
+        }
+        return result;
+    }
+
+    public void stop() throws Exception {
+        releaseLease();
+        stopping = true;
+    }
+
+    private void releaseLease() {
+        Connection connection = null;
+        PreparedStatement statement = null;
+        try {
+            connection = getConnection();
+            statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
+            statement.setString(1, null);
+            statement.setLong(2, 0l);
+            statement.setString(3, getLeaseHolderId());
+            if (statement.executeUpdate() == 1) {
+                LOG.info(getLeaseHolderId() + ", released lease");
+            }
+        } catch (Exception e) {
+            LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
+        } finally {
+            close(statement);
+            close(connection);
+        }
+    }
+
+    @Override
+    public boolean keepAlive() throws IOException {
+        boolean result = false;
+        final String sql = statements.getLeaseUpdateStatement();
+        LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
+
+        Connection connection = null;
+        PreparedStatement statement = null;
+        try {
+            connection = getConnection();
+
+            initTimeDiff(connection);
+            statement = connection.prepareStatement(sql);
+            setQueryTimeout(statement);
+
+            final long now = System.currentTimeMillis() + diffFromCurrentTime;
+            statement.setString(1, getLeaseHolderId());
+            statement.setLong(2, now + lockAcquireSleepInterval);
+            statement.setString(3, getLeaseHolderId());
+
+            result = (statement.executeUpdate() == 1);
+        } catch (Exception e) {
+            LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
+            IOException ioe = IOExceptionSupport.create(e);
+            persistenceAdapter.getBrokerService().handleIOException(ioe);
+            throw ioe;
+        } finally {
+            close(statement);
+            close(connection);
+        }
+        return result;
+    }
+
+    public long getLockAcquireSleepInterval() {
+        return lockAcquireSleepInterval;
+    }
+
+    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
+        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+    }
+    
+    public int getQueryTimeout() {
+        return queryTimeout;
+    }
+
+    public void setQueryTimeout(int queryTimeout) {
+        this.queryTimeout = queryTimeout;
+    }
+
+    public String getLeaseHolderId() {
+        if (leaseHolderId == null) {
+            if (persistenceAdapter.getBrokerService() != null) {
+                leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
+            }
+        }
+        return leaseHolderId;
+    }
+
+    public void setLeaseHolderId(String leaseHolderId) {
+        this.leaseHolderId = leaseHolderId;
+    }
+
+    public int getMaxAllowableDiffFromDBTime() {
+        return maxAllowableDiffFromDBTime;
+    }
+
+    public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
+        this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Jun 13 20:45:10 2012
@@ -85,6 +85,10 @@ public class Statements {
     private String updateDurableLastAckWithPriorityStatement;
     private String updateDurableLastAckWithPriorityInTxStatement;
     private String findXidByIdStatement;
+    private String leaseObtainStatement;
+    private String currentDateTimeStatement;
+    private String leaseUpdateStatement;
+    private String leaseOwnerStatement;
 
     public String[] getCreateSchemaStatements() {
         if (createSchemaStatements == null) {
@@ -103,9 +107,9 @@ public class Statements {
                     + " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
                     + ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))", 
                 "CREATE TABLE " + getFullLockTableName() 
-                    + "( ID " + longDataType + " NOT NULL, TIME " + longDataType 
+                    + "( ID " + longDataType + " NOT NULL, TIME " + longDataType
                     + ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
-                "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)", 
+                "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
                 "ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
                 "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
                 "ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
@@ -421,6 +425,39 @@ public class Statements {
         return lockCreateStatement;
     }
 
+    public String getLeaseObtainStatement() {
+        if (leaseObtainStatement == null) {
+            leaseObtainStatement = "UPDATE " + getFullLockTableName()
+                    + " SET BROKER_NAME=?, TIME=?"
+                    + " WHERE (TIME IS NULL OR TIME < ?) AND ID = 1";
+        }
+        return leaseObtainStatement;
+    }
+
+    public String getCurrentDateTime() {
+        if (currentDateTimeStatement == null) {
+            currentDateTimeStatement = "SELECT CURRENT_TIMESTAMP FROM " + getFullLockTableName();
+        }
+        return currentDateTimeStatement;
+    }
+
+    public String getLeaseUpdateStatement() {
+        if (leaseUpdateStatement == null) {
+            leaseUpdateStatement = "UPDATE " + getFullLockTableName()
+                    + " SET BROKER_NAME=?, TIME=?"
+                    + " WHERE BROKER_NAME=? AND ID = 1";
+        }
+        return leaseUpdateStatement;
+    }
+
+    public String getLeaseOwnerStatement() {
+        if (leaseOwnerStatement == null) {
+            leaseOwnerStatement = "SELECT BROKER_NAME, TIME FROM " + getFullLockTableName()
+                    + " WHERE ID = 1";
+        }
+        return leaseOwnerStatement;
+    }
+
     public String getLockUpdateStatement() {
         if (lockUpdateStatement == null) {
             lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET TIME = ? WHERE ID = 1";
@@ -911,4 +948,20 @@ public class Statements {
     public void setFindXidByIdStatement(String findXidByIdStatement) {
         this.findXidByIdStatement = findXidByIdStatement;
     }
+
+    public void setLeaseObtainStatement(String leaseObtainStatement) {
+        this.leaseObtainStatement = leaseObtainStatement;
+    }
+
+    public void setCurrentDateTimeStatement(String currentDateTimeStatement) {
+        this.currentDateTimeStatement = currentDateTimeStatement;
+    }
+
+    public void setLeaseUpdateStatement(String leaseUpdateStatement) {
+        this.leaseUpdateStatement = leaseUpdateStatement;
+    }
+
+    public void setLeaseOwnerStatement(String leaseOwnerStatement) {
+        this.leaseOwnerStatement = leaseOwnerStatement;
+    }
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Wed Jun 13 20:45:10 2012
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
 
     private static final Logger LOG = LoggerFactory
             .getLogger(DefaultIOExceptionHandler.class);
-    private BrokerService broker;
+    protected BrokerService broker;
     private boolean ignoreAllErrors = false;
     private boolean ignoreNoSpaceErrors = true;
     private boolean ignoreSQLExceptions = true;
@@ -94,13 +94,14 @@ import org.slf4j.LoggerFactory;
             new Thread("restart transport connectors post IO exception") {
                 public void run() {
                     try {
-                        while (isPersistenceAdapterDown()) {
+                        while (hasLockOwnership() && isPersistenceAdapterDown()) {
                             LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
                             TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
                         }
                         broker.startAllConnectors();
                     } catch (Exception e) {
-                        LOG.warn("Failure occurred while restarting broker connectors", e);
+                        LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
+                        stopBroker(e);
                     } finally {
                         stopStartInProgress.compareAndSet(true, false);
                     }
@@ -119,7 +120,11 @@ import org.slf4j.LoggerFactory;
             return;
         }
 
-        LOG.info("Stopping the broker due to IO exception, " + exception, exception);
+        stopBroker(exception);
+    }
+
+    private void stopBroker(Exception exception) {
+        LOG.info("Stopping the broker due to exception, " + exception, exception);
         new Thread("Stopping the broker due to IO exception") {
             public void run() {
                 try {
@@ -131,6 +136,10 @@ import org.slf4j.LoggerFactory;
         }.start();
     }
 
+    protected boolean hasLockOwnership() throws IOException {
+        return true;
+    }
+
     public void setBrokerService(BrokerService broker) {
         this.broker = broker;
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class);
+
+    private long restartDelay = 500;
+
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        brokerService.setIoExceptionHandler(new JDBCIOExceptionHandler());
+    }
+
+    @Override
+    protected void delayTillRestartRequired() {
+        if (restartDelay > 500) {
+            LOG.info("delay for more than lease quantum. While Db is offline, master should stay alive but could loose lease");
+        } else {
+            LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(restartDelay);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0  || (inflightMessageCount == failureCount + 10 && restartDelay <= 500)) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        restartDelay = 500;
+        super.setUp();
+    }
+
+    public void testSendReceiveWithLeaseExpiry() throws Exception {
+        restartDelay = 3000;
+        testSendReceive();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseIntactTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactTest.class);
+
+    @Override
+    protected void delayTillRestartRequired() {
+
+        LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
+        try {
+            TimeUnit.MILLISECONDS.sleep(1000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMasterSlaveTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseTest.class);
+
+    @Override
+    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+        super.configureJdbcPersistenceAdapter(persistenceAdapter);
+        persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+        persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
+        persistenceAdapter.setDatabaseLocker(new LeaseDatabaseLocker());
+    }
+
+    private long getLockKeepAlivePeriod() {
+        return 500;
+    }
+
+    private long getLockAcquireSleepInterval() {
+        return 2000;
+    }
+
+    @Override
+    protected void delayTillRestartRequired() {
+
+        LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs");
+        try {
+            TimeUnit.MILLISECONDS.sleep(3000);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+        // the lock is up for grabs after the expiry
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Wed Jun 13 20:45:10 2012
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.sql.SQLException;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -28,7 +30,8 @@ import org.apache.derby.jdbc.EmbeddedDat
 public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
     private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
     
-    protected void messageSent() throws Exception {    
+    protected void messageSent() throws Exception {
+        verifyExpectedBroker(inflightMessageCount);
         if (++inflightMessageCount == failureCount) {
             LOG.info("STOPPING DB!@!!!!");
             final EmbeddedDataSource ds = getExistingDataSource();
@@ -37,16 +40,32 @@ public class DbRestartJDBCQueueMasterSla
             
             Thread dbRestartThread = new Thread("db-re-start-thread") {
                 public void run() {
-                    LOG.info("Waiting for master broker to Stop");
-                    master.waitUntilStopped();
+                    delayTillRestartRequired();
                     ds.setShutdownDatabase("false");
+                    try {
+                        ds.getConnection().close();
+                    } catch (SQLException ignored) {}
                     LOG.info("DB RESTARTED!@!!!!");
                 }
             };
             dbRestartThread.start();
         }
+        verifyExpectedBroker(inflightMessageCount);
     }
-     
+
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        } else if (inflightMessageCount == failureCount + 10) {
+            assertEquals("connected to slave", slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+
+    protected void delayTillRestartRequired() {
+        LOG.info("Waiting for master broker to Stop");
+        master.waitUntilStopped();
+    }
+
     protected void sendToProducer(MessageProducer producer,
             Destination producerDestination, Message message) throws JMSException {
         {   

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Wed Jun 13 20:45:10 2012
@@ -16,9 +16,18 @@
  */
 package org.apache.activemq.broker.ft;
 
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.SQLException;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.store.jdbc.DataSourceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportServer;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
 public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@@ -34,18 +43,22 @@ public class JDBCQueueMasterSlaveTest ex
     
     protected void createMaster() throws Exception {
         master = new BrokerService();
+        master.setBrokerName("master");
         master.addConnector(MASTER_URL);
         master.setUseJmx(false);
         master.setPersistent(true);
         master.setDeleteAllMessagesOnStartup(true);
         JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
         persistenceAdapter.setDataSource(getExistingDataSource());
-        persistenceAdapter.setLockKeepAlivePeriod(500);
-        persistenceAdapter.setLockAcquireSleepInterval(500);
+        configureJdbcPersistenceAdapter(persistenceAdapter);
         master.setPersistenceAdapter(persistenceAdapter);
+        configureBroker(master);
         master.start();
     }
 
+    protected void configureBroker(BrokerService master) {
+    }
+
     protected void createSlave() throws Exception {
         // use a separate thread as the slave will block waiting for
         // the exclusive db lock
@@ -53,7 +66,10 @@ public class JDBCQueueMasterSlaveTest ex
             public void run() {
                 try {
                     BrokerService broker = new BrokerService();
-                    broker.addConnector(SLAVE_URL);
+                    broker.setBrokerName("slave");
+                    TransportConnector connector = new TransportConnector();
+                    connector.setUri(new URI(SLAVE_URL));
+                    broker.addConnector(connector);
                     // no need for broker.setMasterConnectorURI(masterConnectorURI)
                     // as the db lock provides the slave/master initialisation
                     broker.setUseJmx(false);
@@ -62,9 +78,12 @@ public class JDBCQueueMasterSlaveTest ex
                     persistenceAdapter.setDataSource(getExistingDataSource());
                     persistenceAdapter.setCreateTablesOnStartup(false);
                     broker.setPersistenceAdapter(persistenceAdapter);
+                    configureJdbcPersistenceAdapter(persistenceAdapter);
+                    configureBroker(broker);
                     broker.start();
                     slave.set(broker);
                     slaveStarted.countDown();
+                } catch (IllegalStateException expectedOnShutdown) {
                 } catch (Exception e) {
                     fail("failed to start slave broker, reason:" + e);
                 }
@@ -73,6 +92,11 @@ public class JDBCQueueMasterSlaveTest ex
         t.start();
     }
 
+    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+        persistenceAdapter.setLockKeepAlivePeriod(500);
+        persistenceAdapter.setLockAcquireSleepInterval(500);
+    }
+
     protected EmbeddedDataSource getExistingDataSource() throws Exception {
         return sharedDs;
     }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+
+public class LeaseDatabaseLockerTest {
+
+    JDBCPersistenceAdapter jdbc;
+    BrokerService brokerService;
+    EmbeddedDataSource dataSource;
+
+    @Before
+    public void setUpStore() throws Exception {
+        dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        brokerService = new BrokerService();
+        jdbc.setBrokerService(brokerService);
+        jdbc.getAdapter().doCreateTables(jdbc.getTransactionContext());
+    }
+
+    @Test
+    public void testLockInterleave() throws Exception {
+
+        LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
+        brokerService.setBrokerName("First");
+        lockerA.setPersistenceAdapter(jdbc);
+
+        final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
+        brokerService.setBrokerName("Second");
+        lockerB.setPersistenceAdapter(jdbc);
+        final AtomicBoolean blocked = new AtomicBoolean(true);
+
+        final Connection connection = dataSource.getConnection();
+        printLockTable(connection);
+        lockerA.start();
+        printLockTable(connection);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    lockerB.start();
+                    blocked.set(false);
+                    printLockTable(connection);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        assertTrue("B is blocked", blocked.get());
+
+        assertTrue("A is good", lockerA.keepAlive());
+        printLockTable(connection);
+
+        lockerA.stop();
+        printLockTable(connection);
+
+        TimeUnit.MILLISECONDS.sleep(2 * lockerB.getLockAcquireSleepInterval());
+        assertFalse("lockerB has the lock", blocked.get());
+        lockerB.stop();
+        printLockTable(connection);
+    }
+
+    private void printLockTable(Connection connection) throws IOException {
+        //((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date