You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/27 22:14:09 UTC
[3/3] qpid-jms git commit: Some additional producer TX tests
Some additional producer TX tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/16c8ffd6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/16c8ffd6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/16c8ffd6
Branch: refs/heads/master
Commit: 16c8ffd69c794d5f53ff9b4bffaa080906590f98
Parents: 6d50176
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jan 27 16:13:58 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jan 27 16:13:58 2015 -0500
----------------------------------------------------------------------
.../jms/failover/JmsTxProducerFailoverTest.java | 110 ++++++++++++++++++-
1 file changed, 108 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/16c8ffd6/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
index 07b15e4..7087a7e 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/failover/JmsTxProducerFailoverTest.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.failover;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.URI;
@@ -29,6 +30,7 @@ import javax.jms.TransactionRolledBackException;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
import org.junit.Test;
/**
@@ -41,14 +43,118 @@ public class JmsTxProducerFailoverTest extends AmqpTestSupport {
return true;
}
+ /*
+ * Test that the TX doesn't start until the first send, so a failover that
+ * happens before any send should still allow the commit to succeed.
+ */
+ @Test
+ public void testTxProducerSendAfterFailoverCommits() throws Exception {
+ URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100"); // failover URI wrapping the broker's AMQP connection URI
+
+ connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ final int MSG_COUNT = 5;
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // transacted session: sends are only expected on the queue after commit()
+ Queue queue = session.createQueue(name.getMethodName()); // queue is named after the running test method
+ final MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize()); // queue starts out empty
+
+ stopPrimaryBroker(); // bounce the broker before anything has been sent
+ restartPrimaryBroker();
+
+ assertTrue("Should have a new connection.", Wait.waitFor(new Wait.Condition() { // wait until the broker's admin view reports exactly one connection
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ assertTrue("Should have a recovered producer.", Wait.waitFor(new Wait.Condition() { // wait until the admin view lists exactly one queue producer
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return brokerService.getAdminView().getQueueProducers().length == 1;
+ }
+ }));
+
+ for (int i = 0; i < MSG_COUNT; ++i) { // all sends happen after the broker restart
+ LOG.debug("Producer sening message #{}", i + 1);
+ producer.send(session.createTextMessage("Message: " + i));
+ }
+
+ proxy = getProxyToQueue(name.getMethodName()); // look the queue MBean up again after the restart (NOTE(review): presumably the earlier proxy is stale - confirm)
+ assertEquals(0, proxy.getQueueSize()); // nothing is visible on the queue before commit
+
+ try {
+ session.commit();
+ LOG.info("Transacted commit ok after failover.");
+ } catch (TransactionRolledBackException rb) {
+ fail("Session commit should not have failed with TX rolled back.");
+ }
+
+ assertEquals(MSG_COUNT, proxy.getQueueSize()); // every sent message is on the queue once the commit succeeded
+ }
+
+ /*
+ * Tests that even if all sends complete prior to failover, the commit that follows
+ * will fail and the messages are not present on the broker.
+ */
+ @Test
+ public void testTxProducerSendsThenFailoverCommitFails() throws Exception {
+ URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100"); // failover URI wrapping the broker's AMQP connection URI
+
+ connection = createAmqpConnection(brokerURI);
+ connection.start();
+
+ final int MSG_COUNT = 5;
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // transacted session: sends are only expected on the queue after commit()
+ Queue queue = session.createQueue(name.getMethodName()); // queue is named after the running test method
+ final MessageProducer producer = session.createProducer(queue);
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertEquals(0, proxy.getQueueSize()); // queue starts out empty
+
+ for (int i = 0; i < MSG_COUNT; ++i) { // all sends happen before the broker is bounced
+ LOG.debug("Producer sening message #{}", i + 1);
+ producer.send(session.createTextMessage("Message: " + i));
+ }
+
+ assertEquals(0, proxy.getQueueSize()); // uncommitted sends are not visible on the queue
+
+ stopPrimaryBroker(); // bounce the broker after the sends but before the commit
+ restartPrimaryBroker();
+
+ proxy = getProxyToQueue(name.getMethodName()); // look the queue MBean up again after the restart (NOTE(review): presumably the earlier proxy is stale - confirm)
+ assertEquals(0, proxy.getQueueSize());
+
+ try {
+ session.commit();
+ fail("Session commit should have failed with TX rolled back."); // only reached if commit() did not throw
+ } catch (TransactionRolledBackException rb) {
+ LOG.info("Transacted commit failed after failover: {}", rb.getMessage());
+ }
+
+ assertEquals(0, proxy.getQueueSize()); // queue is still empty after the failed commit
+ }
+
+ /*
+ * Tests that if some sends happen, then a failover occurs, and additional sends
+ * follow, the commit will fail and no messages are left on the broker.
+ */
@Test
public void testTxProducerSendWorksButCommitFails() throws Exception {
- URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=1000");
+ URI brokerURI = new URI("failover://("+ getBrokerAmqpConnectionURI() +")?maxReconnectDelay=100");
connection = createAmqpConnection(brokerURI);
connection.start();
- final int MSG_COUNT = 20;
+ final int MSG_COUNT = 10;
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(name.getMethodName());
final MessageProducer producer = session.createProducer(queue);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org