You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/10 00:17:05 UTC
svn commit: r1466265 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/core/kernels/ test-integration/activemq/test/openwire/
Author: tabish
Date: Tue Apr 9 22:17:05 2013
New Revision: 1466265
URL: http://svn.apache.org/r1466265
Log:
https://issues.apache.org/jira/browse/AMQCPP-472
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Tue Apr 9 22:17:05 2013
@@ -26,6 +26,7 @@
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/util/HashMap.h>
+#include <decaf/util/Collections.h>
#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/util/concurrent/Executors.h>
#include <activemq/util/Config.h>
@@ -436,14 +437,14 @@ namespace {
public:
- ClientAckHandler( ActiveMQSessionKernel* session ) : session(session) {
+ ClientAckHandler(ActiveMQSessionKernel* session) : session(session) {
if (session == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL Session.");
}
}
- void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED ) {
+ void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
try {
this->session->acknowledge();
}
@@ -595,7 +596,7 @@ namespace {
ActiveMQSessionKernel* session;
Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
- LinkedList<Pointer<MessageDispatch> > redeliveries;
+ ArrayList<Pointer<MessageDispatch> > redeliveries;
private:
@@ -608,6 +609,7 @@ namespace {
Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
this->redeliveries.copy(impl->dispatchedMessages);
+ Collections::reverse(this->redeliveries);
}
virtual ~NonBlockingRedeliveryTask() {}
@@ -1183,7 +1185,7 @@ void ActiveMQConsumerKernel::afterMessag
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
this->internal->dispatchedMessages.clear();
- session->oneway(ack);
+ session->sendAck(ack);
}
}
}
@@ -1275,7 +1277,7 @@ void ActiveMQConsumerKernel::ackLater(Po
// old pending ack being superseded by ack of another type, if is is not a delivered
// ack and hence important, send it now so it is not lost.
if (oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED) {
- session->oneway(oldPendingAck);
+ session->sendAck(oldPendingAck);
}
}
@@ -1436,14 +1438,17 @@ void ActiveMQConsumerKernel::rollback()
this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
this->internal->redeliveryDelay = 0;
+ this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+ this->internal->dispatchedMessages.clear();
+
} else {
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
- Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
+ Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
this->internal->dispatchedMessages.size()));
ack->setFirstMessageId(firstMsgId);
- session->oneway(ack);
+ session->sendAck(ack);
}
if (this->internal->nonBlockingRedelivery) {
@@ -1451,9 +1456,15 @@ void ActiveMQConsumerKernel::rollback()
if (!this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+
+ NonBlockingRedeliveryTask* redeliveryTask =
+ new NonBlockingRedeliveryTask(session, self, this->internal);
+
+ this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+ this->internal->dispatchedMessages.clear();
+
this->session->getScheduler()->executeAfterDelay(
- new NonBlockingRedeliveryTask(session, self, this->internal),
- this->internal->redeliveryDelay);
+ redeliveryTask, this->internal->redeliveryDelay);
}
} else {
// stop the delivery of messages.
@@ -1465,6 +1476,9 @@ void ActiveMQConsumerKernel::rollback()
this->internal->unconsumedMessages->enqueueFirst(iter->next());
}
+ this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+ this->internal->dispatchedMessages.clear();
+
if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
@@ -1475,8 +1489,6 @@ void ActiveMQConsumerKernel::rollback()
}
}
}
- this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
- this->internal->dispatchedMessages.clear();
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp Tue Apr 9 22:17:05 2013
@@ -32,6 +32,7 @@
#include <decaf/lang/Thread.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/LinkedList.h>
+#include <decaf/util/LinkedHashSet.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
using namespace std;
@@ -52,6 +53,41 @@ using namespace decaf::util::concurrent:
////////////////////////////////////////////////////////////////////////////////
namespace {
+ void sendMessages(const std::string& uri, const std::string destinationName, int count) {
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(uri));
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+ for(int i = 0; i < count; ++i) {
+ Pointer<TextMessage> message(session->createTextMessage());
+ producer->send(message.get());
+ }
+ connection->close();
+ }
+
+ void destroyDestination(const std::string& uri, const std::string destinationName) {
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(uri));
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<ActiveMQConnection> amqCon = connection.dynamicCast<ActiveMQConnection>();
+ amqCon->destroyDestination(destination.get());
+ connection->close();
+ }
+
+ bool assertTrue(LinkedHashSet< Pointer<MessageId> >& set, int expected) {
+ for (int i = 0; i <= 60; ++i) {
+ if (set.size() == expected) {
+ return true;
+ }
+
+ Thread::sleep(1000);
+ }
+
+ return false;
+ }
+
class TestProducer : public Thread {
private:
@@ -235,8 +271,10 @@ void OpenwireNonBlockingRedeliveryTest::
const std::string DEST_NAME = "QUEUE.FOO";
- TestProducer producer(getBrokerURL(), DEST_NAME, 500);
- TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
+ destroyDestination(getBrokerURL(), DEST_NAME);
+
+ TestProducer producer(getBrokerURL(), DEST_NAME, 100);
+ TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 100);
producer.start();
consumer.start();
@@ -259,4 +297,369 @@ void OpenwireNonBlockingRedeliveryTest::
}
CPPUNIT_ASSERT(!ordered);
+ destroyDestination(getBrokerURL(), DEST_NAME);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class ReceivedListener : public cms::MessageListener {
+ private:
+
+ LinkedHashSet< Pointer<MessageId> >* received;
+
+ public:
+
+ ReceivedListener(LinkedHashSet< Pointer<MessageId> >* received) :
+ cms::MessageListener(), received(received) {
+ }
+
+ virtual ~ReceivedListener() {
+ }
+
+ virtual void onMessage(const cms::Message* message) {
+ const commands::Message* amqMessage =
+ dynamic_cast<const commands::Message*>(message);
+
+ received->add(amqMessage->getMessageId());
+ }
+
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageDeleiveredWhenNonBlockingEnabled() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+ LinkedHashSet< Pointer<MessageId> > beforeRollback;
+ LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testMessageDeleiveredWhenNonBlockingEnabled";
+
+ destroyDestination(getBrokerURL(), destinationName);
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ beforeRollback.addAll(received);
+ received.clear();
+ session->rollback();
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ afterRollback.addAll(received);
+ received.clear();
+
+ CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+ CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+ session->commit();
+ connection->close();
+ destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageRedeliveriesAreInOrder() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+ LinkedHashSet< Pointer<MessageId> > beforeRollback;
+ LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testMessageDeleiveredWhenNonBlockingEnabled";
+
+ destroyDestination(getBrokerURL(), destinationName);
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ beforeRollback.addAll(received);
+ received.clear();
+ session->rollback();
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ afterRollback.addAll(received);
+ received.clear();
+
+ CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+ CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+
+ Pointer< Iterator<Pointer<MessageId> > > after(afterRollback.iterator());
+ Pointer< Iterator<Pointer<MessageId> > > before(beforeRollback.iterator());
+
+ while (before->hasNext() && after->hasNext()) {
+ Pointer<MessageId> original = before->next();
+ Pointer<MessageId> rolledBack = after->next();
+
+ long long originalSeq = original->getProducerSequenceId();
+ long long rolledbackSeq = rolledBack->getProducerSequenceId();
+
+ CPPUNIT_ASSERT_EQUAL(originalSeq, rolledbackSeq);
+ }
+
+ session->commit();
+ connection->close();
+ destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageDeleiveryDoesntStop() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+ LinkedHashSet< Pointer<MessageId> > beforeRollback;
+ LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testMessageDeleiveryDoesntStop";
+
+ destroyDestination(getBrokerURL(), destinationName);
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ beforeRollback.addAll(received);
+ received.clear();
+ session->rollback();
+
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT * 2));
+
+ afterRollback.addAll(received);
+ received.clear();
+
+ CPPUNIT_ASSERT_EQUAL(beforeRollback.size() * 2, afterRollback.size());
+ session->commit();
+ connection->close();
+
+ destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryIsDelayed() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testNonBlockingMessageDeleiveryIsDelayed";
+
+ destroyDestination(getBrokerURL(), destinationName);
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ received.clear();
+ session->rollback();
+
+ TimeUnit::SECONDS.sleep(6);
+ CPPUNIT_ASSERT_MESSAGE("Rollback redelivery was not delayed.", received.isEmpty());
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ session->commit();
+ connection->close();
+
+ destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class SomeRollbacksListener : public cms::MessageListener {
+ private:
+
+ int count;
+ Pointer<Session> session;
+ LinkedHashSet< Pointer<MessageId> >* received;
+
+ public:
+
+ SomeRollbacksListener(Pointer<Session> session, LinkedHashSet< Pointer<MessageId> >* received) :
+ cms::MessageListener(), count(0), session(session), received(received) {
+ }
+
+ virtual ~SomeRollbacksListener() {}
+
+ virtual void onMessage(const cms::Message* message) {
+ const commands::Message* amqMessage =
+ dynamic_cast<const commands::Message*>(message);
+
+ if (++count > 10) {
+ try {
+ session->rollback();
+ count = 0;
+ } catch (CMSException& e) {
+ }
+ } else {
+ received->add(amqMessage->getMessageId());
+ try {
+ session->commit();
+ } catch (CMSException& e) {
+ }
+ }
+ }
+
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithRollbacks() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testNonBlockingMessageDeleiveryWithRollbacks";
+
+ destroyDestination(getBrokerURL(), destinationName);
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ received.clear();
+
+ SomeRollbacksListener newListener(session, &received);
+ consumer->setMessageListener(&newListener);
+
+ session->rollback();
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ session->commit();
+ connection->close();
+
+ destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class RollbacksListener : public cms::MessageListener {
+ private:
+
+ Pointer<Session> session;
+
+ public:
+
+ RollbacksListener(Pointer<Session> session) :
+ cms::MessageListener(), session(session) {
+ }
+
+ virtual ~RollbacksListener() {
+ }
+
+ virtual void onMessage(const cms::Message* message) {
+ try {
+ session->rollback();
+ } catch (CMSException& e) {
+ }
+ }
+
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithAllRolledBack() {
+
+ LinkedHashSet< Pointer<MessageId> > received;
+ LinkedHashSet< Pointer<MessageId> > dlqed;
+
+ const int MSG_COUNT = 100;
+ const std::string destinationName = "testNonBlockingMessageDeleiveryWithAllRolledBack";
+
+ destroyDestination(getBrokerURL(), destinationName);
+ destroyDestination(getBrokerURL(), "ActiveMQ.DLQ");
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+ connectionFactory->getRedeliveryPolicy()->setMaximumRedeliveries(5);
+ connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(5));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+ Pointer<Destination> destination(session->createQueue(destinationName));
+ Pointer<Destination> dlq(session->createQueue("ActiveMQ.DLQ"));
+ Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+ Pointer<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
+
+ ReceivedListener dlqReceivedListener(&dlqed);
+ dlqConsumer->setMessageListener(&dlqReceivedListener);
+
+ ReceivedListener receivedListener(&received);
+ consumer->setMessageListener(&receivedListener);
+
+ sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+ connection->start();
+
+ CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+ session->rollback();
+
+ RollbacksListener rollbackListener(session);
+ consumer->setMessageListener(&rollbackListener);
+
+ CPPUNIT_ASSERT_MESSAGE("Post-Rollack DQL size incorrect", assertTrue(dlqed, MSG_COUNT));
+
+ session->commit();
+ connection->close();
+
+ destroyDestination(getBrokerURL(), destinationName);
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h Tue Apr 9 22:17:05 2013
@@ -27,7 +27,13 @@ namespace openwire {
class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
- CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+// CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+// CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+// CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+// CPPUNIT_TEST( testMessageDeleiveryDoesntStop );
+// CPPUNIT_TEST( testNonBlockingMessageDeleiveryIsDelayed );
+// CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithRollbacks );
+ CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithAllRolledBack );
CPPUNIT_TEST_SUITE_END();
public:
@@ -38,6 +44,12 @@ namespace openwire {
virtual std::string getBrokerURL() const;
void testConsumerMessagesAreNotOrdered();
+ void testMessageDeleiveredWhenNonBlockingEnabled();
+ void testMessageRedeliveriesAreInOrder();
+ void testMessageDeleiveryDoesntStop();
+ void testNonBlockingMessageDeleiveryIsDelayed();
+ void testNonBlockingMessageDeleiveryWithRollbacks();
+ void testNonBlockingMessageDeleiveryWithAllRolledBack();
};