You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/27 22:14:09 UTC

[3/3] qpid-jms git commit: Some additional producer TX tests

Some additional producer TX tests

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/16c8ffd6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/16c8ffd6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/16c8ffd6

Branch: refs/heads/master
Commit: 16c8ffd69c794d5f53ff9b4bffaa080906590f98
Parents: 6d50176
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jan 27 16:13:58 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jan 27 16:13:58 2015 -0500

----------------------------------------------------------------------
 .../jms/failover/JmsTxProducerFailoverTest.java | 110 ++++++++++++++++++-
 1 file changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/16c8ffd6/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
index 07b15e4..7087a7e 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.failover;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
@@ -29,6 +30,7 @@ import javax.jms.TransactionRolledBackException;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
 import org.junit.Test;
 
 /**
@@ -41,14 +43,118 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport {
         return true;
     }
 
+    /*
+     * Test that the TX doesn't start until the first send so a failover
+     * before then should allow Commit to work as expected.
+     */
+    @Test
+    public void testTxProducerSendAfterFailoverCommits() throws Exception {
+        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100");
+
+        connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        final int MSG_COUNT = 5;
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name.getMethodName());
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        stopPrimaryBroker();
+        restartPrimaryBroker();
+
+        assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        assertTrue("Should have a recovered producer.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerService.getAdminView().getQueueProducers().length == 1;
+            }
+        }));
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            LOG.debug("Producer sening message #{}", i + 1);
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+
+        proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        try {
+            session.commit();
+            LOG.info("Transacted commit ok after failover.");
+        } catch (TransactionRolledBackException rb) {
+            fail("Session commit should not have failed with TX rolled back.");
+        }
+
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+    }
+
+    /*
+     * Tests that even if all sends complete prior to failover the commit that follows
+     * will fail and the message are not present on the broker.
+     */
+    @Test
+    public void testTxProducerSendsThenFailoverCommitFails() throws Exception {
+        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100");
+
+        connection = createAmqpConnection(brokerURI);
+        connection.start();
+
+        final int MSG_COUNT = 5;
+        final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(name.getMethodName());
+        final MessageProducer producer = session.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            LOG.debug("Producer sening message #{}", i + 1);
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+
+        assertEquals(0, proxy.getQueueSize());
+
+        stopPrimaryBroker();
+        restartPrimaryBroker();
+
+        proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        try {
+            session.commit();
+            fail("Session commit should have failed with TX rolled back.");
+        } catch (TransactionRolledBackException rb) {
+            LOG.info("Transacted commit failed after failover: {}", rb.getMessage());
+        }
+
+        assertEquals(0, proxy.getQueueSize());
+    }
+
+    /*
+     * Tests that if some sends happen and then a failover followed by additional
+     * sends the commit will fail and no messages are left on the broker.
+     */
     @Test
     public void testTxProducerSendWorksButCommitFails() throws Exception {
-        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=1000");
+        URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100");
 
         connection = createAmqpConnection(brokerURI);
         connection.start();
 
-        final int MSG_COUNT = 20;
+        final int MSG_COUNT = 10;
         final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(name.getMethodName());
         final MessageProducer producer = session.createProducer(queue);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org