You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/10 00:17:05 UTC

svn commit: r1466265 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/kernels/ test-integration/activemq/test/openwire/

Author: tabish
Date: Tue Apr  9 22:17:05 2013
New Revision: 1466265

URL: http://svn.apache.org/r1466265
Log:
https://issues.apache.org/jira/browse/AMQCPP-472

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Tue Apr  9 22:17:05 2013
@@ -26,6 +26,7 @@
 #include <decaf/lang/Integer.h>
 #include <decaf/lang/Long.h>
 #include <decaf/util/HashMap.h>
+#include <decaf/util/Collections.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/util/concurrent/Executors.h>
 #include <activemq/util/Config.h>
@@ -436,14 +437,14 @@ namespace {
 
     public:
 
-        ClientAckHandler( ActiveMQSessionKernel* session ) : session(session) {
+        ClientAckHandler(ActiveMQSessionKernel* session) : session(session) {
             if (session == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Ack Handler Created with NULL Session.");
             }
         }
 
-        void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED ) {
+        void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
             try {
                 this->session->acknowledge();
             }
@@ -595,7 +596,7 @@ namespace {
         ActiveMQSessionKernel* session;
         Pointer<ActiveMQConsumerKernel> consumer;
         ActiveMQConsumerKernelConfig* impl;
-        LinkedList<Pointer<MessageDispatch> > redeliveries;
+        ArrayList<Pointer<MessageDispatch> > redeliveries;
 
     private:
 
@@ -608,6 +609,7 @@ namespace {
             Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
 
             this->redeliveries.copy(impl->dispatchedMessages);
+            Collections::reverse(this->redeliveries);
         }
         virtual ~NonBlockingRedeliveryTask() {}
 
@@ -1183,7 +1185,7 @@ void ActiveMQConsumerKernel::afterMessag
                                 makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                             if (ack != NULL) {
                                 this->internal->dispatchedMessages.clear();
-                                session->oneway(ack);
+                                session->sendAck(ack);
                             }
                         }
                     }
@@ -1275,7 +1277,7 @@ void ActiveMQConsumerKernel::ackLater(Po
         // old pending ack being superseded by ack of another type, if is is not a delivered
         // ack and hence important, send it now so it is not lost.
         if (oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED) {
-            session->oneway(oldPendingAck);
+            session->sendAck(oldPendingAck);
         }
     }
 
@@ -1436,14 +1438,17 @@ void ActiveMQConsumerKernel::rollback() 
                     this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
                 this->internal->redeliveryDelay = 0;
 
+                this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+                this->internal->dispatchedMessages.clear();
+
             } else {
 
                 // only redelivery_ack after first delivery
                 if (currentRedeliveryCount > 0) {
-                    Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
+                    Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
                                             this->internal->dispatchedMessages.size()));
                     ack->setFirstMessageId(firstMsgId);
-                    session->oneway(ack);
+                    session->sendAck(ack);
                 }
 
                 if (this->internal->nonBlockingRedelivery) {
@@ -1451,9 +1456,15 @@ void ActiveMQConsumerKernel::rollback() 
                     if (!this->internal->unconsumedMessages->isClosed()) {
                         Pointer<ActiveMQConsumerKernel> self =
                             this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+
+                        NonBlockingRedeliveryTask* redeliveryTask =
+                            new NonBlockingRedeliveryTask(session, self, this->internal);
+
+                        this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+                        this->internal->dispatchedMessages.clear();
+
                         this->session->getScheduler()->executeAfterDelay(
-                            new NonBlockingRedeliveryTask(session, self, this->internal),
-                            this->internal->redeliveryDelay);
+                            redeliveryTask, this->internal->redeliveryDelay);
                     }
                 } else {
                     // stop the delivery of messages.
@@ -1465,6 +1476,9 @@ void ActiveMQConsumerKernel::rollback() 
                         this->internal->unconsumedMessages->enqueueFirst(iter->next());
                     }
 
