You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/10/24 21:59:44 UTC

svn commit: r1401849 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/store/jdbc/

Author: gtully
Date: Wed Oct 24 19:59:43 2012
New Revision: 1401849

URL: http://svn.apache.org/viewvc?rev=1401849&view=rev
Log:
test case for https://issues.apache.org/jira/browse/AMQ-4122 - that does not show a problem atm

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java Wed Oct 24 19:59:43 2012
@@ -94,7 +94,7 @@ public class LeaseDatabaseLocker extends
                 reportLeasOwnerShipAndDuration(connection);
 
             } catch (Exception e) {
-                LOG.debug(getLeaseHolderId() + " lease aquire failure: "+ e, e);
+                LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e);
             } finally {
                 close(statement);
                 close(connection);
@@ -104,7 +104,7 @@ public class LeaseDatabaseLocker extends
             TimeUnit.MILLISECONDS.sleep(lockAcquireSleepInterval);
         }
         if (stopping) {
-            throw new RuntimeException(getLeaseHolderId() + " failing lease aquire due to stop");
+            throw new RuntimeException(getLeaseHolderId() + " failing lease acquire due to stop");
         }
 
         LOG.info(getLeaseHolderId() + ", becoming the master on dataSource: " + dataSource);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Oct 24 19:59:43 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -1090,7 +1091,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-/*    public static void dumpTables(Connection c, String destinationName, String clientId, String
+    public static void dumpTables(Connection c, String destinationName, String clientId, String
       subscriptionName) throws SQLException { 
         printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
         printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
@@ -1107,12 +1108,12 @@ public class DefaultJDBCAdapter implemen
         printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
     }
 
-    private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
+    public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
             throws SQLException {
         printQuery(c.prepareStatement(query), out);
     }
 
-    private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
+    public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
             throws SQLException {
 
         ResultSet set = null;
@@ -1143,6 +1144,6 @@ public class DefaultJDBCAdapter implemen
             } catch (Throwable ignore) {
             }
         }
-    }  */
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java?rev=1401849&r1=1401848&r2=1401849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/LeaseDatabaseLockerTest.java Wed Oct 24 19:59:43 2012
@@ -16,24 +16,31 @@
  */
 package org.apache.activemq.store.jdbc;
 
-import java.io.IOException;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.activemq.broker.AbstractLocker;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.*;
 
 
 public class LeaseDatabaseLockerTest {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LeaseDatabaseLockerTest.class);
+
     JDBCPersistenceAdapter jdbc;
     BrokerService brokerService;
     EmbeddedDataSource dataSource;
@@ -96,7 +103,93 @@ public class LeaseDatabaseLockerTest {
         printLockTable(connection);
     }
 
-    private void printLockTable(Connection connection) throws IOException {
-        //((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
+    @Test
+    public void testLockAcquireRace() throws Exception {
+
+        // build a fake lock
+        final String fakeId = "Anon";
+        final Connection connection = dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(jdbc.getStatements().getLeaseObtainStatement());
+
+        final long now = System.currentTimeMillis();
+        statement.setString(1,fakeId);
+        statement.setLong(2, now + 30000);
+        statement.setLong(3, now);
+
+        assertEquals("we got the lease", 1, statement.executeUpdate());
+        printLockTable(connection);
+
+        final LeaseDatabaseLocker lockerA = new LeaseDatabaseLocker();
+        lockerA.setLeaseHolderId("A");
+        lockerA.configure(jdbc);
+
+        final LeaseDatabaseLocker lockerB = new LeaseDatabaseLocker();
+        lockerB.setLeaseHolderId("B");
+        lockerB.configure(jdbc);
+
+        final Set<LeaseDatabaseLocker> lockedSet = new HashSet<LeaseDatabaseLocker>();
+        ExecutorService executor = Executors.newCachedThreadPool();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    lockerA.start();
+                    lockedSet.add(lockerA);
+                    printLockTable(connection);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    lockerB.start();
+                    lockedSet.add(lockerB);
+                    printLockTable(connection);
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // sleep for a bit till both are alive
+        TimeUnit.SECONDS.sleep(2);
+        assertTrue("no start", lockedSet.isEmpty());
+        assertFalse("A is blocked", lockerA.keepAlive());
+        assertFalse("B is blocked", lockerB.keepAlive());
+
+        LOG.info("releasing phony lock " + fakeId);
+
+        statement = connection.prepareStatement(jdbc.getStatements().getLeaseUpdateStatement());
+        statement.setString(1, null);
+        statement.setLong(2, 0l);
+        statement.setString(3, fakeId);
+        assertEquals("we released " + fakeId, 1, statement.executeUpdate());
+        LOG.info("released " + fakeId);
+        printLockTable(connection);
+
+        TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
+        assertEquals("one locker started", 1, lockedSet.size());
+
+        assertTrue("one isAlive", lockerA.keepAlive() || lockerB.keepAlive());
+
+        LeaseDatabaseLocker winner = lockedSet.iterator().next();
+        winner.stop();
+        lockedSet.remove(winner);
+
+        TimeUnit.MILLISECONDS.sleep(AbstractLocker.DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL);
+        assertEquals("one locker started", 1, lockedSet.size());
+
+        lockedSet.iterator().next().stop();
+        printLockTable(connection);
+    }
+
+    private void printLockTable(Connection connection) throws Exception {
+        ((DefaultJDBCAdapter)jdbc.getAdapter()).printQuery(connection, "SELECT * from ACTIVEMQ_LOCK", System.err);
     }
 }