You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/20 16:14:34 UTC

svn commit: r1352120 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/broker/ft/

Author: gtully
Date: Wed Jun 20 14:14:33 2012
New Revision: 1352120

URL: http://svn.apache.org/viewvc?rev=1352120&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-1885 - make tests more deterministic, ensure ioexception handler is used with lease locker to avoid contending masters, the resumption after an error is dependent on a keepAlive win, so the lease expiry tests w/o the io handler is invalid

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java   (contents, props changed)
      - copied, changed from r1350572, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Jun 20 14:14:33 2012
@@ -1088,7 +1088,7 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-/*    public void dumpTables(Connection c, String destinationName, String clientId, String
+/*    public static void dumpTables(Connection c, String destinationName, String clientId, String
       subscriptionName) throws SQLException { 
         printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out); 
         printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out); 
@@ -1100,23 +1100,23 @@ public class DefaultJDBCAdapter implemen
       s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
       printQuery(s,System.out); }
 
-    public void dumpTables(Connection c) throws SQLException {
-        printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
+    public static void dumpTables(java.sql.Connection c) throws SQLException {
+        printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
         printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
     }
 
-    private void printQuery(Connection c, String query, PrintStream out)
+    private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
             throws SQLException {
         printQuery(c.prepareStatement(query), out);
     }
 
-    private void printQuery(PreparedStatement s, PrintStream out)
+    private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
             throws SQLException {
 
         ResultSet set = null;
         try {
             set = s.executeQuery();
-            ResultSetMetaData metaData = set.getMetaData();
+            java.sql.ResultSetMetaData metaData = set.getMetaData();
             for (int i = 1; i <= metaData.getColumnCount(); i++) {
                 if (i == 1)
                     out.print("||");

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java (from r1350572, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java&r1=1350572&r2=1352120&rev=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java Wed Jun 20 14:14:33 2012
@@ -20,12 +20,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest {
-    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class);
+public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
+    private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.class);
 
     private long restartDelay = 500;
 

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Wed Jun 20 14:14:33 2012
@@ -17,10 +17,14 @@
 package org.apache.activemq.broker.ft;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.util.concurrent.TimeUnit;
+import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,9 +50,9 @@ public class DbRestartJDBCQueueMasterSla
     @Override
     protected void delayTillRestartRequired() {
 
-        LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs");
+        LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
         try {
-            TimeUnit.MILLISECONDS.sleep(3000);
+            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
@@ -56,10 +60,8 @@ public class DbRestartJDBCQueueMasterSla
 
     @Override
     protected void verifyExpectedBroker(int inflightMessageCount) {
-        if (inflightMessageCount == 0) {
+        if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
             assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
         }
-        // the lock is up for grabs after the expiry
     }
-
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Wed Jun 20 14:14:33 2012
@@ -16,16 +16,17 @@
  */
 package org.apache.activemq.broker.ft;
 
-import java.sql.SQLException;
+import java.util.List;
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
-
+import javax.jms.Session;
 import org.apache.activemq.ActiveMQConnection;
+import org.apache.derby.jdbc.EmbeddedDataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.derby.jdbc.EmbeddedDataSource;
 
 public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
     private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
@@ -34,7 +35,7 @@ public class DbRestartJDBCQueueMasterSla
         verifyExpectedBroker(inflightMessageCount);
         if (++inflightMessageCount == failureCount) {
             LOG.info("STOPPING DB!@!!!!");
-            final EmbeddedDataSource ds = getExistingDataSource();
+            final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate();
             ds.setShutdownDatabase("shutdown");
             LOG.info("DB STOPPED!@!!!!");
             
@@ -42,9 +43,6 @@ public class DbRestartJDBCQueueMasterSla
                 public void run() {
                     delayTillRestartRequired();
                     ds.setShutdownDatabase("false");
-                    try {
-                        ds.getConnection().close();
-                    } catch (SQLException ignored) {}
                     LOG.info("DB RESTARTED!@!!!!");
                 }
             };
@@ -77,7 +75,7 @@ public class DbRestartJDBCQueueMasterSla
                     producer.send(producerDestination, message);
                     sent = true;
                 } catch (JMSException e) {
-                    LOG.info("Exception on producer send:", e);
+                    LOG.info("Exception on producer send for: " + message, e);
                     try { 
                         Thread.sleep(2000);
                     } catch (InterruptedException ignored) {
@@ -86,4 +84,22 @@ public class DbRestartJDBCQueueMasterSla
             } while(!sent);
         }
     }
+
+    @Override
+    protected Session createReceiveSession(Connection receiveConnection) throws Exception {
+        return receiveConnection.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    @Override
+    protected void consumeMessage(Message message, List<Message> messageList) {
+        try {
+            receiveSession.commit();
+            super.consumeMessage(message, messageList);
+        } catch (JMSException e) {
+            LOG.info("Faild to commit message receipt: " + message, e);
+            try {
+                receiveSession.rollback();
+            } catch (JMSException ignored) {}
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Wed Jun 20 14:14:33 2012
@@ -17,10 +17,12 @@
 package org.apache.activemq.broker.ft;
 
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.sql.Connection;
 import java.sql.SQLException;
+import javax.sql.DataSource;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.BrokerInfo;
@@ -31,13 +33,13 @@ import org.apache.activemq.transport.Tra
 import org.apache.derby.jdbc.EmbeddedDataSource;
 
 public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
-    protected EmbeddedDataSource sharedDs;
+    protected DataSource sharedDs;
     protected String MASTER_URL = "tcp://localhost:62001";
     protected String SLAVE_URL  = "tcp://localhost:62002";
 
     protected void setUp() throws Exception {
         // startup db
-        sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+        sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
         super.setUp();
     }
     
@@ -97,7 +99,61 @@ public class JDBCQueueMasterSlaveTest ex
         persistenceAdapter.setLockAcquireSleepInterval(500);
     }
 
-    protected EmbeddedDataSource getExistingDataSource() throws Exception {
+    protected DataSource getExistingDataSource() throws Exception {
         return sharedDs;
     }
+
+    // prevent concurrent calls from attempting to create the db at the same time
+    // can result in "already exists in this jvm" errors
+    class SyncDataSource implements DataSource {
+        final EmbeddedDataSource delegate;
+        SyncDataSource(EmbeddedDataSource dataSource) {
+            this.delegate = dataSource;
+        }
+            @Override
+            public Connection getConnection() throws SQLException {
+                synchronized (this) {
+                    return delegate.getConnection();
+                }
+            }
+
+            @Override
+            public Connection getConnection(String username, String password) throws SQLException {
+                synchronized (this) {
+                    return delegate.getConnection();
+                }
+            }
+
+            @Override
+            public PrintWriter getLogWriter() throws SQLException {
+                return null;
+            }
+
+            @Override
+            public void setLogWriter(PrintWriter out) throws SQLException {
+            }
+
+            @Override
+            public void setLoginTimeout(int seconds) throws SQLException {
+            }
+
+            @Override
+            public int getLoginTimeout() throws SQLException {
+                return 0;
+            }
+
+            @Override
+            public <T> T unwrap(Class<T> iface) throws SQLException {
+                return null;
+            }
+
+            @Override
+            public boolean isWrapperFor(Class<?> iface) throws SQLException {
+                return false;
+            }
+
+            EmbeddedDataSource getDelegate() {
+                return delegate;
+            }
+        };
 }