You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/05 17:04:58 UTC
svn commit: r821826 - in
/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack:
AcknowledgeAfterFailoverOnMessageTest.java
AcknowledgeAfterFailoverTest.java AcknowledgeOnMessageTest.java
Author: ritchiem
Date: Mon Oct 5 15:04:57 2009
New Revision: 821826
URL: http://svn.apache.org/viewvc?rev=821826&view=rev
Log:
QPID-1816 : Add testing for ack after failover, and testing for acknowledgement in a dirty session after failover.
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java Mon Oct 5 15:04:57 2009
@@ -20,17 +20,26 @@
*/
package org.apache.qpid.test.unit.ack;
-import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
import javax.jms.Connection;
-import javax.jms.Session;
-import javax.jms.Message;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest
+public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
{
+ protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+ private MessageListener _listener = null;
+
@Override
public void setUp() throws Exception
{
@@ -38,6 +47,27 @@
NUM_MESSAGES = 10;
}
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ *
+ * @throws Exception if a problem occured during test setup.
+ */
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
+ super.init(transacted, mode);
+ ((AMQConnection) _connection).setConnectionListener(this);
+ // Override the listener for the dirtyAck testing.
+ if (_listener != null)
+ {
+ _consumer.setMessageListener(_listener);
+ }
+ }
+
protected void prepBroker(int count) throws Exception
{
//Stop the connection whilst we repopulate the broker, or the no_ack
@@ -107,4 +137,220 @@
}
}
+
+ int msgCount = 0;
+ boolean cleaned = false;
+
+ class DirtyAckingHandler implements MessageListener
+ {
+ /**
+ * Validate first message but do nothing with it.
+ *
+ * Failover
+ *
+ * The receive the message again
+ *
+ * @param message
+ */
+ public void onMessage(Message message)
+ {
+ // Stop processing if we have an error and had to stop running.
+ if (_receviedAll.getCount() == 0)
+ {
+ _logger.debug("Dumping msgs due to error(" + _causeOfFailure.get().getMessage() + "):" + message);
+ return;
+ }
+
+ try
+ {
+ // Check we have the next message as expected
+ assertNotNull("Message " + msgCount + " not correctly received.", message);
+ assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+
+ if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+ {
+ // This is the first message we've received so lets fail the broker
+
+ failBroker(getFailingPort());
+
+ repopulateBroker();
+
+ _logger.error("Received first msg so failing over");
+
+ return;
+ }
+
+ msgCount++;
+
+ // Don't acknowlege the first message after failover so we can commit
+ // them together
+ if (msgCount == 1)
+ {
+ _logger.error("Received first msg after failover ignoring:" + msgCount);
+
+ // Acknowledge the first message if we are now on the cleaned pass
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ return;
+ }
+
+ if (_consumerSession.getTransacted())
+ {
+ try
+ {
+ _consumerSession.commit();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an TransactionRolledBackException");
+ }
+ }
+ catch (TransactionRolledBackException trbe)
+ {
+ //expected path
+ }
+ }
+ else
+ {
+ try
+ {
+ message.acknowledge();
+ if (!cleaned)
+ {
+ fail("Session is dirty we should get an IllegalStateException");
+ }
+ }
+ catch (javax.jms.IllegalStateException ise)
+ {
+ assertEquals("Incorrect Exception thrown", "has failed over", ise.getMessage());
+ // Recover the sesion and try again.
+ _consumerSession.recover();
+ }
+ }
+
+ // Acknowledge the last message if we are in a clean state
+ // this will then trigger test teardown.
+ if (cleaned)
+ {
+ _receviedAll.countDown();
+ }
+
+ //Reset message count so we can try again.
+ msgCount = 0;
+ cleaned = true;
+ }
+ catch (Exception e)
+ {
+ // If something goes wrong stop and notifiy main thread.
+ fail(e);
+ }
+ }
+ }
+
+ /**
+ * Test that Acking/Committing a message received before failover causes
+ * an exception at commit/ack time.
+ *
+ * Expected behaviour is that in:
+ * * tx mode commit() throws a transacted RolledBackException
+ * * client ack mode throws an IllegalStateException
+ *
+ * @param transacted is this session trasacted
+ * @param mode What ack mode should be used if not trasacted
+ *
+ * @throws Exception if something goes wrong.
+ */
+ protected void testDirtyAcking(boolean transacted, int mode) throws Exception
+ {
+ NUM_MESSAGES = 2;
+ _listener = new DirtyAckingHandler();
+
+ super.testAcking(transacted, mode);
+ }
+
+ public void testDirtyClientAck() throws Exception
+ {
+ testDirtyAcking(false, Session.CLIENT_ACKNOWLEDGE);
+ }
+
+ public void testDirtyAckingTransacted() throws Exception
+ {
+ testDirtyAcking(true, Session.SESSION_TRANSACTED);
+ }
+
+ private void repopulateBroker() throws Exception
+ {
+ // Repopulate this new broker so we can test what happends after failover
+
+ //Get the connection to the first (main port) broker.
+ Connection connection = getConnection();
+ // Use a transaction to send messages so we can be sure they arrive.
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+ sendMessage(session, _queue, NUM_MESSAGES);
+
+ assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+ connection.close();
+ }
+
+ // AMQConnectionListener Interface.. used so we can validate that we
+ // actually failed over.
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ //Allow failover
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ //Allow failover
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _failoverCompleted.countDown();
+ }
+
+ /**
+ * Override so we can block until failover has completd
+ *
+ * @param port
+ */
+ @Override
+ public void failBroker(int port)
+ {
+ super.failBroker(port);
+
+ try
+ {
+ if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ throw new Exception("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+ }
+
+ }
+ catch (Exception e)
+ {
+ // Use an exception so that we use our local fail() that notifies the main thread of failure
+ fail(e);
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverTest.java Mon Oct 5 15:04:57 2009
@@ -54,6 +54,14 @@
NUM_MESSAGES = 10;
}
+ /**
+ * Override default init to add connectionListener so we can verify that
+ * failover took place
+ *
+ * @param transacted create a transacted session for this test
+ * @param mode if not transacted what ack mode to use for this test
+ * @throws Exception if a problem occured during test setup.
+ */
@Override
protected void init(boolean transacted, int mode) throws Exception
{
@@ -244,75 +252,6 @@
testDirtyAcking(true, Session.SESSION_TRANSACTED);
}
- /**
- * If a transacted session has failed over whilst it has uncommitted sent
- * data then we need to throw a TransactedRolledbackException on commit()
- *
- * The alternative would be to maintain a replay buffer so that the message
- * could be resent. This is not currently implemented
- *
- * @throws Exception if something goes wrong.
- */
- public void testDirtySendingTransacted() throws Exception
- {
- Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
-
- // Ensure we get failover notifications
- ((AMQConnection) _connection).setConnectionListener(this);
-
- MessageProducer producer = producerSession.createProducer(_queue);
-
- // Create and send message 0
- Message msg = producerSession.createMessage();
- msg.setIntProperty(INDEX, 0);
- producer.send(msg);
-
- // DON'T commit message .. fail connection
-
- failBroker(getFailingPort());
-
- // Ensure destination exists for sending
- producerSession.createConsumer(_queue).close();
-
- // Send the next message
- msg.setIntProperty(INDEX, 1);
- try
- {
- producer.send(msg);
- fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
- }
- catch (JMSException jmse)
- {
- assertEquals("Early warning of dirty session not correct",
- "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
- }
-
- // Ignore that the session is dirty and attempt to commit to validate the
- // exception is thrown. AND that the above failure notification did NOT
- // clean up the session.
-
- try
- {
- producerSession.commit();
- fail("Session is dirty we should get an TransactionRolledBackException");
- }
- catch (TransactionRolledBackException trbe)
- {
- // Normal path.
- }
-
- // Resend messages
- msg.setIntProperty(INDEX, 0);
- producer.send(msg);
- msg.setIntProperty(INDEX, 1);
- producer.send(msg);
-
- producerSession.commit();
-
- assertEquals("Wrong number of messages on queue", 2,
- ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
- }
-
// AMQConnectionListener Interface.. used so we can validate that we
// actually failed over.
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=821826&r1=821825&r2=821826&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java Mon Oct 5 15:04:57 2009
@@ -32,14 +32,22 @@
public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
{
- private CountDownLatch _receviedAll;
- private AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+ protected CountDownLatch _receviedAll;
+ protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
@Override
public void setUp() throws Exception
{
super.setUp();
+ }
+
+ @Override
+ public void init(boolean transacted, int mode) throws Exception
+ {
_receviedAll = new CountDownLatch(NUM_MESSAGES);
+
+ super.init(transacted, mode);
+ _consumer.setMessageListener(this);
}
/**
@@ -51,13 +59,22 @@
protected void testAcking(boolean transacted, int mode) throws Exception
{
init(transacted, mode);
- _consumer.setMessageListener(this);
_connection.start();
if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
{
- fail("All messages not received.");
+ // Check to see if we ended due to an exception in the onMessage handler
+ Exception cause = _causeOfFailure.get();
+ if (cause != null)
+ {
+ cause.printStackTrace();
+ fail(cause.getMessage());
+ }
+ else
+ {
+ fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+ }
}
// Check to see if we ended due to an exception in the onMessage handler
@@ -102,11 +119,12 @@
/**
* Pass the given exception back to the waiting thread to fail the test run.
- * @param e The exception that is causing the test to fail.
+ *
+ * @param e The exception that is causing the test to fail.
*/
protected void fail(Exception e)
{
- _causeOfFailure.set(e);
+ _causeOfFailure.set(e);
// End the test.
while (_receviedAll.getCount() != 0)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org