You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/04/12 17:42:04 UTC
svn commit: r933281 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
systests/src/main/java/org/apache/qpid/test/unit/ack/
Author: ritchiem
Date: Mon Apr 12 15:42:04 2010
New Revision: 933281
URL: http://svn.apache.org/viewvc?rev=933281&view=rev
Log:
QPID-2346 : Addressed the problems with AcknowledgeAfterFailoverOnMessageTest, The issues were the same as AckAfterFailoverTest. So used same prepBroker approach. Test also need timeout increased as broke restarts took to long for a message to be sent/received. Finally the last change was thatthe queue needed to be re-declared so that the final queue depth check would have a queue to query.
Merged from 0.5.x-dev @ r907004
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=933281&r1=933280&r2=933281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Apr 12 15:42:04 2010
@@ -486,6 +486,12 @@ public abstract class AMQSession<C exten
{
_queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
+
+ // Add creation logging to tie in with the existing close logging
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Created session:" + this);
+ }
}
/**
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java?rev=933281&r1=933280&r2=933281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeAfterFailoverOnMessageTest.java Mon Apr 12 15:42:04 2010
@@ -24,6 +24,7 @@ import org.apache.qpid.client.AMQConnect
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.util.FileUtils;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -33,7 +34,25 @@ import javax.jms.Session;
import javax.jms.TransactionRolledBackException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.io.File;
+/**
+ * The AcknowlegeAfterFailoverOnMessageTests
+ *
+ * Extends the OnMessage AcknowledgeTests to validate that after the client has
+ * failed over that the client can still receive and ack messages.
+ *
+ * All the AcknowledgeTest ack modes are exercised here though some are disabled
+ * due to know issues (e.g. DupsOk, AutoAck : QPID-143 and the clientAck
+ * and dirtyClientAck due to QPID-1816)
+ *
+ * This class has two main test structures, overrides of AcknowledgeOnMessageTest
+ * to perform the clean acking based on session ack mode and a series of dirty
+ * ack tests that test what happends if you receive a message then try and ack
+ * AFTER you have failed over.
+ *
+ *
+ */
public class AcknowledgeAfterFailoverOnMessageTest extends AcknowledgeOnMessageTest implements ConnectionListener
{
@@ -68,61 +87,96 @@ public class AcknowledgeAfterFailoverOnM
}
}
- protected void prepBroker(int count) throws Exception
- {
- //Stop the connection whilst we repopulate the broker, or the no_ack
- // test will drain the msgs before we can check we put the right number
- // back on again.
-// _connection.stop();
-
- Connection connection = getConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- // ensure destination is created.
- session.createConsumer(_queue).close();
-
- sendMessage(session, _queue, count, NUM_MESSAGES - count, 0);
-
- if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
- {
- assertEquals("Wrong number of messages on queue", count,
- ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
- }
-
- connection.close();
-
-// _connection.start();
- }
-
- @Override
- public void doAcknowlegement(Message msg) throws JMSException
+ /**
+ * Prepare the broker for the next round.
+ *
+ * Called after acknowledging the messsage this method shuts the current
+ * broker down connnects to the new broker and send a new message for the
+ * client to failover to and receive.
+ *
+ * It ends by restarting the orignal broker so that the cycle can repeat.
+ *
+ * When we are able to cluster the java broker then will not need to do the
+ * message repopulation or QPID_WORK clearing. All that we will need to do
+ * is send the initial NUM_MESSAGES during startup and then bring the
+ * brokers down at the right time to cause the client to fail between them.
+ *
+ * @param index
+ * @throws Exception
+ */
+ protected void prepBroker(int index) throws Exception
{
- //Acknowledge current message
- super.doAcknowlegement(msg);
+ // Alternate killing the broker based on the message index we are at.
- int msgCount = msg.getIntProperty(INDEX);
-
- if (msgCount % 2 == 0)
+ if (index % 2 == 0)
{
failBroker(getFailingPort());
+ // Clean up the failed broker
+ FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getFailingPort()), true);
}
else
{
failBroker(getPort());
+ // Clean up the failed broker
+ FileUtils.delete(new File(System.getProperty("QPID_WORK") + "/" + getPort()), true);
}
+ _failoverCompleted = new CountDownLatch(1);
+
+ _logger.info("AAFOMT: prepNewBroker for message send");
+ Connection connection = getConnection();
+
try
{
- prepBroker(NUM_MESSAGES - msgCount - 1);
+
+ //Stop the connection whilst we repopulate the broker, or the no_ack
+ // test will drain the msgs before we can check we put the right number
+ // back on again.
+
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ // ensure destination is created.
+ session.createConsumer(_queue).close();
+
+
+ // If this is the last message then we can skip the send.
+ // But we MUST ensure that we have created the queue with the
+ // above createConsumer(_queue).close() as the test will end by
+ // testing the queue depth which will fail if we don't ensure we
+ // declare the queue.
+ // index is 0 based so we need to check +1 against NUM_MESSAGES
+ if ((index + 1) == NUM_MESSAGES)
+ {
+ return;
+ }
+
+
+ sendMessage(session, _queue, 1, index + 1, 0);
+
+ // Validate that we have the message on the queue
+ // In NoAck mode though the messasge may already have been sent to
+ // the client so we have to skip the vaildation.
+ if (_consumerSession.getAcknowledgeMode() != AMQSession.NO_ACKNOWLEDGE)
+ {
+ assertEquals("Wrong number of messages on queue", 1,
+ ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+ }
+
+
}
catch (Exception e)
{
fail("Unable to prep new broker," + e.getMessage());
}
+ finally
+ {
+ connection.close();
+ }
try
{
- if (msgCount % 2 == 0)
+ //Restart the broker
+ if (index % 2 == 0)
{
startBroker(getFailingPort());
}
@@ -138,8 +192,27 @@ public class AcknowledgeAfterFailoverOnM
}
- int msgCount = 0;
- boolean cleaned = false;
+ @Override
+ public void doAcknowlegement(Message msg) throws JMSException
+ {
+ //Acknowledge current message
+ super.doAcknowlegement(msg);
+
+ try
+ {
+ prepBroker(msg.getIntProperty(INDEX));
+ }
+ catch (Exception e)
+ {
+ // Provide details of what went wrong with the stack trace
+ e.printStackTrace();
+ fail("Unable to prep new broker," + e);
+ }
+ }
+
+ // Instance varilable for DirtyAcking test
+ int _msgCount = 0;
+ boolean _cleaned = false;
class DirtyAckingHandler implements MessageListener
{
@@ -164,10 +237,10 @@ public class AcknowledgeAfterFailoverOnM
try
{
// Check we have the next message as expected
- assertNotNull("Message " + msgCount + " not correctly received.", message);
- assertEquals("Incorrect message received", msgCount, message.getIntProperty(INDEX));
+ assertNotNull("Message " + _msgCount + " not correctly received.", message);
+ assertEquals("Incorrect message received", _msgCount, message.getIntProperty(INDEX));
- if (msgCount == 0 && _failoverCompleted.getCount() != 0)
+ if (_msgCount == 0 && _failoverCompleted.getCount() != 0)
{
// This is the first message we've received so lets fail the broker
@@ -180,16 +253,16 @@ public class AcknowledgeAfterFailoverOnM
return;
}
- msgCount++;
+ _msgCount++;
// Don't acknowlege the first message after failover so we can commit
// them together
- if (msgCount == 1)
+ if (_msgCount == 1)
{
- _logger.error("Received first msg after failover ignoring:" + msgCount);
+ _logger.error("Received first msg after failover ignoring:" + _msgCount);
// Acknowledge the first message if we are now on the cleaned pass
- if (cleaned)
+ if (_cleaned)
{
_receivedAll.countDown();
}
@@ -202,7 +275,7 @@ public class AcknowledgeAfterFailoverOnM
try
{
_consumerSession.commit();
- if (!cleaned)
+ if (!_cleaned)
{
fail("Session is dirty we should get an TransactionRolledBackException");
}
@@ -217,7 +290,7 @@ public class AcknowledgeAfterFailoverOnM
try
{
message.acknowledge();
- if (!cleaned)
+ if (!_cleaned)
{
fail("Session is dirty we should get an IllegalStateException");
}
@@ -232,14 +305,14 @@ public class AcknowledgeAfterFailoverOnM
// Acknowledge the last message if we are in a clean state
// this will then trigger test teardown.
- if (cleaned)
+ if (_cleaned)
{
_receivedAll.countDown();
}
//Reset message count so we can try again.
- msgCount = 0;
- cleaned = true;
+ _msgCount = 0;
+ _cleaned = true;
}
catch (Exception e)
{
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java?rev=933281&r1=933280&r2=933281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeOnMessageTest.java Mon Apr 12 15:42:04 2010
@@ -32,6 +32,10 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+/**
+ * This test extends the synchronous AcknowledgeTest to use a MessageListener
+ * and receive messages asynchronously.
+ */
public class AcknowledgeOnMessageTest extends AcknowledgeTest implements MessageListener
{
protected CountDownLatch _receivedAll;
@@ -43,6 +47,13 @@ public class AcknowledgeOnMessageTest ex
super.setUp();
}
+ /**
+ * Override the synchronous AcknowledgeTest init to provide the _receivedAll
+ * CountDownLatch init and ensure that we set the MessageListener.
+ * @param transacted
+ * @param mode
+ * @throws Exception
+ */
@Override
public void init(boolean transacted, int mode) throws Exception
{
@@ -53,11 +64,25 @@ public class AcknowledgeOnMessageTest ex
}
/**
+ * This test overrides the testAcking from the simple recieve() model to all
+ * for asynchronous receiving of messages.
+ *
+ * Again the transaction/ack mode is provided to this main test run
+ *
+ * The init method is called which will setup the listener so that we can
+ * then sit and await using the _receivedAll CountDownLatch. We wait for up
+ * to 10s if no messages have been received in the last 10s then test will
+ * fail.
+ *
+ * If the test fails then it will attempt to retrieve any exception that the
+ * asynchronous delivery thread may have recorded.
+ *
* @param transacted
* @param mode
*
* @throws Exception
*/
+ @Override
protected void testAcking(boolean transacted, int mode) throws Exception
{
init(transacted, mode);
@@ -69,7 +94,7 @@ public class AcknowledgeOnMessageTest ex
int lastCount = NUM_MESSAGES;
// Wait for messages to arrive
- boolean complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
+ boolean complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
// If the messasges haven't arrived
while (!complete)
@@ -90,7 +115,7 @@ public class AcknowledgeOnMessageTest ex
lastCount = currentCount;
// Wait again for messages to arrive.
- complete = _receivedAll.await(5000L, TimeUnit.MILLISECONDS);
+ complete = _receivedAll.await(10000L, TimeUnit.MILLISECONDS);
}
// If we failed to receive all the messages then fail the test.
@@ -105,6 +130,7 @@ public class AcknowledgeOnMessageTest ex
}
else
{
+ _logger.info("AOMT: Check QueueDepth:" + _queue);
long onQueue=((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue);
fail("All messages not received missing:" + _receivedAll.getCount() + "/" + NUM_MESSAGES+" On Queue:"+onQueue);
@@ -136,14 +162,27 @@ public class AcknowledgeOnMessageTest ex
_consumerSession.close();
+ _logger.info("AOMT: check number of message at end of test.");
assertEquals("Wrong number of messages on queue", 0,
((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
}
+ /**
+ * The MessageListener interface that recieves the message and counts down
+ * the _receivedAll CountDownLatch.
+ *
+ * Again like AcknowledgeTest acknowledgement is actually handled in
+ * doAcknowlegement.
+ *
+ * The message INDEX is validated to ensure the correct message order is
+ * preserved.
+ *
+ * @param message
+ */
public void onMessage(Message message)
{
// Log received Message for debugging
- System.out.println("RECEIVED MESSAGE:" + message);
+ _logger.info("RECEIVED MESSAGE:" + message);
try
{
@@ -164,7 +203,7 @@ public class AcknowledgeOnMessageTest ex
}
catch (Exception e)
{
- // This will end the test run by counting down _receviedAll
+ // This will end the test run by counting down _receivedAll
fail(e);
}
}
@@ -176,6 +215,7 @@ public class AcknowledgeOnMessageTest ex
*/
protected void fail(Exception e)
{
+ //record the failure
_causeOfFailure.set(e);
// End the test.
while (_receivedAll.getCount() != 0)
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java?rev=933281&r1=933280&r2=933281&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/AcknowledgeTest.java Mon Apr 12 15:42:04 2010
@@ -33,6 +33,12 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.MessageProducer;
+/**
+ * Test the various JMS Acknowledge Modes the single testAcking method does all
+ * the work of receiving and validation of acking.
+ *
+ * The ack mode is provided from the various test methods.
+ */
public class AcknowledgeTest extends FailoverBaseCase
{
protected int NUM_MESSAGES;
@@ -50,6 +56,7 @@ public class AcknowledgeTest extends Fai
_queue = getTestQueue();
+ _logger.info("AT: setup");
//Create Producer put some messages on the queue
_connection = getConnection();
}
@@ -68,6 +75,16 @@ public class AcknowledgeTest extends Fai
}
/**
+ * The main test method.
+ *
+ * Receive the initial message and then proceed to send and ack messages
+ * until we have processed NUM_MESSAGES worth of messages.
+ *
+ * Each message is tagged with an INDEX value and these are used to check
+ * that the messages are received in the correct order.
+ *
+ * The test concludes by validating that the queue depth is 0 as expected.
+ *
* @param transacted
* @param mode
*
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org