+                    this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
+                    this->internal->dispatchedMessages.clear();
+
                     if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
                         Pointer<ActiveMQConsumerKernel> self =
                             this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
@@ -1475,8 +1489,6 @@ void ActiveMQConsumerKernel::rollback() 
                     }
                 }
             }
-            this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
-            this->internal->dispatchedMessages.clear();
         }
     }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp Tue Apr  9 22:17:05 2013
@@ -32,6 +32,7 @@
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/LinkedList.h>
+#include <decaf/util/LinkedHashSet.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 
 using namespace std;
@@ -52,6 +53,41 @@ using namespace decaf::util::concurrent:
 ////////////////////////////////////////////////////////////////////////////////
 namespace {
 
+    void sendMessages(const std::string& uri, const std::string destinationName, int count) {
+        Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(uri));
+        Pointer<Connection> connection(connectionFactory->createConnection());
+        Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        Pointer<Destination> destination(session->createQueue(destinationName));
+        Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+        for(int i = 0; i < count; ++i) {
+            Pointer<TextMessage> message(session->createTextMessage());
+            producer->send(message.get());
+        }
+        connection->close();
+    }
+
+    void destroyDestination(const std::string& uri, const std::string destinationName) {
+        Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(uri));
+        Pointer<Connection> connection(connectionFactory->createConnection());
+        Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        Pointer<Destination> destination(session->createQueue(destinationName));
+        Pointer<ActiveMQConnection> amqCon = connection.dynamicCast<ActiveMQConnection>();
+        amqCon->destroyDestination(destination.get());
+        connection->close();
+    }
+
+    bool assertTrue(LinkedHashSet< Pointer<MessageId> >& set, int expected) {
+        for (int i = 0; i <= 60; ++i) {
+            if (set.size() == expected) {
+                return true;
+            }
+
+            Thread::sleep(1000);
+        }
+
+        return false;
+    }
+
     class TestProducer : public Thread {
     private:
 
@@ -235,8 +271,10 @@ void OpenwireNonBlockingRedeliveryTest::
 
     const std::string DEST_NAME = "QUEUE.FOO";
 
-    TestProducer producer(getBrokerURL(), DEST_NAME, 500);
-    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
+    destroyDestination(getBrokerURL(), DEST_NAME);
+
+    TestProducer producer(getBrokerURL(), DEST_NAME, 100);
+    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 100);
 
     producer.start();
     consumer.start();
@@ -259,4 +297,369 @@ void OpenwireNonBlockingRedeliveryTest::
     }
 
     CPPUNIT_ASSERT(!ordered);
