You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/12/17 11:50:03 UTC
svn commit: r891622 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Author: gtully
Date: Thu Dec 17 10:50:02 2009
New Revision: 891622
URL: http://svn.apache.org/viewvc?rev=891622&view=rev
Log:
test for replayed committed transaction on lost commit reply
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=891622&r1=891621&r2=891622&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Dec 17 10:50:02 2009
@@ -19,6 +19,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
@@ -28,7 +29,13 @@
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.TransactionId;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +43,7 @@
// see https://issues.apache.org/activemq/browse/AMQ-2473
public class FailoverTransactionTest {
+ private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "test.FailoverTransactionTest";
private String url = "tcp://localhost:61616";
BrokerService broker;
@@ -53,13 +61,18 @@
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.addConnector(url);
- broker.setDeleteAllMessagesOnStartup(true);
+ broker = createBroker(deleteAllMessagesOnStartup);
broker.start();
}
-
+
+ public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
+ broker = new BrokerService();
+ broker.setUseJmx(false);
+ broker.addConnector(url);
+ broker.setDeleteAllMessagesOnStartup(true);
+ return broker;
+ }
+
@Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
@@ -88,6 +101,69 @@
connection.close();
}
+ @Test
+ public void testFailoverCommitReplyLost() throws Exception {
+
+ broker.stop();
+
+ broker = createBroker(true);
+ broker.setPlugins(new BrokerPlugin[] {
+ new BrokerPluginSupport() {
+ @Override
+ public void commitTransaction(ConnectionContext context,
+ TransactionId xid, boolean onePhase) throws Exception {
+ super.commitTransaction(context, xid, onePhase);
+ // so commit will hang as if reply is lost
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker post commit...");
+ try {
+ broker.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ });
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ Connection connection = cf.createConnection();
+ connection.start();
+ final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+
+ TextMessage message = session.createTextMessage("Test message");
+ producer.send(message);
+
+ // broker will die on commit reply so this will hang till restart
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("doing async commit...");
+ try {
+ session.commit();
+ LOG.info("done async commit");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ broker.waitUntilStopped();
+ startBroker(false);
+
+ assertNotNull("we got the message", consumer.receive(20000));
+ assertNull("we got just one message", consumer.receive(2000));
+ session.commit();
+ connection.close();
+ }
+
@Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {