You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/10/13 17:06:27 UTC
svn commit: r1022127 [13/15] - in
/qpid/branches/grkvlt-network-20101013/qpid/java: ./
broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/
broker-plugins/access-control/src/test/java/org/apache/qpid/server/securi...
Modified: qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/xa/TopicTest.java Wed Oct 13 15:05:29 2010
@@ -116,233 +116,227 @@ public class TopicTest extends AbstractX
*/
public void init()
{
- if (!isBroker08())
+ // lookup test topic
+ try
{
- // lookup test queue
+ _topic = (Topic) getInitialContext().lookup(TOPICNAME);
+ }
+ catch (Exception e)
+ {
+ fail("cannot lookup test topic " + e.getMessage());
+ }
+ // lookup connection factory
+ try
+ {
+ _topicFactory = getConnectionFactory();
+ }
+ catch (Exception e)
+ {
+ fail("enable to lookup connection factory ");
+ }
+ // create XA topic connection
+ try
+ {
+ _topicConnection = getNewTopicXAConnection();
+ }
+ catch (JMSException e)
+ {
+ fail("cannot create queue connection: " + e.getMessage());
+ }
+ // create XA topic session
+ try
+ {
+ _session = _topicConnection.createXATopicSession();
+ }
+ catch (JMSException e)
+ {
+ fail("cannot create queue session: " + e.getMessage());
+ }
+ // create a transacted non-XA topic session
+ try
+ {
+ _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use Options | File Templates.
+ }
+ init(_session, _topic);
+ }
+
+ /** -------------------------------------------------------------------------------------- **/
+ /** ----------------------------- Test Suite -------------------------------------------- **/
+ /** -------------------------------------------------------------------------------------- **/
+
+
+ /**
+ * Uses two transaction branches, xid1 and xid2, each of which is used to send a message.
+ * xid2 is committed, and xid1 is then used to receive the message that was sent within xid2.
+ * xid1 is then committed and a standard transaction is used to receive the message that was sent within xid1.
+ */
+ public void testProducer()
+ {
+ _logger.debug("testProducer");
+ Xid xid1 = getNewXid();
+ Xid xid2 = getNewXid();
+ try
+ {
+ Session nonXASession = _nonXASession;
+ MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic);
+ _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ // start the xaResource for xid1
try
{
- _topic = (Topic) getInitialContext().lookup(TOPICNAME);
+ _logger.debug("starting tx branch xid1");
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
}
- catch (Exception e)
+ catch (XAException e)
{
- fail("cannot lookup test topic " + e.getMessage());
+ e.printStackTrace();
+ fail("cannot start the transaction with xid1: " + e.getMessage());
}
- // lookup connection factory
try
{
- _topicFactory = getConnectionFactory();
+ // start the connection
+ _topicConnection.start();
+ _logger.debug("produce a message with sequence number 1");
+ _message.setLongProperty(_sequenceNumberPropertyName, 1);
+ _producer.send(_message);
}
- catch (Exception e)
+ catch (JMSException e)
{
- fail("enable to lookup connection factory ");
+ fail(" cannot send persistent message: " + e.getMessage());
}
- // create standard connection
+ _logger.debug("suspend the transaction branch xid1");
try
{
- _topicConnection = getNewTopicXAConnection();
+ _xaResource.end(xid1, XAResource.TMSUSPEND);
}
- catch (JMSException e)
+ catch (XAException e)
{
- fail("cannot create queue connection: " + e.getMessage());
+ fail("Cannot end the transaction with xid1: " + e.getMessage());
}
- // create standard session
+ _logger.debug("start the xaResource for xid2");
try
{
- _session = _topicConnection.createXATopicSession();
+ _xaResource.start(xid2, XAResource.TMNOFLAGS);
}
- catch (JMSException e)
+ catch (XAException e)
{
- fail("cannot create queue session: " + e.getMessage());
+ fail("cannot start the transaction with xid2: " + e.getMessage());
}
- // create a standard session
try
{
- _nonXASession = _topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
+ _logger.debug("produce a message");
+ _message.setLongProperty(_sequenceNumberPropertyName, 2);
+ _producer.send(_message);
}
catch (JMSException e)
{
- e.printStackTrace(); //To change body of catch statement use Options | File Templates.
+ fail(" cannot send second persistent message: " + e.getMessage());
}
- init(_session, _topic);
- }
- }
-
- /** -------------------------------------------------------------------------------------- **/
- /** ----------------------------- Test Suite -------------------------------------------- **/
- /** -------------------------------------------------------------------------------------- **/
-
-
- /**
- * Uses two transactions respectively with xid1 and xid2 that are use to send a message
- * within xid1 and xid2. xid2 is committed and xid1 is used to receive the message that was sent within xid2.
- * Xid is then committed and a standard transaction is used to receive the message that was sent within xid1.
- */
- public void testProducer()
- {
- if (!isBroker08())
- {
- _logger.debug("testProducer");
- Xid xid1 = getNewXid();
- Xid xid2 = getNewXid();
+ _logger.debug("end xid2 and start xid1");
try
{
- Session nonXASession = _nonXASession;
- MessageConsumer nonXAConsumer = nonXASession.createConsumer(_topic);
- _producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- // start the xaResource for xid1
- try
- {
- _logger.debug("starting tx branch xid1");
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- }
- catch (XAException e)
- {
- e.printStackTrace();
- fail("cannot start the transaction with xid1: " + e.getMessage());
- }
- try
- {
- // start the connection
- _topicConnection.start();
- _logger.debug("produce a message with sequence number 1");
- _message.setLongProperty(_sequenceNumberPropertyName, 1);
- _producer.send(_message);
- }
- catch (JMSException e)
- {
- fail(" cannot send persistent message: " + e.getMessage());
- }
- _logger.debug("suspend the transaction branch xid1");
- try
- {
- _xaResource.end(xid1, XAResource.TMSUSPEND);
- }
- catch (XAException e)
- {
- fail("Cannot end the transaction with xid1: " + e.getMessage());
- }
- _logger.debug("start the xaResource for xid2");
- try
- {
- _xaResource.start(xid2, XAResource.TMNOFLAGS);
- }
- catch (XAException e)
- {
- fail("cannot start the transaction with xid2: " + e.getMessage());
- }
- try
- {
- _logger.debug("produce a message");
- _message.setLongProperty(_sequenceNumberPropertyName, 2);
- _producer.send(_message);
- }
- catch (JMSException e)
- {
- fail(" cannot send second persistent message: " + e.getMessage());
- }
- _logger.debug("end xid2 and start xid1");
- try
+ _xaResource.end(xid2, XAResource.TMSUCCESS);
+ _xaResource.start(xid1, XAResource.TMRESUME);
+ }
+ catch (XAException e)
+ {
+ fail("Exception when ending and starting transactions: " + e.getMessage());
+ }
+ _logger.debug("two phases commit transaction with xid2");
+ try
+ {
+ int resPrepare = _xaResource.prepare(xid2);
+ if (resPrepare != XAResource.XA_OK)
{
- _xaResource.end(xid2, XAResource.TMSUCCESS);
- _xaResource.start(xid1, XAResource.TMRESUME);
+ fail("prepare returned: " + resPrepare);
}
- catch (XAException e)
+ _xaResource.commit(xid2, false);
+ }
+ catch (XAException e)
+ {
+ fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
+ }
+ _logger.debug("receiving a message from topic test we expect it to be the second one");
+ try
+ {
+ TextMessage message = (TextMessage) _consumer.receive(1000);
+ if (message == null)
{
- fail("Exception when ending and starting transactions: " + e.getMessage());
+ fail("did not receive second message as expected ");
}
- _logger.debug("two phases commit transaction with xid2");
- try
+ else
{
- int resPrepare = _xaResource.prepare(xid2);
- if (resPrepare != XAResource.XA_OK)
+ if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
{
- fail("prepare returned: " + resPrepare);
+ fail("receive wrong message its sequence number is: " + message
+ .getLongProperty(_sequenceNumberPropertyName));
}
- _xaResource.commit(xid2, false);
}
- catch (XAException e)
+ }
+ catch (JMSException e)
+ {
+ fail("Exception when receiving second message: " + e.getMessage());
+ }
+ _logger.debug("end and one phase commit the first transaction");
+ try
+ {
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ _xaResource.commit(xid1, true);
+ }
+ catch (XAException e)
+ {
+ fail("Exception thrown when commiting transaction with xid1");
+ }
+ _logger.debug("We should now be able to receive the first and second message");
+ try
+ {
+ TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
+ if (message1 == null)
{
- fail("Exception thrown when preparing transaction with xid2: " + e.getMessage());
+ fail("did not receive first message as expected ");
}
- _logger.debug("receiving a message from topic test we expect it to be the second one");
- try
+ else
{
- TextMessage message = (TextMessage) _consumer.receive(1000);
- if (message == null)
+ if (message1.getLongProperty(_sequenceNumberPropertyName) != 2)
{
- fail("did not receive second message as expected ");
- }
- else
- {
- if (message.getLongProperty(_sequenceNumberPropertyName) != 2)
- {
- fail("receive wrong message its sequence number is: " + message
- .getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("receive wrong message its sequence number is: " + message1
+ .getLongProperty(_sequenceNumberPropertyName));
}
}
- catch (JMSException e)
- {
- fail("Exception when receiving second message: " + e.getMessage());
- }
- _logger.debug("end and one phase commit the first transaction");
- try
- {
- _xaResource.end(xid1, XAResource.TMSUCCESS);
- _xaResource.commit(xid1, true);
- }
- catch (XAException e)
+ message1 = (TextMessage) nonXAConsumer.receive(1000);
+ if (message1 == null)
{
- fail("Exception thrown when commiting transaction with xid1");
+ fail("did not receive first message as expected ");
}
- _logger.debug("We should now be able to receive the first and second message");
- try
+ else
{
- TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
- if (message1 == null)
- {
- fail("did not receive first message as expected ");
- }
- else
- {
- if (message1.getLongProperty(_sequenceNumberPropertyName) != 2)
- {
- fail("receive wrong message its sequence number is: " + message1
- .getLongProperty(_sequenceNumberPropertyName));
- }
- }
- message1 = (TextMessage) nonXAConsumer.receive(1000);
- if (message1 == null)
- {
- fail("did not receive first message as expected ");
- }
- else
- {
- if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
- {
- fail("receive wrong message its sequence number is: " + message1
- .getLongProperty(_sequenceNumberPropertyName));
- }
- }
- _logger.debug("commit transacted session");
- nonXASession.commit();
- _logger.debug("Test that the topic is now empty");
- message1 = (TextMessage) nonXAConsumer.receive(1000);
- if (message1 != null)
+ if (message1.getLongProperty(_sequenceNumberPropertyName) != 1)
{
- fail("receive an unexpected message ");
+ fail("receive wrong message its sequence number is: " + message1
+ .getLongProperty(_sequenceNumberPropertyName));
}
}
- catch (JMSException e)
+ _logger.debug("commit transacted session");
+ nonXASession.commit();
+ _logger.debug("Test that the topic is now empty");
+ message1 = (TextMessage) nonXAConsumer.receive(1000);
+ if (message1 != null)
{
- fail("Exception thrown when emptying the queue: " + e.getMessage());
+ fail("receive an unexpected message ");
}
}
catch (JMSException e)
{
- fail("cannot create standard consumer: " + e.getMessage());
+ fail("Exception thrown when emptying the queue: " + e.getMessage());
}
}
+ catch (JMSException e)
+ {
+ fail("cannot create standard consumer: " + e.getMessage());
+ }
}
@@ -352,144 +346,141 @@ public class TopicTest extends AbstractX
*/
public void testDurSub()
{
- if (!isBroker08())
+ Xid xid1 = getNewXid();
+ Xid xid2 = getNewXid();
+ Xid xid3 = getNewXid();
+ Xid xid4 = getNewXid();
+ String durSubName = "xaSubDurable";
+ try
{
- Xid xid1 = getNewXid();
- Xid xid2 = getNewXid();
- Xid xid3 = getNewXid();
- Xid xid4 = getNewXid();
- String durSubName = "xaSubDurable";
+ TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
- TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
- try
+ _topicConnection.start();
+ _logger.debug("start xid1");
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ // start the connection
+ _topicConnection.start();
+ _logger.debug("produce a message with sequence number 1");
+ _message.setLongProperty(_sequenceNumberPropertyName, 1);
+ _producer.send(_message);
+ _logger.debug("2 phases commit xid1");
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ if (_xaResource.prepare(xid1) != XAResource.XA_OK)
{
- _topicConnection.start();
- _logger.debug("start xid1");
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- // start the connection
- _topicConnection.start();
- _logger.debug("produce a message with sequence number 1");
- _message.setLongProperty(_sequenceNumberPropertyName, 1);
- _producer.send(_message);
- _logger.debug("2 phases commit xid1");
- _xaResource.end(xid1, XAResource.TMSUCCESS);
- if (_xaResource.prepare(xid1) != XAResource.XA_OK)
- {
- fail("Problem when preparing tx1 ");
- }
- _xaResource.commit(xid1, false);
+ fail("Problem when preparing tx1 ");
}
- catch (Exception e)
+ _xaResource.commit(xid1, false);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid1: " + e.getMessage());
+ }
+ try
+ {
+ _logger.debug("start xid2");
+ _xaResource.start(xid2, XAResource.TMNOFLAGS);
+ _logger.debug("receive the previously produced message");
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- e.printStackTrace();
- fail("Exception when working with xid1: " + e.getMessage());
+ fail("no message received ");
}
- try
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
- _logger.debug("start xid2");
- _xaResource.start(xid2, XAResource.TMNOFLAGS);
- _logger.debug("receive the previously produced message");
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received ");
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- _logger.debug("rollback xid2");
- boolean rollbackOnFailure = false;
- try
- {
- _xaResource.end(xid2, XAResource.TMFAIL);
- }
- catch (XAException e)
- {
- if (e.errorCode != XAException.XA_RBROLLBACK)
- {
- fail("Exception when working with xid2: " + e.getMessage());
- }
- rollbackOnFailure = true;
- }
- if (!rollbackOnFailure)
- {
- if (_xaResource.prepare(xid2) != XAResource.XA_OK)
- {
- fail("Problem when preparing tx2 ");
- }
- _xaResource.rollback(xid2);
- }
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- catch (Exception e)
+ _logger.debug("rollback xid2");
+ boolean rollbackOnFailure = false;
+ try
{
- e.printStackTrace();
- fail("Exception when working with xid2: " + e.getMessage());
+ _xaResource.end(xid2, XAResource.TMFAIL);
}
- try
+ catch (XAException e)
{
- _logger.debug("start xid3");
- _xaResource.start(xid3, XAResource.TMNOFLAGS);
- _logger.debug(" receive the previously aborted consumed message");
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received ");
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
+ if (e.errorCode != XAException.XA_RBROLLBACK)
{
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ fail("Exception when working with xid2: " + e.getMessage());
}
- _logger.debug("commit xid3");
- _xaResource.end(xid3, XAResource.TMSUCCESS);
- if (_xaResource.prepare(xid3) != XAResource.XA_OK)
+ rollbackOnFailure = true;
+ }
+ if (!rollbackOnFailure)
+ {
+ if (_xaResource.prepare(xid2) != XAResource.XA_OK)
{
- fail("Problem when preparing tx3 ");
+ fail("Problem when preparing tx2 ");
}
- _xaResource.commit(xid3, false);
+ _xaResource.rollback(xid2);
}
- catch (Exception e)
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid2: " + e.getMessage());
+ }
+ try
+ {
+ _logger.debug("start xid3");
+ _xaResource.start(xid3, XAResource.TMNOFLAGS);
+ _logger.debug(" receive the previously aborted consumed message");
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- e.printStackTrace();
- fail("Exception when working with xid3: " + e.getMessage());
+ fail("no message received ");
}
- try
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
- _logger.debug("start xid4");
- _xaResource.start(xid4, XAResource.TMNOFLAGS);
- _logger.debug("check that topic is empty");
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message != null)
- {
- fail("An unexpected message was received ");
- }
- _logger.debug("commit xid4");
- _xaResource.end(xid4, XAResource.TMSUCCESS);
- _xaResource.commit(xid4, true);
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- catch (Exception e)
+ _logger.debug("commit xid3");
+ _xaResource.end(xid3, XAResource.TMSUCCESS);
+ if (_xaResource.prepare(xid3) != XAResource.XA_OK)
{
- e.printStackTrace();
- fail("Exception when working with xid4: " + e.getMessage());
+ fail("Problem when preparing tx3 ");
}
+ _xaResource.commit(xid3, false);
}
catch (Exception e)
{
e.printStackTrace();
- fail("problem when creating dur sub: " + e.getMessage());
+ fail("Exception when working with xid3: " + e.getMessage());
}
- finally
+ try
{
- try
- {
- _session.unsubscribe(durSubName);
- }
- catch (JMSException e)
+ _logger.debug("start xid4");
+ _xaResource.start(xid4, XAResource.TMNOFLAGS);
+ _logger.debug("check that topic is empty");
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message != null)
{
- e.printStackTrace();
- fail("problem when unsubscribing dur sub: " + e.getMessage());
+ fail("An unexpected message was received ");
}
+ _logger.debug("commit xid4");
+ _xaResource.end(xid4, XAResource.TMSUCCESS);
+ _xaResource.commit(xid4, true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid4: " + e.getMessage());
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("problem when creating dur sub: " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.unsubscribe(durSubName);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
@@ -506,210 +497,207 @@ public class TopicTest extends AbstractX
*/
public void testMultiMessagesDurSub()
{
- if (!isBroker08())
+ Xid xid1 = getNewXid();
+ Xid xid2 = getNewXid();
+ Xid xid3 = getNewXid();
+ Xid xid4 = getNewXid();
+ Xid xid6 = getNewXid();
+ String durSubName = "xaSubDurable";
+ TextMessage message;
+ try
{
- Xid xid1 = getNewXid();
- Xid xid2 = getNewXid();
- Xid xid3 = getNewXid();
- Xid xid4 = getNewXid();
- Xid xid6 = getNewXid();
- String durSubName = "xaSubDurable";
- TextMessage message;
+ TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
- TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
- try
- {
- Session txSession = _nonXASession;
- MessageProducer txProducer = txSession.createProducer(_topic);
- _logger.debug("produce 10 persistent messages");
- txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- _topicConnection.start();
- for (int i = 1; i <= 7; i++)
- {
- _message.setLongProperty(_sequenceNumberPropertyName, i);
- txProducer.send(_message);
- }
- // commit txSession
- txSession.commit();
- }
- catch (JMSException e)
+ Session txSession = _nonXASession;
+ MessageProducer txProducer = txSession.createProducer(_topic);
+ _logger.debug("produce 10 persistent messages");
+ txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ _topicConnection.start();
+ for (int i = 1; i <= 7; i++)
{
- e.printStackTrace();
- fail("Exception thrown when producing messages: " + e.getMessage());
+ _message.setLongProperty(_sequenceNumberPropertyName, i);
+ txProducer.send(_message);
}
+ // commit txSession
+ txSession.commit();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown when producing messages: " + e.getMessage());
+ }
- try
+ try
+ {
+ _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3");
+ //----- start xid1
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ // receive the 2 first messages
+ for (int i = 1; i <= 2; i++)
{
- _logger.debug(" consume 2 messages respectively with tx1, tx2 and tx3");
- //----- start xid1
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 1; i <= 2; i++)
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- }
- _xaResource.end(xid1, XAResource.TMSUSPEND);
- //----- start xid2
- _xaResource.start(xid2, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 3; i <= 4; i++)
- {
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("no message received! expected: " + i);
}
- _xaResource.end(xid2, XAResource.TMSUSPEND);
- //----- start xid3
- _xaResource.start(xid3, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 5; i <= 6; i++)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- _xaResource.end(xid3, XAResource.TMSUCCESS);
}
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
- }
- try
+ _xaResource.end(xid1, XAResource.TMSUSPEND);
+ //----- start xid2
+ _xaResource.start(xid2, XAResource.TMNOFLAGS);
+ // receive the next 2 messages (3 and 4)
+ for (int i = 3; i <= 4; i++)
{
- _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7");
- _xaResource.start(xid2, XAResource.TMRESUME);
- _xaResource.end(xid2, XAResource.TMSUCCESS);
- _xaResource.prepare(xid2);
- _xaResource.rollback(xid2);
- // receive 3 message within tx1: 3, 4 and 7
- _xaResource.start(xid1, XAResource.TMRESUME);
- _logger.debug(" 3, 4 and 7");
- for (int i = 1; i <= 3; i++)
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + 3);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message
- .getLongProperty(_sequenceNumberPropertyName) || message
- .getLongProperty(_sequenceNumberPropertyName) == 6)
- {
- fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("no message received! expected: " + i);
}
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
- }
-
- try
- {
- _xaResource.end(xid1, XAResource.TMSUCCESS);
- _logger.debug(" commit tx3");
- _xaResource.commit(xid3, true);
- _logger.debug("abort tx1");
- _xaResource.prepare(xid1);
- _xaResource.rollback(xid1);
- }
- catch (XAException e)
- {
- e.printStackTrace();
- fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
- }
-
- try
- {
- // consume messages 1 - 4 + 7
- //----- start xid1
- _xaResource.start(xid4, XAResource.TMNOFLAGS);
- for (int i = 1; i <= 5; i++)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
-
- message = (TextMessage) xaDurSub.receive(1000);
- _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message
- .getLongProperty(_sequenceNumberPropertyName) == 6)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- _xaResource.end(xid4, XAResource.TMSUCCESS);
- _xaResource.prepare(xid4);
- _xaResource.commit(xid4, false);
}
- catch (Exception e)
+ _xaResource.end(xid2, XAResource.TMSUSPEND);
+ //----- start xid3
+ _xaResource.start(xid3, XAResource.TMNOFLAGS);
+ // receive the next 2 messages (5 and 6)
+ for (int i = 5; i <= 6; i++)
{
- e.printStackTrace();
- fail("Exception thrown in last phase: " + e.getMessage());
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
+ {
+ fail("no message received! expected: " + i);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+ {
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ }
}
- // now the topic should be empty!!
- try
+ _xaResource.end(xid3, XAResource.TMSUCCESS);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
+ }
+ try
+ {
+ _logger.debug("abort tx2, we now expect to receive messages 3, 4 and 7");
+ _xaResource.start(xid2, XAResource.TMRESUME);
+ _xaResource.end(xid2, XAResource.TMSUCCESS);
+ _xaResource.prepare(xid2);
+ _xaResource.rollback(xid2);
+ // receive 3 messages within tx1: 3, 4 and 7
+ _xaResource.start(xid1, XAResource.TMRESUME);
+ _logger.debug(" 3, 4 and 7");
+ for (int i = 1; i <= 3; i++)
{
- // start xid6
- _xaResource.start(xid6, XAResource.TMNOFLAGS);
- // should now be empty
message = (TextMessage) xaDurSub.receive(1000);
- if (message != null)
+ if (message == null)
{
- fail("An unexpected message was received " + message
+ fail("no message received! expected: " + 3);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) <= 2 || 5 == message
+ .getLongProperty(_sequenceNumberPropertyName) || message
+ .getLongProperty(_sequenceNumberPropertyName) == 6)
+ {
+ fail("wrong sequence number: " + message
.getLongProperty(_sequenceNumberPropertyName));
}
- // commit xid6
- _xaResource.end(xid6, XAResource.TMSUCCESS);
- _xaResource.commit(xid6, true);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception when working with xid6: " + e.getMessage());
}
}
catch (Exception e)
{
e.printStackTrace();
- fail("problem when creating dur sub: " + e.getMessage());
+ fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
}
- finally
+
+ try
{
- try
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ _logger.debug(" commit tx3");
+ _xaResource.commit(xid3, true);
+ _logger.debug("abort tx1");
+ _xaResource.prepare(xid1);
+ _xaResource.rollback(xid1);
+ }
+ catch (XAException e)
+ {
+ e.printStackTrace();
+ fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
+ }
+
+ try
+ {
+ // consume messages 1 - 4 + 7
+ //----- start xid4
+ _xaResource.start(xid4, XAResource.TMNOFLAGS);
+ for (int i = 1; i <= 5; i++)
{
- _session.unsubscribe(durSubName);
+
+ message = (TextMessage) xaDurSub.receive(1000);
+ _logger.debug(" received message: " + message.getLongProperty(_sequenceNumberPropertyName));
+ if (message == null)
+ {
+ fail("no message received! expected: " + i);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) == 5 || message
+ .getLongProperty(_sequenceNumberPropertyName) == 6)
+ {
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ }
}
- catch (JMSException e)
+ _xaResource.end(xid4, XAResource.TMSUCCESS);
+ _xaResource.prepare(xid4);
+ _xaResource.commit(xid4, false);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown in last phase: " + e.getMessage());
+ }
+ // now the topic should be empty!!
+ try
+ {
+ // start xid6
+ _xaResource.start(xid6, XAResource.TMNOFLAGS);
+ // should now be empty
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message != null)
{
- e.printStackTrace();
- fail("problem when unsubscribing dur sub: " + e.getMessage());
+ fail("An unexpected message was received " + message
+ .getLongProperty(_sequenceNumberPropertyName));
}
+ // commit xid6
+ _xaResource.end(xid6, XAResource.TMSUCCESS);
+ _xaResource.commit(xid6, true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid6: " + e.getMessage());
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("problem when creating dur sub: " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.unsubscribe(durSubName);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
@@ -732,398 +720,569 @@ public class TopicTest extends AbstractX
*/
public void testMultiMessagesDurSubCrash()
{
- if (!isBroker08())
+ Xid xid1 = getNewXid();
+ Xid xid2 = getNewXid();
+ Xid xid3 = getNewXid();
+ Xid xid4 = getNewXid();
+ Xid xid5 = getNewXid();
+ Xid xid6 = getNewXid();
+ String durSubName = "xaSubDurable";
+ TextMessage message;
+ try
{
- Xid xid1 = getNewXid();
- Xid xid2 = getNewXid();
- Xid xid3 = getNewXid();
- Xid xid4 = getNewXid();
- Xid xid5 = getNewXid();
- Xid xid6 = getNewXid();
- String durSubName = "xaSubDurable";
- TextMessage message;
+ TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
try
{
- TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
- try
- {
- Session txSession = _nonXASession;
- MessageProducer txProducer = txSession.createProducer(_topic);
- // produce 10 persistent messages
- txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
- _topicConnection.start();
- for (int i = 1; i <= 10; i++)
- {
- _message.setLongProperty(_sequenceNumberPropertyName, i);
- txProducer.send(_message);
- }
- // commit txSession
- txSession.commit();
- }
- catch (JMSException e)
+ Session txSession = _nonXASession;
+ MessageProducer txProducer = txSession.createProducer(_topic);
+ // produce 10 persistent messages
+ txProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ _topicConnection.start();
+ for (int i = 1; i <= 10; i++)
{
- e.printStackTrace();
- fail("Exception thrown when producing messages: " + e.getMessage());
+ _message.setLongProperty(_sequenceNumberPropertyName, i);
+ txProducer.send(_message);
}
- try
+ // commit txSession
+ txSession.commit();
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown when producing messages: " + e.getMessage());
+ }
+ try
+ {
+ // consume 2 messages respectively with tx1, tx2 and tx3
+ //----- start xid1
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ // receive the 2 first messages
+ for (int i = 1; i <= 2; i++)
{
- // consume 2 messages respectively with tx1, tx2 and tx3
- //----- start xid1
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 1; i <= 2; i++)
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("no message received! expected: " + i);
}
- _xaResource.end(xid1, XAResource.TMSUCCESS);
- //----- start xid2
- _xaResource.start(xid2, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 3; i <= 4; i++)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ }
+ }
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ //----- start xid2
+ _xaResource.start(xid2, XAResource.TMNOFLAGS);
+ // receive the next 2 messages (3 and 4)
+ for (int i = 3; i <= 4; i++)
+ {
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
+ {
+ fail("no message received! expected: " + i);
}
- _xaResource.end(xid2, XAResource.TMSUCCESS);
- //----- start xid3
- _xaResource.start(xid3, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 5; i <= 6; i++)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- _xaResource.end(xid3, XAResource.TMSUCCESS);
- // prepare tx2 and tx3
-
- _xaResource.prepare(xid2);
- _xaResource.prepare(xid3);
}
- catch (Exception e)
+ _xaResource.end(xid2, XAResource.TMSUCCESS);
+ //----- start xid3
+ _xaResource.start(xid3, XAResource.TMNOFLAGS);
+ // receive the next 2 messages (5 and 6)
+ for (int i = 5; i <= 6; i++)
{
- e.printStackTrace();
- fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
+ {
+ fail("no message received! expected: " + i);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+ {
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ }
}
- /////// stop the broker now !!
- try
+ _xaResource.end(xid3, XAResource.TMSUCCESS);
+ // prepare tx2 and tx3
+
+ _xaResource.prepare(xid2);
+ _xaResource.prepare(xid3);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown when consumming 6 first messages: " + e.getMessage());
+ }
+ /////// stop the broker now !!
+ try
+ {
+ restartBroker();
+ init();
+ }
+ catch (Exception e)
+ {
+ fail("Exception when stopping and restarting the server");
+ }
+ // get the list of in doubt transactions
+ try
+ {
+ _topicConnection.start();
+ // reconnect to dursub!
+ xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
+ Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
+ if (inDoubt == null)
{
- restartBroker();
- init();
+ fail("the array of in doubt transactions should not be null ");
}
- catch (Exception e)
+ // At that point we expect only two indoubt transactions:
+ if (inDoubt.length != 2)
{
- fail("Exception when stopping and restarting the server");
+ fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
}
- // get the list of in doubt transactions
- try
+ }
+ catch (XAException e)
+ {
+ e.printStackTrace();
+ fail("exception thrown when recovering transactions " + e.getMessage());
+ }
+ try
+ {
+ // xid1 has been aborted redo the job!
+ // consume 2 messages with tx1
+ //----- start xid1
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ // receive the 2 first messages
+ for (int i = 1; i <= 2; i++)
{
- _topicConnection.start();
- // reconnect to dursub!
- xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
- Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
- if (inDoubt == null)
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- fail("the array of in doubt transactions should not be null ");
+ fail("no message received! expected: " + i);
}
- // At that point we expect only two indoubt transactions:
- if (inDoubt.length != 2)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
- catch (XAException e)
+ _xaResource.end(xid1, XAResource.TMSUSPEND);
+ // abort tx2, we now expect to receive messages 3 and 4 first!
+ _xaResource.rollback(xid2);
+
+ // receive 3 messages within tx1: 3, 4 and 7
+ _xaResource.start(xid1, XAResource.TMRESUME);
+ // receive messages 3, 4 and 7
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- e.printStackTrace();
- fail("exception thrown when recovering transactions " + e.getMessage());
+ fail("no message received! expected: " + 3);
}
- try
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 3)
{
- // xid1 has been aborted redo the job!
- // consume 2 messages with tx1
- //----- start xid1
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- // receive the 2 first messages
- for (int i = 1; i <= 2; i++)
- {
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- }
- _xaResource.end(xid1, XAResource.TMSUSPEND);
- // abort tx2, we now expect to receive messages 3 and 4 first!
- _xaResource.rollback(xid2);
+ fail("wrong sequence number: " + message
+ .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected");
+ }
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
+ {
+ fail("no message received! expected: " + 4);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 4)
+ {
+ fail("wrong sequence number: " + message
+ .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected");
+ }
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
+ {
+ fail("no message received! expected: " + 7);
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 7)
+ {
+ fail("wrong sequence number: " + message
+ .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected");
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
+ }
+
+ try
+ {
+ _xaResource.end(xid1, XAResource.TMSUSPEND);
+ // commit tx3
+ _xaResource.commit(xid3, false);
+ // abort tx1
+ _xaResource.prepare(xid1);
+ _xaResource.rollback(xid1);
+ }
+ catch (XAException e)
+ {
+ e.printStackTrace();
+ fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
+ }
- // receive 3 message within tx1: 3, 4 and 7
- _xaResource.start(xid1, XAResource.TMRESUME);
- // receive messages 3, 4 and 7
+ try
+ {
+ // consume messages 1 - 4
+ //----- start xid4
+ _xaResource.start(xid4, XAResource.TMNOFLAGS);
+ for (int i = 1; i <= 4; i++)
+ {
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 3);
+ fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 3)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 3 was expected");
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
+ }
+ _xaResource.end(xid4, XAResource.TMSUSPEND);
+ // consume messages 7 - 10
+ _xaResource.start(xid5, XAResource.TMNOFLAGS);
+ for (int i = 7; i <= 10; i++)
+ {
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 4);
+ fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 4)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 4 was expected");
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
+ }
+ _xaResource.end(xid5, XAResource.TMSUSPEND);
+ // abort tx4
+ _xaResource.prepare(xid4);
+ _xaResource.rollback(xid4);
+ // consume messages 1-4 with tx5
+ _xaResource.start(xid5, XAResource.TMRESUME);
+ for (int i = 1; i <= 4; i++)
+ {
message = (TextMessage) xaDurSub.receive(1000);
if (message == null)
{
- fail("no message received! expected: " + 7);
+ fail("no message received! expected: " + i);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 7)
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
{
- fail("wrong sequence number: " + message
- .getLongProperty(_sequenceNumberPropertyName) + " 7 was expected");
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
}
- catch (Exception e)
+ _xaResource.end(xid5, XAResource.TMSUSPEND);
+ // commit tx5
+ _xaResource.prepare(xid5);
+ _xaResource.commit(xid5, false);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception thrown in last phase: " + e.getMessage());
+ }
+ // now the topic should be empty!!
+ try
+ {
+ // start xid6
+ _xaResource.start(xid6, XAResource.TMNOFLAGS);
+ // should now be empty
+ message = (TextMessage) xaDurSub.receive(1000);
+ if (message != null)
{
- e.printStackTrace();
- fail("Exception thrown when consumming message: 3, 4 and 7: " + e.getMessage());
+ fail("An unexpected message was received " + message
+ .getLongProperty(_sequenceNumberPropertyName));
}
+ // commit xid6
+ _xaResource.end(xid6, XAResource.TMSUSPEND);
+ _xaResource.commit(xid6, true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid6: " + e.getMessage());
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("problem when creating dur sub: " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.unsubscribe(durSubName);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("problem when unsubscribing dur sub: " + e.getMessage());
+ }
+ }
+ }
- try
+
+ /**
+ * strategy: Produce a message within tx1 and commit tx1. A durable subscriber then receives that message within tx2,
+ * which is then prepared.
+ * Shut down the server and get the list of in-doubt transactions:
+ * we expect tx2. Tx2 is aborted and the message is consumed within tx3, which is committed; we then check that the topic is empty.
+ */
+ public void testDurSubCrash()
+ {
+ Xid xid1 = getNewXid();
+ Xid xid2 = getNewXid();
+ Xid xid3 = getNewXid();
+ Xid xid4 = getNewXid();
+ String durSubName = "xaSubDurable";
+ try
+ {
+ TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
+ try
+ {
+ _topicConnection.start();
+ //----- start xid1
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ // start the connection
+ _topicConnection.start();
+ // produce a message with sequence number 1
+ _message.setLongProperty(_sequenceNumberPropertyName, 1);
+ _producer.send(_message);
+ // commit
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ if (_xaResource.prepare(xid1) != XAResource.XA_OK)
{
- _xaResource.end(xid1, XAResource.TMSUSPEND);
- // commit tx3
- _xaResource.commit(xid3, false);
- // abort tx1
- _xaResource.prepare(xid1);
- _xaResource.rollback(xid1);
+ fail("Problem when preparing tx1 ");
}
- catch (XAException e)
+ _xaResource.commit(xid1, false);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid1: " + e.getMessage());
+ }
+ try
+ {
+ // start xid2
+ _xaResource.start(xid2, XAResource.TMNOFLAGS);
+ // receive the previously produced message
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- e.printStackTrace();
- fail("XAException thrown when committing tx3 or aborting tx1: " + e.getMessage());
+ fail("no message received ");
+ }
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
+ {
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
+ // prepare xid2
+ _xaResource.end(xid2, XAResource.TMSUCCESS);
+ if (_xaResource.prepare(xid2) != XAResource.XA_OK)
+ {
+ fail("Problem when preparing tx2 ");
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid2: " + e.getMessage());
+ }
- try
+ /////// stop the server now !!
+ try
+ {
+ restartBroker();
+ init();
+ }
+ catch (Exception e)
+ {
+ fail("Exception when stopping and restarting the server");
+ }
+
+ // get the list of in doubt transactions
+ try
+ {
+ _topicConnection.start();
+ // reconnect to dursub!
+ xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
+ Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
+ if (inDoubt == null)
{
- // consume messages 1 - 4
- //----- start xid1
- _xaResource.start(xid4, XAResource.TMNOFLAGS);
- for (int i = 1; i <= 4; i++)
- {
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- }
- _xaResource.end(xid4, XAResource.TMSUSPEND);
- // consume messages 8 - 10
- _xaResource.start(xid5, XAResource.TMNOFLAGS);
- for (int i = 7; i <= 10; i++)
+ fail("the array of in doubt transactions should not be null ");
+ }
+ // At that point we expect only one in-doubt transaction:
+ if (inDoubt.length != 1)
+ {
+ fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
+ }
+
+ // abort xid2 if it is among the in-doubt transactions
+ for (Xid anInDoubt : inDoubt)
+ {
+ if (anInDoubt.equals(xid2))
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
+ System.out.println("aborting xid2 ");
+ try
{
- fail("no message received! expected: " + i);
+ _xaResource.rollback(anInDoubt);
}
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
+ catch (Exception e)
{
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
+ e.printStackTrace();
+ fail("exception when aborting xid2 ");
}
}
- _xaResource.end(xid5, XAResource.TMSUSPEND);
- // abort tx4
- _xaResource.prepare(xid4);
- _xaResource.rollback(xid4);
- // consume messages 1-4 with tx5
- _xaResource.start(xid5, XAResource.TMRESUME);
- for (int i = 1; i <= 4; i++)
+ else
{
- message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received! expected: " + i);
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != i)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
+ System.out.println("XID2 is not in doubt ");
}
- _xaResource.end(xid5, XAResource.TMSUSPEND);
- // commit tx5
- _xaResource.prepare(xid5);
- _xaResource.commit(xid5, false);
}
- catch (Exception e)
+ }
+ catch (XAException e)
+ {
+ e.printStackTrace();
+ fail("exception thrown when recovering transactions " + e.getMessage());
+ }
+
+ try
+ {
+ // start xid3
+ _xaResource.start(xid3, XAResource.TMNOFLAGS);
+ // receive the previously produced message again (its consumption within xid2 was aborted)
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message == null)
{
- e.printStackTrace();
- fail("Exception thrown in last phase: " + e.getMessage());
+ fail("no message received ");
}
- // now the topic should be empty!!
- try
+ else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
{
- // start xid6
- _xaResource.start(xid6, XAResource.TMNOFLAGS);
- // should now be empty
- message = (TextMessage) xaDurSub.receive(1000);
- if (message != null)
- {
- fail("An unexpected message was received " + message
- .getLongProperty(_sequenceNumberPropertyName));
- }
- // commit xid6
- _xaResource.end(xid6, XAResource.TMSUSPEND);
- _xaResource.commit(xid6, true);
+ fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
}
- catch (Exception e)
+ // commit xid3
+ _xaResource.end(xid3, XAResource.TMSUCCESS);
+ if (_xaResource.prepare(xid3) != XAResource.XA_OK)
{
- e.printStackTrace();
- fail("Exception when working with xid6: " + e.getMessage());
+ fail("Problem when preparing tx3 ");
}
+ _xaResource.commit(xid3, false);
}
catch (Exception e)
{
e.printStackTrace();
- fail("problem when creating dur sub: " + e.getMessage());
+ fail("Exception when working with xid3: " + e.getMessage());
}
- finally
+ try
{
- try
- {
- _session.unsubscribe(durSubName);
- }
- catch (JMSException e)
+ // start xid4
+ _xaResource.start(xid4, XAResource.TMNOFLAGS);
+ // should now be empty
+ TextMessage message = (TextMessage) xaDurSub.receive(1000);
+ if (message != null)
{
- e.printStackTrace();
- fail("problem when unsubscribing dur sub: " + e.getMessage());
+ fail("An unexpected message was received " + message
+ .getLongProperty(_sequenceNumberPropertyName));
}
+ // commit xid4
+ _xaResource.end(xid4, XAResource.TMSUCCESS);
+ _xaResource.commit(xid4, true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("Exception when working with xid4: " + e.getMessage());
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ fail("problem when creating dur sub: " + e.getMessage());
+ }
+ finally
+ {
+ try
+ {
+ _session.unsubscribe(durSubName);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("problem when unsubscribing dur sub: " + e.getMessage());
}
}
}
-
/**
- * strategy: Produce a message within Tx1 and commit tx1. a durable subscriber then receives that message within tx2
- * that is then prepared.
- * Shutdown the server and get the list of in doubt transactions:
- * we expect tx2, Tx2 is aborted and the message consumed within tx3 that is committed we then check that the topic is empty.
+ * strategy: Produce a message within tx1 and prepare tx1. Shut down the server and get the list of in-doubt transactions:
+ * we expect tx1. Tx1 is then committed, so we expect the test topic not to be empty!
*/
- public void testDurSubCrash()
+ public void testRecover()
{
- if (!isBroker08())
+ Xid xid1 = getNewXid();
+ String durSubName = "test1";
+ try
{
- Xid xid1 = getNewXid();
- Xid xid2 = getNewXid();
- Xid xid3 = getNewXid();
- Xid xid4 = getNewXid();
- String durSubName = "xaSubDurable";
+ // create a dummy durable subscriber to be sure that messages are persisted!
+ _nonXASession.createDurableSubscriber(_topic, durSubName);
+ // start the xaResource for xid1
try
{
- TopicSubscriber xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
- try
- {
- _topicConnection.start();
- //----- start xid1
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- // start the connection
- _topicConnection.start();
- // produce a message with sequence number 1
- _message.setLongProperty(_sequenceNumberPropertyName, 1);
- _producer.send(_message);
- // commit
- _xaResource.end(xid1, XAResource.TMSUCCESS);
- if (_xaResource.prepare(xid1) != XAResource.XA_OK)
- {
- fail("Problem when preparing tx1 ");
- }
- _xaResource.commit(xid1, false);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception when working with xid1: " + e.getMessage());
- }
- try
- {
- // start xid2
- _xaResource.start(xid2, XAResource.TMNOFLAGS);
- // receive the previously produced message
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received ");
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- // prepare xid2
- _xaResource.end(xid2, XAResource.TMSUCCESS);
- if (_xaResource.prepare(xid2) != XAResource.XA_OK)
- {
- fail("Problem when preparing tx2 ");
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception when working with xid2: " + e.getMessage());
- }
+ _xaResource.start(xid1, XAResource.TMNOFLAGS);
+ }
+ catch (XAException e)
+ {
+ fail("cannot start the transaction with xid1: " + e.getMessage());
+ }
+ try
+ {
+ // start the connection
+ _topicConnection.start();
+ // produce a message with sequence number 1
+ _message.setLongProperty(_sequenceNumberPropertyName, 1);
+ _producer.send(_message);
+ }
+ catch (JMSException e)
+ {
+ fail(" cannot send persistent message: " + e.getMessage());
+ }
+ // end the transaction
+ try
+ {
+ _xaResource.end(xid1, XAResource.TMSUCCESS);
+ }
+ catch (XAException e)
+ {
+ fail("Cannot end the transaction with xid1: " + e.getMessage());
+ }
+ // prepare the transaction with xid1
+ try
+ {
+ _xaResource.prepare(xid1);
+ }
+ catch (XAException e)
+ {
+ fail("Exception when preparing xid1: " + e.getMessage());
+ }
- /////// stop the server now !!
- try
- {
- restartBroker();
- init();
- }
- catch (Exception e)
- {
- fail("Exception when stopping and restarting the server");
- }
+ /////// stop the server now !!
+ try
+ {
+ restartBroker();
+ init();
+ }
+ catch (Exception e)
+ {
+ fail("Exception when stopping and restarting the server");
+ }
+ try
+ {
+ MessageConsumer nonXAConsumer = _nonXASession.createDurableSubscriber(_topic, durSubName);
+ _topicConnection.start();
// get the list of in doubt transactions
try
{
- _topicConnection.start();
- // reconnect to dursub!
- xaDurSub = _session.createDurableSubscriber(_topic, durSubName);
Xid[] inDoubt = _xaResource.recover(XAResource.TMSTARTRSCAN);
if (inDoubt == null)
{
@@ -1132,28 +1291,28 @@ public class TopicTest extends AbstractX
// At that point we expect only two indoubt transactions:
if (inDoubt.length != 1)
{
- fail("in doubt transaction size is diffenrent than 2, there are " + inDoubt.length + "in doubt transactions");
+ fail("in doubt transaction size is diffenrent thatn 2, there are " + inDoubt.length + "in doubt transactions");
}
-
// commit them
for (Xid anInDoubt : inDoubt)
{
- if (anInDoubt.equals(xid2))
+ if (anInDoubt.equals(xid1))
{
- System.out.println("aborting xid2 ");
+ _logger.debug("committing xid1 ");
try
{
- _xaResource.rollback(anInDoubt);
+ _xaResource.commit(anInDoubt, false);
}
catch (Exception e)
{
+ _logger.debug("PB when aborted xid1");
e.printStackTrace();
- fail("exception when aborting xid2 ");
+ fail("exception when committing xid1 ");
}
}
else
{
- System.out.println("XID2 is not in doubt ");
+ _logger.debug("XID1 is not in doubt ");
}
}
}
@@ -1162,215 +1321,35 @@ public class TopicTest extends AbstractX
e.printStackTrace();
fail("exception thrown when recovering transactions " + e.getMessage());
}
-
- try
- {
- // start xid3
- _xaResource.start(xid3, XAResource.TMNOFLAGS);
- // receive the previously produced message and aborted
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message == null)
- {
- fail("no message received ");
- }
- else if (message.getLongProperty(_sequenceNumberPropertyName) != 1)
- {
- fail("wrong sequence number: " + message.getLongProperty(_sequenceNumberPropertyName));
- }
- // commit xid3
- _xaResource.end(xid3, XAResource.TMSUCCESS);
- if (_xaResource.prepare(xid3) != XAResource.XA_OK)
- {
- fail("Problem when preparing tx3 ");
- }
- _xaResource.commit(xid3, false);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail("Exception when working with xid3: " + e.getMessage());
- }
- try
- {
- // start xid4
- _xaResource.start(xid4, XAResource.TMNOFLAGS);
- // should now be empty
- TextMessage message = (TextMessage) xaDurSub.receive(1000);
- if (message != null)
- {
- fail("An unexpected message was received " + message
- .getLongProperty(_sequenceNumberPropertyName));
- }
- // commit xid4
- _xaResource.end(xid4, XAResource.TMSUCCESS);
- _xaResource.commit(xid4, true);
- }
- catch (Exception e)
+ _logger.debug("the topic should not be empty");
+ TextMessage message1 = (TextMessage) nonXAConsumer.receive(1000);
+ if (message1 == null)
{
- e.printStackTrace();
- fail("Exception when working with xid4: " + e.getMessage());
+ fail("The topic is empty! ");
}
}
catch (Exception e)
{
e.printStackTrace();
- fail("problem when creating dur sub: " + e.getMessage());
- }
- finally
- {
- try
- {
- _session.unsubscribe(durSubName);
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- fail("problem when unsubscribing dur sub: " + e.getMessage());
- }
+ fail("Exception thrown when testin that queue test is empty: " + e.getMessage());
}
}
- }
-
- /**
- * strategy: Produce a message within Tx1 and prepare tx1. Shutdown the server and get the list of indoubt transactions:
- * we expect tx1, Tx1 is committed so we expect the test topic not to be empty!
- */
- public void testRecover()
- {
- if (!isBroker08())
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ fail("cannot create dummy durable subscriber: " + e.getMessage());
+ }
+ finally
{
- Xid xid1 = getNewXid();
- String durSubName = "test1";
try
{
- // create a dummy durable subscriber to be sure that messages are persisted!
- _nonXASession.createDurableSubscriber(_topic, durSubName);
- // start the xaResource for xid1
- try
- {
- _xaResource.start(xid1, XAResource.TMNOFLAGS);
- }
- catch (XAException e)
- {
- fail("cannot start the transaction with xid1: " + e.getMessage());
- }
[... 576 lines stripped ...]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org