You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/12/17 11:50:03 UTC

svn commit: r891622 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Author: gtully
Date: Thu Dec 17 10:50:02 2009
New Revision: 891622

URL: http://svn.apache.org/viewvc?rev=891622&view=rev
Log:
test for replayed committed transaction on lost commit reply

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=891622&r1=891621&r2=891622&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Dec 17 10:50:02 2009
@@ -19,6 +19,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
+import java.util.concurrent.Executors;
 
 import javax.jms.Connection;
 import javax.jms.MessageConsumer;
@@ -28,7 +29,13 @@
 import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.TransactionId;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,6 +43,7 @@
 // see https://issues.apache.org/activemq/browse/AMQ-2473
 public class FailoverTransactionTest {
 	
+    private static final Log LOG = LogFactory.getLog(FailoverTransactionTest.class);
 	private static final String QUEUE_NAME = "test.FailoverTransactionTest";
 	private String url = "tcp://localhost:61616";
 	BrokerService broker;
@@ -53,13 +61,18 @@
 	}
 	
 	public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
-	    broker = new BrokerService();
-        broker.setUseJmx(false);
-        broker.addConnector(url);
-        broker.setDeleteAllMessagesOnStartup(true);
+	    broker = createBroker(deleteAllMessagesOnStartup);
         broker.start();
 	}
-	
+
+	public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {   
+	    broker = new BrokerService();
+	    broker.setUseJmx(false);
+	    broker.addConnector(url);
+	    broker.setDeleteAllMessagesOnStartup(true);
+	    return broker;
+	}
+
 	@Test
 	public void testFailoverProducerCloseBeforeTransaction() throws Exception {
 		
@@ -88,6 +101,69 @@
 		connection.close();
 	}
 	
+    @Test
+    public void testFailoverCommitReplyLost() throws Exception {
+        
+        broker.stop();
+        
+        broker = createBroker(true);
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    @Override
+                    public void commitTransaction(ConnectionContext context,
+                            TransactionId xid, boolean onePhase) throws Exception {
+                        super.commitTransaction(context, xid, onePhase);
+                        // so commit will hang as if reply is lost
+                        context.setDontSendReponse(true);
+                        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+                            public void run() {
+                                LOG.info("Stopping broker post commit...");
+                                try {
+                                    broker.stop();
+                                } catch (Exception e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                        });
+                   }   
+                }
+        });
+        broker.start();
+        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+        Connection connection = cf.createConnection();
+        connection.start();
+        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        
+        TextMessage message = session.createTextMessage("Test message");
+        producer.send(message);
+
+        // broker will die on commit reply so this will hang till restart
+        Executors.newSingleThreadExecutor().execute(new Runnable() {   
+            public void run() {
+                LOG.info("doing async commit...");
+                try {
+                    session.commit();
+                    LOG.info("done async commit");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+       
+        broker.waitUntilStopped();
+        startBroker(false);
+
+        assertNotNull("we got the message", consumer.receive(20000));
+        assertNull("we got just one message", consumer.receive(2000));
+        session.commit();   
+        connection.close();
+    }
+
 	@Test
 	public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {