You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/13 22:45:10 UTC
svn commit: r1350006 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/util/ test/java/org/apache/activemq/broker/ft/
test/java/org/apache/activemq/store/jdbc/
Author: gtully
Date: Wed Jun 13 20:45:10 2012
New Revision: 1350006
URL: http://svn.apache.org/viewvc?rev=1350006&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3654 - JDBC Master/Slave: Slave cannot acquire lock when the master loses its database connection. Adding a LeaseDatabaseLocker that can survive a db disconnect, and a JDBCIOExceptionHandler that will pause/resume transports on db outage
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (with props)
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Jun 13 20:45:10 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.IllegalStateException;
@@ -142,7 +143,7 @@ public class ActiveMQMessageConsumer imp
private ExecutorService executorService;
private MessageTransformer transformer;
private boolean clearDispatchList;
- boolean inProgressClearRequiredFlag;
+ AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
private MessageAck pendingAck;
private long lastDeliveredSequenceId;
@@ -685,15 +686,15 @@ public class ActiveMQMessageConsumer imp
} }
void inProgressClearRequired() {
- inProgressClearRequiredFlag = true;
+ inProgressClearRequiredFlag.incrementAndGet();
// deal with delivered messages async to avoid lock contention with in progress acks
clearDispatchList = true;
}
void clearMessagesInProgress() {
- if (inProgressClearRequiredFlag) {
+ if (inProgressClearRequiredFlag.get() > 0) {
synchronized (unconsumedMessages.getMutex()) {
- if (inProgressClearRequiredFlag) {
+ if (inProgressClearRequiredFlag.get() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
}
@@ -706,7 +707,7 @@ public class ActiveMQMessageConsumer imp
}
// allow dispatch on this connection to resume
session.connection.transportInterruptionProcessingComplete();
- inProgressClearRequiredFlag = false;
+ inProgressClearRequiredFlag.decrementAndGet();
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -37,8 +37,9 @@ public interface DatabaseLocker extends
/**
* Used by a timer to keep alive the lock.
* If the method returns false the broker should be terminated
+ * if an exception is thrown, the lock state cannot be determined
*/
- boolean keepAlive();
+ boolean keepAlive() throws IOException;
/**
* set the delay interval in milliseconds between lock acquire attempts
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -170,7 +170,7 @@ public class DefaultDatabaseLocker imple
}
}
- public boolean keepAlive() {
+ public boolean keepAlive() throws IOException {
boolean result = false;
try {
lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+
+/**
+ * @org.apache.xbean.XBean
+ */
+public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler {
+
+    /**
+     * Creates a handler preconfigured for a JDBC store: the inherited
+     * {@code ignoreSQLExceptions} option is switched off and the inherited
+     * {@code stopStartConnectors} option is switched on.
+     */
+    public JDBCIOExceptionHandler() {
+        setIgnoreSQLExceptions(false);
+        setStopStartConnectors(true);
+    }
+
+    /**
+     * Checks with the JDBC database locker whether this broker still owns the lock.
+     *
+     * @return {@code true} when the persistence adapter is not a
+     *         {@link JDBCPersistenceAdapter}, when no locker is configured, when the
+     *         locker's {@code keepAlive()} succeeds, or when {@code keepAlive()} itself
+     *         fails with an {@link IOException} (lock state unknown)
+     * @throws IOException when the locker reports that the lock is no longer held
+     */
+    @Override
+    protected boolean hasLockOwnership() throws IOException {
+        if (!(broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter)) {
+            return true;
+        }
+
+        final DatabaseLocker locker =
+                ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDatabaseLocker();
+        if (locker == null) {
+            return true;
+        }
+
+        boolean lockStillHeld;
+        try {
+            lockStillHeld = locker.keepAlive();
+        } catch (IOException ignored) {
+            // the lock state cannot be determined while keepAlive fails,
+            // so ownership is still assumed
+            lockStillHeld = true;
+        }
+
+        if (!lockStillHeld) {
+            throw new IOException("PersistenceAdapter lock no longer valid using: " + locker);
+        }
+        return true;
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Jun 13 20:45:10 2012
@@ -623,7 +623,7 @@ public class JDBCPersistenceAdapter exte
}
}
} catch (IOException e) {
- LOG.error("Failed to get database when trying keepalive: " + e, e);
+ LOG.warn("databaselocker keepalive resulted in: " + e, e);
}
if (stop) {
stopBroker();
@@ -632,7 +632,7 @@ public class JDBCPersistenceAdapter exte
protected void stopBroker() {
// we can no longer keep the lock so lets fail
- LOG.info("No longer able to keep the exclusive lock so giving up being a master");
+ LOG.info(brokerService.getBrokerName() + ", no longer able to keep the exclusive lock so giving up being a master");
try {
brokerService.stop();
} catch (Exception e) {
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents an exclusive lease on a database to avoid multiple brokers running
+ * against the same logical database.
+ *
+ * @org.apache.xbean.XBean element="lease-database-locker"
+ *
+ */
+public class LeaseDatabaseLocker implements DatabaseLocker {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLocker.class);
+    public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 5000;
+    protected DataSource dataSource;
+    protected Statements statements;
+    // used both as the retry delay in start() and as the lease duration that is
+    // added to "now" when the lease expiry is written
+    protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
+
+    // written by stop() and polled by the retry loop in start(); volatile so that a
+    // stop() issued from another thread is seen by a start() that is still looping
+    protected volatile boolean stopping;
+    protected int maxAllowableDiffFromDBTime = 2000;
+    // offset (ms) added to the local clock before it is compared with / written to
+    // the lease row; Long.MAX_VALUE means "not determined yet"
+    protected long diffFromCurrentTime = Long.MAX_VALUE;
+    protected String leaseHolderId;
+    // only applied to statements when > 0
+    protected int queryTimeout = -1;
+    JDBCPersistenceAdapter persistenceAdapter;
+
+
+    /**
+     * Binds this locker to a persistence adapter, taking its lock data source and
+     * SQL statements from it.
+     */
+    public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
+        this.dataSource = adapter.getLockDataSource();
+        this.statements = adapter.getStatements();
+        this.persistenceAdapter = adapter;
+    }
+
+    /**
+     * Blocks until the lease has been obtained, retrying every
+     * {@code lockAcquireSleepInterval} milliseconds.
+     *
+     * @throws RuntimeException if {@link #stop()} is called before the lease is obtained
+     * @throws Exception if the sleep between attempts is interrupted
+     */
+    public void start() throws Exception {
+        stopping = false;
+
+        LOG.info(getLeaseHolderId() + " attempting to acquire the exclusive lease to become the Master broker");
+        String sql = statements.getLeaseObtainStatement();
+        LOG.debug(getLeaseHolderId() + " locking Query is "+sql);
+
+        while (!stopping) {
+            Connection connection = null;
+            PreparedStatement statement = null;
+            try {
+                connection = getConnection();
+                initTimeDiff(connection);
+
+                statement = connection.prepareStatement(sql);
+                setQueryTimeout(statement);
+
+                final long now = System.currentTimeMillis() + diffFromCurrentTime;
+                statement.setString(1, getLeaseHolderId());
+                statement.setLong(2, now + lockAcquireSleepInterval);
+                statement.setLong(3, now);
+
+                int result = statement.executeUpdate();
+                if (result == 1) {
+                    // we got the lease, verify we still have it
+                    if (keepAlive()) {
+                        break;
+                    }
+                }
+
+                reportLeasOwnerShipAndDuration(connection);
+
+            } catch (Exception e) {
+                // any failure (including a failed keepAlive) just means "try again"
+                LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
+            } finally {
+                close(statement);
+                close(connection);
+            }
+
+            LOG.info(getLeaseHolderId() + " failed to acquire lease.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
+            TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
+        }
+        if (stopping) {
+            throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
+        }
+
+        LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
+    }
+
+    /** Applies the configured query timeout to the statement when one is set (> 0). */
+    private void setQueryTimeout(PreparedStatement statement) throws SQLException {
+        if (queryTimeout > 0) {
+            statement.setQueryTimeout(queryTimeout);
+        }
+    }
+
+    private Connection getConnection() throws SQLException {
+        return dataSource.getConnection();
+    }
+
+    /** Closes the connection if non-null; a failure to close is only logged. */
+    private void close(Connection connection) {
+        if (null != connection) {
+            try {
+                connection.close();
+            } catch (SQLException e1) {
+                LOG.debug(getLeaseHolderId() + " caught exception while closing connection: " + e1, e1);
+            }
+        }
+    }
+
+    /** Closes the statement if non-null; a failure to close is only logged. */
+    private void close(PreparedStatement statement) {
+        if (null != statement) {
+            try {
+                statement.close();
+            } catch (SQLException e1) {
+                LOG.debug(getLeaseHolderId() + ", caught while closing statement: " + e1, e1);
+            }
+        }
+    }
+
+    /** Logs the current lease owner and lease expiry as read from the lock table. */
+    private void reportLeasOwnerShipAndDuration(Connection connection) throws SQLException {
+        PreparedStatement statement = null;
+        try {
+            statement = connection.prepareStatement(statements.getLeaseOwnerStatement());
+            ResultSet resultSet = statement.executeQuery();
+            while (resultSet.next()) {
+                LOG.info(getLeaseHolderId() + " Lease held by " + resultSet.getString(1) + " till " + new Date(resultSet.getLong(2)));
+            }
+        } finally {
+            // closing the statement also closes its result set
+            close(statement);
+        }
+    }
+
+    /**
+     * Determines, once, the offset to apply to the local clock.
+     *
+     * @return the offset in milliseconds; 0 when the check against the database
+     *         clock is disabled ({@code maxAllowableDiffFromDBTime <= 0})
+     */
+    protected long initTimeDiff(Connection connection) throws SQLException {
+        if (Long.MAX_VALUE == diffFromCurrentTime) {
+            if (maxAllowableDiffFromDBTime > 0) {
+                diffFromCurrentTime = determineTimeDifference(connection);
+            } else {
+                // never leave the sentinel in place: it is added to
+                // System.currentTimeMillis() by callers and would overflow
+                diffFromCurrentTime = 0L;
+            }
+        }
+        return diffFromCurrentTime;
+    }
+
+    /**
+     * Queries the database clock and computes the offset that maps the local
+     * clock onto it.
+     *
+     * @return 0 when the clocks agree within {@code maxAllowableDiffFromDBTime}
+     *         (or the query returns no row), otherwise {@code dbTime - localTime}
+     */
+    private long determineTimeDifference(Connection connection) throws SQLException {
+        long result = 0L;
+        PreparedStatement statement = null;
+        try {
+            statement = connection.prepareStatement(statements.getCurrentDateTime());
+            ResultSet resultSet = statement.executeQuery();
+            if (resultSet.next()) {
+                Timestamp timestamp = resultSet.getTimestamp(1);
+                long diff = System.currentTimeMillis() - timestamp.getTime();
+                LOG.info(getLeaseHolderId() + " diff from db: " + diff + ", db time: " + timestamp);
+                if (diff > maxAllowableDiffFromDBTime || diff < -maxAllowableDiffFromDBTime) {
+                    // off by more than maxAllowableDiffFromDBTime so lets adjust;
+                    // diff is (local - db) and callers ADD the offset to the local
+                    // clock, so the correction has to be the negated difference
+                    result = -diff;
+                }
+            }
+        } finally {
+            // closing the statement also closes its result set
+            close(statement);
+        }
+        return result;
+    }
+
+    /** Releases the lease (best effort) and flags any running {@link #start()} loop to give up. */
+    public void stop() throws Exception {
+        releaseLease();
+        stopping = true;
+    }
+
+    /** Clears owner and expiry on the lease row if this holder still owns it; failures are logged. */
+    private void releaseLease() {
+        Connection connection = null;
+        PreparedStatement statement = null;
+        try {
+            connection = getConnection();
+            statement = connection.prepareStatement(statements.getLeaseUpdateStatement());
+            statement.setString(1, null);
+            statement.setLong(2, 0L);
+            statement.setString(3, getLeaseHolderId());
+            if (statement.executeUpdate() == 1) {
+                LOG.info(getLeaseHolderId() + ", released lease");
+            }
+        } catch (Exception e) {
+            LOG.error(getLeaseHolderId() + " failed to release lease: " + e, e);
+        } finally {
+            close(statement);
+            close(connection);
+        }
+    }
+
+    /**
+     * Extends the lease by {@code lockAcquireSleepInterval} milliseconds.
+     *
+     * @return {@code true} if the lease row was still owned by this holder and was updated
+     * @throws IOException if the update could not be executed; the failure is also
+     *         passed to the broker's IO exception handling before being rethrown
+     */
+    @Override
+    public boolean keepAlive() throws IOException {
+        boolean result = false;
+        final String sql = statements.getLeaseUpdateStatement();
+        LOG.debug(getLeaseHolderId() + ", lease keepAlive Query is " + sql);
+
+        Connection connection = null;
+        PreparedStatement statement = null;
+        try {
+            connection = getConnection();
+
+            initTimeDiff(connection);
+            statement = connection.prepareStatement(sql);
+            setQueryTimeout(statement);
+
+            final long now = System.currentTimeMillis() + diffFromCurrentTime;
+            statement.setString(1, getLeaseHolderId());
+            statement.setLong(2, now + lockAcquireSleepInterval);
+            statement.setString(3, getLeaseHolderId());
+
+            result = (statement.executeUpdate() == 1);
+        } catch (Exception e) {
+            LOG.warn(getLeaseHolderId() + ", failed to update lease: " + e, e);
+            IOException ioe = IOExceptionSupport.create(e);
+            persistenceAdapter.getBrokerService().handleIOException(ioe);
+            throw ioe;
+        } finally {
+            close(statement);
+            close(connection);
+        }
+        return result;
+    }
+
+    public long getLockAcquireSleepInterval() {
+        return lockAcquireSleepInterval;
+    }
+
+    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
+        this.lockAcquireSleepInterval = lockAcquireSleepInterval;
+    }
+
+    public int getQueryTimeout() {
+        return queryTimeout;
+    }
+
+    public void setQueryTimeout(int queryTimeout) {
+        this.queryTimeout = queryTimeout;
+    }
+
+    /**
+     * Returns the id written to the lease row; when none was set explicitly it
+     * defaults to the broker name, provided a broker service is available.
+     */
+    public String getLeaseHolderId() {
+        if (leaseHolderId == null) {
+            if (persistenceAdapter.getBrokerService() != null) {
+                leaseHolderId = persistenceAdapter.getBrokerService().getBrokerName();
+            }
+        }
+        return leaseHolderId;
+    }
+
+    public void setLeaseHolderId(String leaseHolderId) {
+        this.leaseHolderId = leaseHolderId;
+    }
+
+    public int getMaxAllowableDiffFromDBTime() {
+        return maxAllowableDiffFromDBTime;
+    }
+
+    public void setMaxAllowableDiffFromDBTime(int maxAllowableDiffFromDBTime) {
+        this.maxAllowableDiffFromDBTime = maxAllowableDiffFromDBTime;
+    }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Wed Jun 13 20:45:10 2012
@@ -85,6 +85,10 @@ public class Statements {
private String updateDurableLastAckWithPriorityStatement;
private String updateDurableLastAckWithPriorityInTxStatement;
private String findXidByIdStatement;
+ private String leaseObtainStatement;
+ private String currentDateTimeStatement;
+ private String leaseUpdateStatement;
+ private String leaseOwnerStatement;
public String[] getCreateSchemaStatements() {
if (createSchemaStatements == null) {
@@ -103,9 +107,9 @@ public class Statements {
+ " NOT NULL" + ", SELECTOR " + stringIdDataType + ", LAST_ACKED_ID " + sequenceDataType
+ ", PRIMARY KEY ( CONTAINER, CLIENT_ID, SUB_NAME))",
"CREATE TABLE " + getFullLockTableName()
- + "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ + "( ID " + longDataType + " NOT NULL, TIME " + longDataType
+ ", BROKER_NAME " + stringIdDataType + ", PRIMARY KEY (ID) )",
- "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
+ "INSERT INTO " + getFullLockTableName() + "(ID) VALUES (1)",
"ALTER TABLE " + getFullMessageTableName() + " ADD PRIORITY " + sequenceDataType,
"CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " + getFullMessageTableName() + " (PRIORITY)",
"ALTER TABLE " + getFullMessageTableName() + " ADD XID " + binaryDataType,
@@ -421,6 +425,39 @@ public class Statements {
return lockCreateStatement;
}
+    /**
+     * Returns the SQL used to obtain the lease, building the default on first use.
+     * The default sets BROKER_NAME and TIME on row ID = 1 of the lock table when
+     * TIME is null or lower than the third parameter.
+     */
+    public String getLeaseObtainStatement() {
+        if (leaseObtainStatement != null) {
+            return leaseObtainStatement;
+        }
+        final String lockTable = getFullLockTableName();
+        leaseObtainStatement = "UPDATE " + lockTable
+                + " SET BROKER_NAME=?, TIME=?"
+                + " WHERE (TIME IS NULL OR TIME < ?) AND ID = 1";
+        return leaseObtainStatement;
+    }
+
+    /**
+     * Returns the SQL used to read the database's current timestamp, building
+     * the default (a CURRENT_TIMESTAMP select against the lock table) on first use.
+     */
+    public String getCurrentDateTime() {
+        if (currentDateTimeStatement != null) {
+            return currentDateTimeStatement;
+        }
+        final String lockTable = getFullLockTableName();
+        currentDateTimeStatement = "SELECT CURRENT_TIMESTAMP FROM " + lockTable;
+        return currentDateTimeStatement;
+    }
+
+    /**
+     * Returns the SQL used to update the lease, building the default on first use.
+     * The default sets BROKER_NAME and TIME on row ID = 1 of the lock table, but
+     * only when BROKER_NAME matches the third parameter.
+     */
+    public String getLeaseUpdateStatement() {
+        if (leaseUpdateStatement != null) {
+            return leaseUpdateStatement;
+        }
+        final String lockTable = getFullLockTableName();
+        leaseUpdateStatement = "UPDATE " + lockTable
+                + " SET BROKER_NAME=?, TIME=?"
+                + " WHERE BROKER_NAME=? AND ID = 1";
+        return leaseUpdateStatement;
+    }
+
+    /**
+     * Returns the SQL used to read the lease owner and expiry (BROKER_NAME, TIME
+     * of row ID = 1 of the lock table), building the default on first use.
+     */
+    public String getLeaseOwnerStatement() {
+        if (leaseOwnerStatement != null) {
+            return leaseOwnerStatement;
+        }
+        final String lockTable = getFullLockTableName();
+        leaseOwnerStatement = "SELECT BROKER_NAME, TIME FROM " + lockTable
+                + " WHERE ID = 1";
+        return leaseOwnerStatement;
+    }
+
public String getLockUpdateStatement() {
if (lockUpdateStatement == null) {
lockUpdateStatement = "UPDATE " + getFullLockTableName() + " SET TIME = ? WHERE ID = 1";
@@ -911,4 +948,20 @@ public class Statements {
public void setFindXidByIdStatement(String findXidByIdStatement) {
this.findXidByIdStatement = findXidByIdStatement;
}
+
+    /**
+     * Overrides the SQL returned by {@code getLeaseObtainStatement()}; when left
+     * unset (null) that getter builds a default.
+     */
+    public void setLeaseObtainStatement(String leaseObtainStatement) {
+        this.leaseObtainStatement = leaseObtainStatement;
+    }
+
+    /**
+     * Overrides the SQL returned by {@code getCurrentDateTime()}; when left
+     * unset (null) that getter builds a default.
+     */
+    public void setCurrentDateTimeStatement(String currentDateTimeStatement) {
+        this.currentDateTimeStatement = currentDateTimeStatement;
+    }
+
+    /**
+     * Overrides the SQL returned by {@code getLeaseUpdateStatement()}; when left
+     * unset (null) that getter builds a default.
+     */
+    public void setLeaseUpdateStatement(String leaseUpdateStatement) {
+        this.leaseUpdateStatement = leaseUpdateStatement;
+    }
+
+    /**
+     * Overrides the SQL returned by {@code getLeaseOwnerStatement()}; when left
+     * unset (null) that getter builds a default.
+     */
+    public void setLeaseOwnerStatement(String leaseOwnerStatement) {
+        this.leaseOwnerStatement = leaseOwnerStatement;
+    }
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java Wed Jun 13 20:45:10 2012
@@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory
.getLogger(DefaultIOExceptionHandler.class);
- private BrokerService broker;
+ protected BrokerService broker;
private boolean ignoreAllErrors = false;
private boolean ignoreNoSpaceErrors = true;
private boolean ignoreSQLExceptions = true;
@@ -94,13 +94,14 @@ import org.slf4j.LoggerFactory;
new Thread("restart transport connectors post IO exception") {
public void run() {
try {
- while (isPersistenceAdapterDown()) {
+ while (hasLockOwnership() && isPersistenceAdapterDown()) {
LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
}
broker.startAllConnectors();
} catch (Exception e) {
- LOG.warn("Failure occurred while restarting broker connectors", e);
+ LOG.warn("Stopping broker due to failure while restarting broker connectors", e);
+ stopBroker(e);
} finally {
stopStartInProgress.compareAndSet(true, false);
}
@@ -119,7 +120,11 @@ import org.slf4j.LoggerFactory;
return;
}
- LOG.info("Stopping the broker due to IO exception, " + exception, exception);
+ stopBroker(exception);
+ }
+
+ private void stopBroker(Exception exception) {
+ LOG.info("Stopping the broker due to exception, " + exception, exception);
new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
@@ -131,6 +136,10 @@ import org.slf4j.LoggerFactory;
}.start();
}
+    /**
+     * Hook consulted while waiting to restart the transport connectors after an
+     * IO exception. This base implementation always reports ownership; it is
+     * protected so that subclasses can supply a real check.
+     *
+     * @return always {@code true} in this implementation
+     * @throws IOException never thrown here; declared for overriding implementations
+     */
+    protected boolean hasLockOwnership() throws IOException {
+        return true;
+    }
+
public void setBrokerService(BrokerService broker) {
this.broker = broker;
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class);
+
+    // delay (ms) used by default; treated by this test as "shorter than the lease quantum"
+    private static final long SHORT_RESTART_DELAY = 500;
+
+    // how long the db stays offline before it is restarted; reset in setUp()
+    private long restartDelay = SHORT_RESTART_DELAY;
+
+    /** Installs a {@link JDBCIOExceptionHandler} on every broker under test. */
+    @Override
+    protected void configureBroker(BrokerService brokerService) {
+        brokerService.setIoExceptionHandler(new JDBCIOExceptionHandler());
+    }
+
+    /** Sleeps for {@code restartDelay} milliseconds before the db restart. */
+    @Override
+    protected void delayTillRestartRequired() {
+        if (restartDelay > SHORT_RESTART_DELAY) {
+            LOG.info("delay for more than lease quantum. While Db is offline, master should stay alive but could loose lease");
+        } else {
+            LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
+        }
+        try {
+            TimeUnit.MILLISECONDS.sleep(restartDelay);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller instead of dropping it
+            Thread.currentThread().interrupt();
+            LOG.warn("interrupted while delaying the db restart", e);
+        }
+    }
+
+    /**
+     * Asserts that the sending connection is still attached to the master, either
+     * before any message is in flight or - only for the short delay - at
+     * {@code failureCount + 10} in-flight messages.
+     */
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0 || (inflightMessageCount == failureCount + 10 && restartDelay <= SHORT_RESTART_DELAY)) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        // each test starts from the short delay; individual tests may raise it
+        restartDelay = SHORT_RESTART_DELAY;
+        super.setUp();
+    }
+
+    /** Runs the send/receive scenario with the db offline for 3000 ms. */
+    public void testSendReceiveWithLeaseExpiry() throws Exception {
+        restartDelay = 3000;
+        testSendReceive();
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueMasterSlaveLeaseIntactTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactTest.class);
+
+    /** Sleeps for 1000 ms before the db restart. */
+    @Override
+    protected void delayTillRestartRequired() {
+
+        LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
+        try {
+            TimeUnit.MILLISECONDS.sleep(1000);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller instead of dropping it
+            Thread.currentThread().interrupt();
+            LOG.warn("interrupted while delaying the db restart", e);
+        }
+    }
+
+    /**
+     * Asserts that the sending connection is still attached to the master, both
+     * before any message is in flight and at {@code failureCount + 10} in-flight
+     * messages.
+     */
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Db-restart master/slave test run with a LeaseDatabaseLocker; per its log message the db is restarted after the lease has expired. */
+public class DbRestartJDBCQueueMasterSlaveLeaseTest extends DbRestartJDBCQueueMasterSlaveTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseTest.class);
+
+    /** Applies the parent's settings, then this test's lock timings and a fresh LeaseDatabaseLocker. */
+    @Override
+    protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+        super.configureJdbcPersistenceAdapter(persistenceAdapter);
+        persistenceAdapter.setLockAcquireSleepInterval(getLockAcquireSleepInterval());
+        persistenceAdapter.setLockKeepAlivePeriod(getLockKeepAlivePeriod());
+        persistenceAdapter.setDatabaseLocker(new LeaseDatabaseLocker());
+    }
+    /** Keep-alive period handed to the persistence adapter (presumably milliseconds). */
+    private long getLockKeepAlivePeriod() {
+        return 500;
+    }
+    /** Lock-acquire sleep interval handed to the persistence adapter (presumably milliseconds). */
+    private long getLockAcquireSleepInterval() {
+        return 2000;
+    }
+    /** Holds the db restart back for 3000 ms, longer than the 2000 lock-acquire interval configured above. */
+    @Override
+    protected void delayTillRestartRequired() {
+        LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs");
+        try {
+            TimeUnit.MILLISECONDS.sleep(3000);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the interrupt status rather than swallowing it
+        }
+    }
+    /** Only the initial connection is pinned to the master; which broker serves later messages is not asserted. */
+    @Override
+    protected void verifyExpectedBroker(int inflightMessageCount) {
+        if (inflightMessageCount == 0) {
+            assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+        }
+        // the lock is up for grabs after the expiry
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Wed Jun 13 20:45:10 2012
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.broker.ft;
+import java.sql.SQLException;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import org.apache.activemq.ActiveMQConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
@@ -28,7 +30,8 @@ import org.apache.derby.jdbc.EmbeddedDat
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
- protected void messageSent() throws Exception {
+ protected void messageSent() throws Exception {
+ verifyExpectedBroker(inflightMessageCount);
if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!");
final EmbeddedDataSource ds = getExistingDataSource();
@@ -37,16 +40,32 @@ public class DbRestartJDBCQueueMasterSla
Thread dbRestartThread = new Thread("db-re-start-thread") {
public void run() {
- LOG.info("Waiting for master broker to Stop");
- master.waitUntilStopped();
+ delayTillRestartRequired();
ds.setShutdownDatabase("false");
+ try {
+ ds.getConnection().close();
+ } catch (SQLException ignored) {}
LOG.info("DB RESTARTED!@!!!!");
}
};
dbRestartThread.start();
}
+ verifyExpectedBroker(inflightMessageCount);
}
-
+
+ /**
+ * Verifies which broker the send connection is attached to: the master while
+ * no message has been counted yet, and the slave once the count reaches
+ * failureCount + 10. Other counts are not checked.
+ */
+ protected void verifyExpectedBroker(int inflightMessageCount) {
+ if (inflightMessageCount == 0) {
+ assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+ } else if (inflightMessageCount == failureCount + 10) {
+ assertEquals("connected to slave", slave.get().getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
+ }
+ }
+
+ /**
+ * Called on the db restart thread before the database is brought back; this
+ * implementation blocks until the master broker has stopped.
+ */
+ protected void delayTillRestartRequired() {
+ LOG.info("Waiting for master broker to Stop");
+ master.waitUntilStopped();
+ }
+
protected void sendToProducer(MessageProducer producer,
Destination producerDestination, Message message) throws JMSException {
{
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1350006&r1=1350005&r2=1350006&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Wed Jun 13 20:45:10 2012
@@ -16,9 +16,18 @@
*/
package org.apache.activemq.broker.ft;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.SQLException;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportServer;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
@@ -34,18 +43,22 @@ public class JDBCQueueMasterSlaveTest ex
protected void createMaster() throws Exception {
master = new BrokerService();
+ master.setBrokerName("master");
master.addConnector(MASTER_URL);
master.setUseJmx(false);
master.setPersistent(true);
master.setDeleteAllMessagesOnStartup(true);
JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
persistenceAdapter.setDataSource(getExistingDataSource());
- persistenceAdapter.setLockKeepAlivePeriod(500);
- persistenceAdapter.setLockAcquireSleepInterval(500);
+ configureJdbcPersistenceAdapter(persistenceAdapter);
master.setPersistenceAdapter(persistenceAdapter);
+ configureBroker(master);
master.start();
}
+ /**
+ * Empty by default. Called on each broker after its persistence adapter is set
+ * and before it is started.
+ */
+ protected void configureBroker(BrokerService master) {
+ }
+
protected void createSlave() throws Exception {
// use a separate thread as the slave will block waiting for
// the exclusive db lock
@@ -53,7 +66,10 @@ public class JDBCQueueMasterSlaveTest ex
public void run() {
try {
BrokerService broker = new BrokerService();
- broker.addConnector(SLAVE_URL);
+ broker.setBrokerName("slave");
+ TransportConnector connector = new TransportConnector();
+ connector.setUri(new URI(SLAVE_URL));
+ broker.addConnector(connector);
// no need for broker.setMasterConnectorURI(masterConnectorURI)
// as the db lock provides the slave/master initialisation
broker.setUseJmx(false);
@@ -62,9 +78,12 @@ public class JDBCQueueMasterSlaveTest ex
persistenceAdapter.setDataSource(getExistingDataSource());
persistenceAdapter.setCreateTablesOnStartup(false);
broker.setPersistenceAdapter(persistenceAdapter);
+ configureJdbcPersistenceAdapter(persistenceAdapter);
+ configureBroker(broker);
broker.start();
slave.set(broker);
slaveStarted.countDown();
+ } catch (IllegalStateException expectedOnShutdown) {
} catch (Exception e) {
fail("failed to start slave broker, reason:" + e);
}
@@ -73,6 +92,11 @@ public class JDBCQueueMasterSlaveTest ex
t.start();
}
+ /**
+ * Sets the lock keep-alive period and the lock-acquire sleep interval of the
+ * given adapter to 500 each (units presumably milliseconds).
+ */
+ protected void configureJdbcPersistenceAdapter(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
+ persistenceAdapter.setLockKeepAlivePeriod(500);
+ persistenceAdapter.setLockAcquireSleepInterval(500);
+ }
+
protected EmbeddedDataSource getExistingDataSource() throws Exception {
return sharedDs;
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1350006&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Wed Jun 13 20:45:10 2012
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+
+
+/** Exercises hand-over of the database lock between two LeaseDatabaseLocker instances sharing one Derby-backed adapter. */
+public class LeaseDatabaseLockerTest {
+    JDBCPersistenceAdapter jdbc;
+    BrokerService brokerService;
+    EmbeddedDataSource dataSource;
+
+    /** Creates the embedded "derbyDb" data source, wires it to a persistence adapter and creates the store tables. */
+    @Before
+    public void setUpStore() throws Exception {
+        dataSource = new EmbeddedDataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        brokerService = new BrokerService();
+        jdbc.setBrokerService(brokerService);
+        jdbc.getAdapter().doCreateTables(jdbc.getTransactionContext());
+    }
+    /** Locker B must still be blocked while locker A holds the lock and must have acquired it after A is stopped. */
+    @Test
+    public void testLockInterleave() throws Exception {
+        LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
+        brokerService.setBrokerName("First");
+        lockerA.setPersistenceAdapter(jdbc);
+
+        final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
+        brokerService.setBrokerName("Second");
+        lockerB.setPersistenceAdapter(jdbc);
+        final AtomicBoolean blocked = new AtomicBoolean(true);
+
+        final Connection connection = dataSource.getConnection();
+        ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            printLockTable(connection);
+            lockerA.start();
+            printLockTable(connection);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        lockerB.start();
+                        blocked.set(false);
+                        printLockTable(connection);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+            assertTrue("B is blocked", blocked.get());
+            assertTrue("A is good", lockerA.keepAlive());
+            printLockTable(connection);
+            lockerA.stop();
+            printLockTable(connection);
+            TimeUnit.MILLISECONDS.sleep(2 * lockerB.getLockAcquireSleepInterval());
+            assertFalse("lockerB has the lock", blocked.get());
+            lockerB.stop();
+            printLockTable(connection);
+        } finally {
+            // stop the pool and close the JDBC connection even when an assertion above fails
+            executor.shutdownNow();
+            connection.close();
+        }
+    }
+    /** Debug aid for dumping ACTIVEMQ_LOCK; a no-op while the query below stays commented out. */
+    private void printLockTable(Connection connection) throws IOException {
+        //((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
+    }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date