You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/20 16:14:34 UTC
svn commit: r1352120 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/broker/ft/
Author: gtully
Date: Wed Jun 20 14:14:33 2012
New Revision: 1352120
URL: http://svn.apache.org/viewvc?rev=1352120&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-1885 - make tests more deterministic; ensure the IOException handler is used with the lease locker to avoid contending masters. Resumption after an error depends on a keepAlive win, so the lease expiry tests without the IO handler are invalid
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java (contents, props changed)
- copied, changed from r1350572, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Wed Jun 20 14:14:33 2012
@@ -1088,7 +1088,7 @@ public class DefaultJDBCAdapter implemen
}
}
-/* public void dumpTables(Connection c, String destinationName, String clientId, String
+/* public static void dumpTables(Connection c, String destinationName, String clientId, String
subscriptionName) throws SQLException {
printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
@@ -1100,23 +1100,23 @@ public class DefaultJDBCAdapter implemen
s.setString(1,destinationName); s.setString(2,clientId); s.setString(3,subscriptionName);
printQuery(s,System.out); }
- public void dumpTables(Connection c) throws SQLException {
- printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
+ public static void dumpTables(java.sql.Connection c) throws SQLException {
+ printQuery(c, "Select * from ACTIVEMQ_MSGS ORDER BY ID", System.out);
printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
}
- private void printQuery(Connection c, String query, PrintStream out)
+ private static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
throws SQLException {
printQuery(c.prepareStatement(query), out);
}
- private void printQuery(PreparedStatement s, PrintStream out)
+ private static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
throws SQLException {
ResultSet set = null;
try {
set = s.executeQuery();
- ResultSetMetaData metaData = set.getMetaData();
+ java.sql.ResultSetMetaData metaData = set.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (i == 1)
out.print("||");
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java (from r1350572, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java&r1=1350572&r2=1352120&rev=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java Wed Jun 20 14:14:33 2012
@@ -20,12 +20,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCIOExceptionHandler;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseIntactTest {
- private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseIntactQuiesceTest.class);
+public class DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest extends DbRestartJDBCQueueMasterSlaveLeaseTest {
+ private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.class);
private long restartDelay = 500;
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseQuiesceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveLeaseTest.java Wed Jun 20 14:14:33 2012
@@ -17,10 +17,14 @@
package org.apache.activemq.broker.ft;
import java.io.IOException;
+import java.sql.Connection;
import java.util.concurrent.TimeUnit;
+import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.LeaseDatabaseLocker;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +50,9 @@ public class DbRestartJDBCQueueMasterSla
@Override
protected void delayTillRestartRequired() {
- LOG.info("restart db after lease has expired. While Db is offline, master should stay alive, them lease up for grabs");
+ LOG.info("delay for less than lease quantum. While Db is offline, master should stay alive");
try {
- TimeUnit.MILLISECONDS.sleep(3000);
+ TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -56,10 +60,8 @@ public class DbRestartJDBCQueueMasterSla
@Override
protected void verifyExpectedBroker(int inflightMessageCount) {
- if (inflightMessageCount == 0) {
+ if (inflightMessageCount == 0 || inflightMessageCount == failureCount + 10) {
assertEquals("connected to master", master.getBrokerName(), ((ActiveMQConnection)sendConnection).getBrokerName());
}
- // the lock is up for grabs after the expiry
}
-
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java Wed Jun 20 14:14:33 2012
@@ -16,16 +16,17 @@
*/
package org.apache.activemq.broker.ft;
-import java.sql.SQLException;
+import java.util.List;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
-
+import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
+import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.derby.jdbc.EmbeddedDataSource;
public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
private static final transient Logger LOG = LoggerFactory.getLogger(DbRestartJDBCQueueMasterSlaveTest.class);
@@ -34,7 +35,7 @@ public class DbRestartJDBCQueueMasterSla
verifyExpectedBroker(inflightMessageCount);
if (++inflightMessageCount == failureCount) {
LOG.info("STOPPING DB!@!!!!");
- final EmbeddedDataSource ds = getExistingDataSource();
+ final EmbeddedDataSource ds = ((SyncDataSource)getExistingDataSource()).getDelegate();
ds.setShutdownDatabase("shutdown");
LOG.info("DB STOPPED!@!!!!");
@@ -42,9 +43,6 @@ public class DbRestartJDBCQueueMasterSla
public void run() {
delayTillRestartRequired();
ds.setShutdownDatabase("false");
- try {
- ds.getConnection().close();
- } catch (SQLException ignored) {}
LOG.info("DB RESTARTED!@!!!!");
}
};
@@ -77,7 +75,7 @@ public class DbRestartJDBCQueueMasterSla
producer.send(producerDestination, message);
sent = true;
} catch (JMSException e) {
- LOG.info("Exception on producer send:", e);
+ LOG.info("Exception on producer send for: " + message, e);
try {
Thread.sleep(2000);
} catch (InterruptedException ignored) {
@@ -86,4 +84,22 @@ public class DbRestartJDBCQueueMasterSla
} while(!sent);
}
}
+
+ @Override
+ protected Session createReceiveSession(Connection receiveConnection) throws Exception {
+ return receiveConnection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+
+ @Override
+ protected void consumeMessage(Message message, List<Message> messageList) { // commits the receive session, then delegates to the parent
+ try {
+ receiveSession.commit(); // commit first: if this throws, super.consumeMessage below is skipped
+ super.consumeMessage(message, messageList);
+ } catch (JMSException e) {
+ LOG.info("Failed to commit message receipt: " + message, e);
+ try {
+ receiveSession.rollback(); // best effort; a failure of the rollback itself is deliberately ignored
+ } catch (JMSException ignored) {}
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=1352120&r1=1352119&r2=1352120&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java Wed Jun 20 14:14:33 2012
@@ -17,10 +17,12 @@
package org.apache.activemq.broker.ft;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.URI;
import java.sql.Connection;
import java.sql.SQLException;
+import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.BrokerInfo;
@@ -31,13 +33,13 @@ import org.apache.activemq.transport.Tra
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
- protected EmbeddedDataSource sharedDs;
+ protected DataSource sharedDs;
protected String MASTER_URL = "tcp://localhost:62001";
protected String SLAVE_URL = "tcp://localhost:62002";
protected void setUp() throws Exception {
// startup db
- sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+ sharedDs = new SyncDataSource((EmbeddedDataSource)new DataSourceSupport().getDataSource());
super.setUp();
}
@@ -97,7 +99,61 @@ public class JDBCQueueMasterSlaveTest ex
persistenceAdapter.setLockAcquireSleepInterval(500);
}
- protected EmbeddedDataSource getExistingDataSource() throws Exception {
+ protected DataSource getExistingDataSource() throws Exception {
return sharedDs;
}
+
+ // prevent concurrent calls from attempting to create the db at the same time
+ // can result in "already exists in this jvm" errors
+ class SyncDataSource implements DataSource {
+ final EmbeddedDataSource delegate;
+ SyncDataSource(EmbeddedDataSource dataSource) {
+ this.delegate = dataSource;
+ }
+ @Override
+ public Connection getConnection() throws SQLException {
+ synchronized (this) {
+ return delegate.getConnection();
+ }
+ }
+
+ @Override
+ public Connection getConnection(String username, String password) throws SQLException { // serialized like the no-arg overload
+ synchronized (this) {
+ return delegate.getConnection(username, password); // forward the caller's credentials instead of silently dropping them
+ }
+ }
+
+ @Override
+ public PrintWriter getLogWriter() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public void setLogWriter(PrintWriter out) throws SQLException {
+ }
+
+ @Override
+ public void setLoginTimeout(int seconds) throws SQLException {
+ }
+
+ @Override
+ public int getLoginTimeout() throws SQLException {
+ return 0;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return false;
+ }
+
+ EmbeddedDataSource getDelegate() {
+ return delegate;
+ }
+ };
}