You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/10/24 21:59:44 UTC
svn commit: r1401849 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/store/jdbc/
Author: gtully
Date: Wed Oct 24 19:59:43 2012
New Revision: 1401849
URL: http://svn.apache.org/viewvc?rev=1401849&view=rev
Log:
test case for https://issues.apache.org/jira/browse/AMQ-4122 - that does not show a problem atm
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Wed Oct 24 19:59:43 2012
@@ -94,7 +94,7 @@ public class LeaseDatabaseLocker extends
reportLeasOwnerShipAndDuration(connection);
} catch (Exception e) {
- LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
+ LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
} finally {
close(statement);
close(connection);
@@ -104,7 +104,7 @@ public class LeaseDatabaseLocker extends
TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
}
if (stopping) {
- throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
+ throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
}
LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Oct 24 19:59:43 2012
@@ -17,6 +17,7 @@
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -1090,7 +1091,7 @@ public class DefaultJDBCAdapter implemen
}
}
-/* public static void dumpTables(Connection c, String destinationName, String clientId, String
+ public static void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
@@ -1107,12 +1108,12 @@ public class DefaultJDBCAdapter implemen
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
}
- private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
+ public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
throws SQLException {
printQuery(c.prepareStatement(query), out);
}
- private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
+ public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
throws SQLException {
ResultSet set = null;
@@ -1143,6 +1144,6 @@ public class DefaultJDBCAdapter implemen
} catch (Throwable ignore) {
}
}
- } */
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Wed Oct 24 19:59:43 2012
@@ -16,24 +16,31 @@
*/
package org.apache.activemq.store.jdbc;
-import java.io.IOException;
import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.AbstractLocker;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
public class LeaseDatabaseLockerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
+
JDBCPersistenceAdapter jdbc;
BrokerService brokerService;
EmbeddedDataSource dataSource;
@@ -96,7 +103,93 @@ public class LeaseDatabaseLockerTest {
printLockTable(connection);
}
- private void printLockTable(Connection connection) throws IOException {
- //((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
+ @Test
+ public void testLockAcquireRace() throws Exception {
+
+ // build a fake lock
+ final String fakeId = "Anon";
+ final Connection connection = dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(jdbc.getStatements().getLeaseObtainStatement());
+
+ final long now = System.currentTimeMillis();
+ statement.setString(1,fakeId);
+ statement.setLong(2, now + 30000);
+ statement.setLong(3, now);
+
+ assertEquals("we got the lease", 1, statement.executeUpdate());
+ printLockTable(connection);
+
+ final LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
+ lockerA.setLeaseHolderId("A");
+ lockerA.configure(jdbc);
+
+ final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
+ lockerB.setLeaseHolderId("B");
+ lockerB.configure(jdbc);
+
+ final Set<LeaseDatabaseLocker> lockedSet = new HashSet<LeaseDatabaseLocker>();
+ ExecutorService executor = Executors.newCachedThreadPool();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ lockerA.start();
+ lockedSet.add(lockerA);
+ printLockTable(connection);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ lockerB.start();
+ lockedSet.add(lockerB);
+ printLockTable(connection);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ // sleep for a bit till both are alive
+ TimeUnit.SECONDS.sleep(2);
+ assertTrue("no start", lockedSet.isEmpty());
+ assertFalse("A is blocked", lockerA.keepAlive());
+ assertFalse("B is blocked", lockerB.keepAlive());
+
+ LOG.info("releasing phony lock " + fakeId);
+
+ statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
+ statement.setString(1, null);
+ statement.setLong(2, 0l);
+ statement.setString(3, fakeId);
+ assertEquals("we released " + fakeId, 1, statement.executeUpdate());
+ LOG.info("released " + fakeId);
+ printLockTable(connection);
+
+ TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
+ assertEquals("one locker started", 1, lockedSet.size());
+
+ assertTrue("one isAlive", lockerA.keepAlive() || lockerB.keepAlive());
+
+ LeaseDatabaseLocker winner = lockedSet.iterator().next();
+ winner.stop();
+ lockedSet.remove(winner);
+
+ TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
+ assertEquals("one locker started", 1, lockedSet.size());
+
+ lockedSet.iterator().next().stop();
+ printLockTable(connection);
+ }
+
+ private void printLockTable(Connection connection) throws Exception {
+ ((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
}
}