+    destroyDestination(getBrokerURL(), DEST_NAME);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class ReceivedListener : public cms::MessageListener {
+    private:
+
+        LinkedHashSet< Pointer<MessageId> >* received;
+
+    public:
+
+        ReceivedListener(LinkedHashSet< Pointer<MessageId> >* received) :
+            cms::MessageListener(), received(received) {
+        }
+
+        virtual ~ReceivedListener() {
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            const commands::Message* amqMessage =
+                dynamic_cast<const commands::Message*>(message);
+
+            received->add(amqMessage->getMessageId());
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageDeleiveredWhenNonBlockingEnabled() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testMessageDeleiveredWhenNonBlockingEnabled";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+    CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+    session->commit();
+    connection->close();
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageRedeliveriesAreInOrder() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testMessageDeleiveredWhenNonBlockingEnabled";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+    CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+
+    Pointer< Iterator<Pointer<MessageId> > > after(afterRollback.iterator());
+    Pointer< Iterator<Pointer<MessageId> > > before(beforeRollback.iterator());
+
+    while (before->hasNext() && after->hasNext()) {
+        Pointer<MessageId> original = before->next();
+        Pointer<MessageId> rolledBack = after->next();
+
+        long long originalSeq = original->getProducerSequenceId();
+        long long rolledbackSeq = rolledBack->getProducerSequenceId();
+
+        CPPUNIT_ASSERT_EQUAL(originalSeq, rolledbackSeq);
+    }
+
+    session->commit();
+    connection->close();
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageDeleiveryDoesntStop() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testMessageDeleiveryDoesntStop";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT * 2));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size() * 2, afterRollback.size());
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryIsDelayed() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testNonBlockingMessageDeleiveryIsDelayed";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    received.clear();
+    session->rollback();
+
+    TimeUnit::SECONDS.sleep(6);
+    CPPUNIT_ASSERT_MESSAGE("Rollback redelivery was not delayed.", received.isEmpty());
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class SomeRollbacksListener : public cms::MessageListener {
+    private:
+
+        int count;
+        Pointer<Session> session;
+        LinkedHashSet< Pointer<MessageId> >* received;
+
+    public:
+
+        SomeRollbacksListener(Pointer<Session> session, LinkedHashSet< Pointer<MessageId> >* received) :
+            cms::MessageListener(), count(0), session(session), received(received) {
+        }
+
+        virtual ~SomeRollbacksListener() {}
+
+        virtual void onMessage(const cms::Message* message) {
+            const commands::Message* amqMessage =
+                dynamic_cast<const commands::Message*>(message);
+
+            if (++count > 10) {
+                try {
+                    session->rollback();
+                    count = 0;
+                } catch (CMSException& e) {
+                }
+            } else {
+                received->add(amqMessage->getMessageId());
+                try {
+                    session->commit();
+                } catch (CMSException& e) {
+                }
+            }
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithRollbacks() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testNonBlockingMessageDeleiveryWithRollbacks";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    received.clear();
+
+    SomeRollbacksListener newListener(session, &received);
+    consumer->setMessageListener(&newListener);
+
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class RollbacksListener : public cms::MessageListener {
+    private:
+
+        Pointer<Session> session;
+
+    public:
+
+        RollbacksListener(Pointer<Session> session) :
+            cms::MessageListener(), session(session) {
+        }
+
+        virtual ~RollbacksListener() {
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                session->rollback();
+            } catch (CMSException& e) {
+            }
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithAllRolledBack() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > dlqed;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testNonBlockingMessageDeleiveryWithAllRolledBack";
+
+    destroyDestination(getBrokerURL(), destinationName);
+    destroyDestination(getBrokerURL(), "ActiveMQ.DLQ");
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(getBrokerURL()));
+    connectionFactory->getRedeliveryPolicy()->setMaximumRedeliveries(5);
+    connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(5));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<Destination> dlq(session->createQueue("ActiveMQ.DLQ"));
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+    Pointer<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
+
+    ReceivedListener dlqReceivedListener(&dlqed);
+    dlqConsumer->setMessageListener(&dlqReceivedListener);
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", assertTrue(received, MSG_COUNT));
+
+    session->rollback();
+
+    RollbacksListener rollbackListener(session);
+    consumer->setMessageListener(&rollbackListener);
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack DQL size incorrect", assertTrue(dlqed, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h Tue Apr  9 22:17:05 2013
@@ -27,7 +27,13 @@ namespace openwire {
     class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
 
         CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
-        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+//        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+//        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+//        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+//        CPPUNIT_TEST( testMessageDeleiveryDoesntStop );
+//        CPPUNIT_TEST( testNonBlockingMessageDeleiveryIsDelayed );
+//        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithRollbacks );
+        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithAllRolledBack );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -38,6 +44,12 @@ namespace openwire {
         virtual std::string getBrokerURL() const;
 
         void testConsumerMessagesAreNotOrdered();
+        void testMessageDeleiveredWhenNonBlockingEnabled();
+        void testMessageRedeliveriesAreInOrder();
+        void testMessageDeleiveryDoesntStop();
+        void testNonBlockingMessageDeleiveryIsDelayed();
+        void testNonBlockingMessageDeleiveryWithRollbacks();
+        void testNonBlockingMessageDeleiveryWithAllRolledBack();
 
     };