You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/28 00:55:38 UTC

[2/2] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-552 https://issues.apache.org/jira/browse/AMQCPP-577

https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577

Port all current MessageConsumer fixes from Java client to CMS 

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/4822f75d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/4822f75d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/4822f75d

Branch: refs/heads/master
Commit: 4822f75deb774db91cdb0db8207b5961aae430b1
Parents: e2ff35a
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 27 18:55:17 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 27 18:55:17 2015 -0400

----------------------------------------------------------------------
 .../main/activemq/core/ActiveMQConnection.cpp   |  16 +
 .../src/main/activemq/core/ActiveMQConnection.h |  11 +-
 .../core/kernels/ActiveMQConsumerKernel.cpp     | 426 +++++++++++--------
 .../core/kernels/ActiveMQConsumerKernel.h       |  12 +-
 .../core/kernels/ActiveMQSessionKernel.cpp      |  15 +
 .../core/kernels/ActiveMQSessionKernel.h        |  10 +
 .../src/test-integration/TestRegistry.cpp       |   2 +-
 .../activemq/util/IntegrationCommon.cpp         |   2 +-
 8 files changed, 303 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 951f61e..535d7f3 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -1679,6 +1679,22 @@ ExecutorService* ActiveMQConnection::getExecutor() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+ArrayList< Pointer<ActiveMQSessionKernel> > ActiveMQConnection::getSessions() const {
+    ArrayList< Pointer<ActiveMQSessionKernel> > result;
+
+    this->config->sessionsLock.readLock().lock();
+    try {
+        result.addAll(this->config->activeSessions);
+        this->config->sessionsLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->sessionsLock.readLock().unlock();
+        throw;
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConnection::isWatchTopicAdvisories() const {
     return this->config->watchTopicAdvisories;
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 26672a1..573a44f 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -32,6 +32,7 @@
 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
 #include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <decaf/util/Properties.h>
+#include <decaf/util/ArrayList.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -1070,10 +1071,18 @@ namespace core {
          * Determines whether the supplied Temporary Destination has already been deleted from the
          * Broker.  If watchTopicAdvisories is disabled this method will always return false.
          *
-         * @returns true if the temporary destination was deleted already.
+         * @return true if the temporary destination was deleted already.
          */
         bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const;
 
+        /**
+         * Returns an ArrayList that contains a copy of all Sessions that are
+         * currently active in the Connection
+         *
+         * @return an ArrayList of Sessions active in this connection.
+         */
+        decaf::util::ArrayList< Pointer<activemq::core::kernels::ActiveMQSessionKernel> > getSessions() const;
+
     protected:
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index ad9d77a..57b9bc2 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -29,6 +29,7 @@
 #include <decaf/util/Collections.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/util/concurrent/Executors.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <activemq/util/Config.h>
 #include <activemq/util/CMSExceptionSupport.h>
 #include <activemq/util/ActiveMQProperties.h>
@@ -65,6 +66,7 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace activemq {
@@ -99,14 +101,14 @@ namespace kernels {
         AtomicBoolean started;
         AtomicBoolean closeSyncRegistered;
         Pointer<MessageDispatchChannel> unconsumedMessages;
-        decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
+        decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > deliveredMessages;
         long long lastDeliveredSequenceId;
         Pointer<commands::MessageAck> pendingAck;
         int deliveredCounter;
         int additionalWindowSize;
         volatile bool synchronizationRegistered;
-        bool clearDispatchList;
-        bool inProgressClearRequiredFlag;
+        volatile bool isClearDeliveredList;
+        AtomicInteger inProgressClearRequiredFlag;
         long long redeliveryDelay;
         Pointer<RedeliveryPolicy> redeliveryPolicy;
         Pointer<Exception> failureError;
@@ -137,14 +139,14 @@ namespace kernels {
                                          started(),
                                          closeSyncRegistered(),
                                          unconsumedMessages(),
-                                         dispatchedMessages(),
+                                         deliveredMessages(),
                                          lastDeliveredSequenceId(-1),
                                          pendingAck(),
                                          deliveredCounter(0),
                                          additionalWindowSize(0),
                                          synchronizationRegistered(false),
-                                         clearDispatchList(false),
-                                         inProgressClearRequiredFlag(false),
+                                         isClearDeliveredList(false),
+                                         inProgressClearRequiredFlag(0),
                                          redeliveryDelay(0),
                                          redeliveryPolicy(),
                                          failureError(),
@@ -182,63 +184,46 @@ namespace kernels {
             return false;
         }
 
-        void doClearMessagesInProgress() {
-            if (this->inProgressClearRequiredFlag) {
-                synchronized(this->unconsumedMessages.get()) {
-                    if (this->inProgressClearRequiredFlag) {
-
-                        // ensure messages that were not yet consumed are rolled back up front as they
-                        // may get redelivered to another consumer by the Broker.
-                        std::vector< Pointer<MessageDispatch> > list = this->unconsumedMessages->removeAll();
-                        if (!this->info->isBrowser()) {
-                            std::vector<Pointer<MessageDispatch> >::const_iterator iter = list.begin();
-
-                            for (; iter != list.end(); ++iter) {
-                                Pointer<MessageDispatch> md = *iter;
-                                this->session->getConnection()->rollbackDuplicate(this->parent, md->getMessage());
-                            }
-                        }
-
-                        // allow dispatch on this connection to resume
-                        this->session->getConnection()->setTransportInterruptionProcessingComplete();
-                        this->inProgressClearRequiredFlag = false;
-
-                        // Wake up any blockers and allow them to recheck state.
-                        this->unconsumedMessages->notifyAll();
-                    }
-                }
-            }
-        }
-
-        void doClearDispatchList() {
-            if (clearDispatchList) {
-                synchronized (&this->dispatchedMessages) {
-                    if (clearDispatchList) {
-                        if (!dispatchedMessages.isEmpty()) {
+        void clearDeliveredList() {
+            if (isClearDeliveredList) {
+                synchronized (&this->deliveredMessages) {
+                    if (isClearDeliveredList) {
+                        if (!deliveredMessages.isEmpty()) {
                             if (session->isTransacted()) {
                                 if (previouslyDeliveredMessages == NULL) {
                                     previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap(
                                         session->getTransactionContext()->getTransactionId()));
                                 }
 
-                                Pointer<Iterator<Pointer<MessageDispatch> > > iter(dispatchedMessages.iterator());
+                                Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
 
                                 while (iter->hasNext()) {
                                     Pointer<MessageDispatch> dispatch = iter->next();
                                     previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false);
                                 }
                             } else {
-                                dispatchedMessages.clear();
+                                if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
+                                    if (!info->isBrowser()) {
+                                        Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
+
+                                        // allow redelivery
+                                        while (iter->hasNext()) {
+                                            Pointer<MessageDispatch> dispatch = iter->next();
+                                            session->getConnection()->rollbackDuplicate(parent, dispatch->getMessage());
+                                        }
+                                    }
+                                }
+                                deliveredMessages.clear();
                                 pendingAck.reset(NULL);
                             }
                         }
-                        clearDispatchList = false;
+                        isClearDeliveredList = false;
                     }
                 }
             }
         }
 
-        void doClearPreviouslyDelivered() {
+        void clearPreviouslyDelivered() {
             if (previouslyDeliveredMessages != NULL) {
                 previouslyDeliveredMessages->clear();
                 previouslyDeliveredMessages.reset(NULL);
@@ -247,7 +232,7 @@ namespace kernels {
 
         // called with deliveredMessages locked
         void removeFromDeliveredMessages(Pointer<MessageId> key) {
-            Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->dispatchedMessages.iterator());
+            Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->deliveredMessages.iterator());
             while (iter->hasNext()) {
                 Pointer<MessageDispatch> candidate = iter->next();
                 if (key->equals(candidate->getMessage()->getMessageId().get())) {
@@ -262,7 +247,6 @@ namespace kernels {
         // not re-delivered as they can't be replayed to this consumer on rollback
         void rollbackPreviouslyDeliveredAndNotRedelivered() {
             if (previouslyDeliveredMessages != NULL) {
-
                 Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
                 Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
                 while (iter->hasNext()) {
@@ -272,7 +256,7 @@ namespace kernels {
                     }
                 }
 
-                doClearPreviouslyDelivered();
+                clearPreviouslyDelivered();
             }
         }
 
@@ -306,7 +290,7 @@ namespace kernels {
                 int numberNotReplayed;
                 do {
                     numberNotReplayed = 0;
-                    synchronized (&this->dispatchedMessages) {
+                    synchronized (&this->deliveredMessages) {
                         if (previouslyDeliveredMessages != NULL) {
                             Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
                             Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
@@ -329,6 +313,40 @@ namespace kernels {
             }
         }
 
+        bool redeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch, bool markReceipt) {
+            if (session->isTransacted()) {
+                synchronized (&this->deliveredMessages) {
+                    if (previouslyDeliveredMessages != NULL) {
+                        if (previouslyDeliveredMessages->containsKey(dispatch->getMessage()->getMessageId())) {
+                            if (markReceipt) {
+                                previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), true);
+                            }
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
+
+        bool redeliveryPendingInCompetingTransaction(Pointer<MessageDispatch> dispatch) {
+            ArrayList< Pointer<ActiveMQSessionKernel> > sessions = session->getConnection()->getSessions();
+
+            Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > sessionIter(sessions.iterator());
+            while (sessionIter->hasNext()) {
+                Pointer<ActiveMQSessionKernel> session = sessionIter->next();
+                ArrayList< Pointer<ActiveMQConsumerKernel> > consumers = session->getConsumers();
+                Pointer<Iterator<Pointer<ActiveMQConsumerKernel> > > consumersIter(consumers.iterator());
+
+                while (consumersIter->hasNext()) {
+                    Pointer<ActiveMQConsumerKernel> consumer = consumersIter->next();
+                    return consumer->isRedeliveryExpectedInCurrentTransaction(dispatch);
+                }
+            }
+
+            return false;
+        }
+
         bool consumeExpiredMessage(const Pointer<MessageDispatch> dispatch) {
             if (dispatch->getMessage()->isExpired()) {
                 return !info->isBrowser() && consumerExpiryCheckEnabled;
@@ -336,6 +354,32 @@ namespace kernels {
 
             return false;
         }
+
+        bool redeliveryExceeded(Pointer<MessageDispatch> dispatch) {
+            try {
+                return session->isTransacted() && redeliveryPolicy != NULL &&
+                       redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+                       dispatch->getRedeliveryCounter() > redeliveryPolicy->getMaximumRedeliveries() &&
+                        // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+                       !dispatch->getMessage()->getMessageProperties().containsKey("redeliveryDelay");
+            } catch (Exception& ignored) {
+                return false;
+            }
+        }
+
+        void posionAck(Pointer<MessageDispatch> dispatch, const std::string& cause) {
+            Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
+            poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
+            poisonAck->setPoisonCause(createBrokerError(cause));
+            session->sendAck(poisonAck);
+        }
+
+        Pointer<BrokerError> createBrokerError(const std::string& message) {
+            Pointer<BrokerError> cause(new BrokerError());
+            cause->setExceptionClass("javax.jms.JMSException");
+            cause->setMessage(message);
+            return cause;
+        }
     };
 
 }}}
@@ -373,9 +417,9 @@ namespace {
 
         virtual void beforeEnd() {
             if (impl->transactedIndividualAck) {
-                impl->doClearDispatchList();
+                impl->clearDeliveredList();
                 impl->waitForRedeliveries();
-                synchronized(&impl->dispatchedMessages) {
+                synchronized(&impl->deliveredMessages) {
                     impl->rollbackOnFailedRecoveryRedelivery();
                 }
             } else {
@@ -622,7 +666,7 @@ namespace {
         NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
             Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
 
-            this->redeliveries.copy(impl->dispatchedMessages);
+            this->redeliveries.copy(impl->deliveredMessages);
             Collections::reverse(this->redeliveries);
         }
         virtual ~NonBlockingRedeliveryTask() {}
@@ -714,15 +758,6 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
     this->internal->session = session;
     this->internal->parent = this;
     this->internal->info = consumerInfo;
-    this->internal->hashCode = id->getHashCode();
-    this->internal->lastDeliveredSequenceId = -1;
-    this->internal->synchronizationRegistered = false;
-    this->internal->additionalWindowSize = 0;
-    this->internal->deliveredCounter = 0;
-    this->internal->clearDispatchList = false;
-    this->internal->inProgressClearRequiredFlag = false;
-    this->internal->listener = NULL;
-    this->internal->redeliveryDelay = 0;
     this->internal->redeliveryPolicy.reset(this->session->getConnection()->getRedeliveryPolicy()->clone());
     this->internal->scheduler = this->session->getScheduler();
 
@@ -768,7 +803,6 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConsumerKernel::~ActiveMQConsumerKernel() {
-
     try {
         this->close();
     }
@@ -809,7 +843,7 @@ void ActiveMQConsumerKernel::close() {
     try {
         if (!this->isClosed()) {
 
-            if (!this->internal->dispatchedMessages.isEmpty() &&
+            if (!this->internal->deliveredMessages.isEmpty() &&
                 this->session->getTransactionContext() != NULL &&
                 this->session->getTransactionContext()->isInTransaction() &&
                 this->internal->closeSyncRegistered.compareAndSet(false, true)) {
@@ -882,8 +916,8 @@ void ActiveMQConsumerKernel::dispose() {
                 if (!this->consumerInfo->isBrowser()) {
                     // roll back duplicates that aren't acknowledged
                     ArrayList< Pointer<MessageDispatch> > tmp;
-                    synchronized(&this->internal->dispatchedMessages) {
-                        tmp.copy(this->internal->dispatchedMessages);
+                    synchronized(&this->internal->deliveredMessages) {
+                        tmp.copy(this->internal->deliveredMessages);
                     }
                     Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
                     while (iter->hasNext()) {
@@ -898,43 +932,44 @@ void ActiveMQConsumerKernel::dispose() {
             bool haveException = false;
             ActiveMQException error;
 
-            // Purge all the pending messages
-            try{
-                this->internal->unconsumedMessages->clear();
-            } catch (ActiveMQException& ex){
-                if( !haveException ){
-                    ex.setMark( __FILE__, __LINE__ );
-                    error = ex;
-                    haveException = true;
-                }
-            }
-
-            // Stop and Wakeup all sync consumers.
-            this->internal->unconsumedMessages->close();
-
-            if (this->session->isIndividualAcknowledge()) {
+            if (!this->internal->session->isTransacted()) {
                 // For IndividualAck Mode we need to unlink the ack handler to remove a
                 // cyclic reference to the MessageDispatch that brought the message to us.
-                synchronized(&internal->dispatchedMessages) {
-                    std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
-                    while (iter->hasNext()) {
-                        iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
+                synchronized(&internal->deliveredMessages) {
+                    if (this->session->isIndividualAcknowledge()) {
+                        std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->deliveredMessages.iterator());
+                        while (iter->hasNext()) {
+                            iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
+                        }
                     }
-
-                    this->internal->dispatchedMessages.clear();
+                    this->internal->deliveredMessages.clear();
                 }
             }
 
+            // Stop and Wakeup all sync consumers.
+            this->internal->unconsumedMessages->close();
+
             // Remove this Consumer from the Connections set of Dispatchers
             Pointer<ActiveMQConsumerKernel> consumer(this);
             try {
                 this->session->removeConsumer(consumer);
-            } catch(Exception& e) {
+            } catch (Exception& e) {
                 consumer.release();
                 throw;
             }
             consumer.release();
 
+            // Ensure these are filtered as duplicates.
+            std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
+            if (!this->consumerInfo->isBrowser()) {
+                std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+
+                for (; iter != list.end(); ++iter) {
+                    Pointer<MessageDispatch> md = *iter;
+                    this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
+                }
+            }
+
             // If we encountered an error, propagate it.
             if (haveException) {
                 error.setMark(__FILE__, __LINE__);
@@ -949,9 +984,8 @@ void ActiveMQConsumerKernel::dispose() {
 
 ////////////////////////////////////////////////////////////////////////////////
 std::string ActiveMQConsumerKernel::getMessageSelector() const {
-
     try {
-        // Fetch the Selector
+        checkClosed();
         return this->consumerInfo->getSelector();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -970,7 +1004,6 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
 
         // Loop until the time is up or we get a non-expired message
         while (true) {
-
             Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
             if (dispatch == NULL) {
                 if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
@@ -979,7 +1012,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
                     if (this->internal->failureError != NULL) {
                         throw CMSExceptionSupport::create(*this->internal->failureError);
                     } else {
-                        return Pointer<MessageDispatch> ();
+                        return Pointer<MessageDispatch>();
                     }
                 }
             } else if (dispatch->getMessage() == NULL) {
@@ -992,6 +1025,11 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
                 }
 
                 continue;
+            } else if (internal->redeliveryExceeded(dispatch)) {
+                internal->posionAck(dispatch,
+                                    "dispatch to " + getConsumerId()->toString() +
+                                    " exceeds redelivery policy limit: " +
+                                    Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
             }
 
             // Return the message.
@@ -1034,18 +1072,27 @@ cms::Message* ActiveMQConsumerKernel::receive() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumerKernel::receive(int millisecs) {
+cms::Message* ActiveMQConsumerKernel::receive(int timeout) {
 
     try {
 
         this->checkClosed();
         this->checkMessageListener();
+        if (timeout == 0) {
+            return this->receive();
+        }
 
         // Send a request for a new message if needed
-        this->sendPullRequest(millisecs);
+        this->sendPullRequest(timeout);
+
+        // Get the next available message, if there is one.
+        Pointer<MessageDispatch> message;
+        if (internal->info->getPrefetchSize() == 0) {
+            message = dequeue(-1);  // Broker will signal if no message.
+        } else {
+            message = dequeue(timeout);
+        }
 
-        // Wait for the next message.
-        Pointer<MessageDispatch> message = dequeue(millisecs);
         if (message == NULL) {
             return NULL;
         }
@@ -1072,7 +1119,13 @@ cms::Message* ActiveMQConsumerKernel::receiveNoWait() {
         this->sendPullRequest(-1);
 
         // Get the next available message, if there is one.
-        Pointer<MessageDispatch> message = dequeue(0);
+        Pointer<MessageDispatch> message;
+        if (internal->info->getPrefetchSize() == 0) {
+            message = dequeue(-1);  // Broker will signal if no message.
+        } else {
+            message = dequeue(0);
+        }
+
         if (message == NULL) {
             return NULL;
         }
@@ -1132,8 +1185,8 @@ void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer<MessageDispatch> di
     if (!isAutoAcknowledgeBatch()) {
 
         // When not in an Auto
-        synchronized(&this->internal->dispatchedMessages) {
-            this->internal->dispatchedMessages.addFirst(dispatch);
+        synchronized(&this->internal->deliveredMessages) {
+            this->internal->deliveredMessages.addFirst(dispatch);
         }
 
         if (this->session->isTransacted()) {
@@ -1184,8 +1237,8 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
 
         if (isAutoAcknowledgeEach()) {
             if (this->internal->deliveringAcks.compareAndSet(false, true)) {
-                synchronized(&this->internal->dispatchedMessages) {
-                    if (!this->internal->dispatchedMessages.isEmpty()) {
+                synchronized(&this->internal->deliveredMessages) {
+                    if (!this->internal->deliveredMessages.isEmpty()) {
                         if (this->internal->optimizeAcknowledge) {
 
                             this->internal->ackCounter++;
@@ -1193,7 +1246,7 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
                                 Pointer<MessageAck> ack =
                                     makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                                 if (ack != NULL) {
-                                    this->internal->dispatchedMessages.clear();
+                                    this->internal->deliveredMessages.clear();
                                     this->internal->ackCounter = 0;
                                     this->session->sendAck(ack);
                                     this->internal->optimizeAckTimestamp = System::currentTimeMillis();
@@ -1213,7 +1266,7 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
                             Pointer<MessageAck> ack =
                                 makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                             if (ack != NULL) {
-                                this->internal->dispatchedMessages.clear();
+                                this->internal->deliveredMessages.clear();
                                 session->sendAck(ack);
                             }
                         }
@@ -1226,8 +1279,8 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
             ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
         } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
             bool messageUnackedByConsumer = false;
-            synchronized(&this->internal->dispatchedMessages) {
-                messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
+            synchronized(&this->internal->deliveredMessages) {
+                messageUnackedByConsumer = this->internal->deliveredMessages.contains(message);
             }
 
             if (messageUnackedByConsumer) {
@@ -1251,17 +1304,16 @@ void ActiveMQConsumerKernel::deliverAcks() {
         if (this->internal->deliveringAcks.compareAndSet(false, true)) {
 
             if (isAutoAcknowledgeEach()) {
-                synchronized(&this->internal->dispatchedMessages) {
+                synchronized(&this->internal->deliveredMessages) {
                     ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                     if (ack != NULL) {
-                        this->internal->dispatchedMessages.clear();
+                        this->internal->deliveredMessages.clear();
                         this->internal->ackCounter = 0;
                     } else {
                         ack.swap(internal->pendingAck);
                     }
                 }
-            } else if (this->internal->pendingAck != NULL &&
-                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
+            } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->isStandardAck()) {
                 ack.swap(this->internal->pendingAck);
             }
 
@@ -1305,7 +1357,7 @@ void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ack
     } else {
         // old pending ack being superseded by ack of another type, if is is not a delivered
         // ack and hence important, send it now so it is not lost.
-        if (oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED) {
+        if (!oldPendingAck->isDeliveredAck()) {
             session->sendAck(oldPendingAck);
         }
     }
@@ -1327,13 +1379,13 @@ void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ack
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages(int type) {
 
-    synchronized( &this->internal->dispatchedMessages ) {
+    synchronized( &this->internal->deliveredMessages ) {
 
-        if (!this->internal->dispatchedMessages.isEmpty()) {
+        if (!this->internal->deliveredMessages.isEmpty()) {
 
-            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
-            Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->dispatchedMessages.size()));
-            ack->setFirstMessageId(this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId());
+            Pointer<MessageDispatch> dispatched = this->internal->deliveredMessages.getFirst();
+            Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->deliveredMessages.size()));
+            ack->setFirstMessageId(this->internal->deliveredMessages.getLast()->getMessage()->getMessageId());
 
             return ack;
         }
@@ -1356,8 +1408,8 @@ void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> disp
             ack->setFirstMessageId(ack->getLastMessageId());
         }
         session->sendAck(ack);
-        synchronized(&this->internal->dispatchedMessages) {
-            this->internal->dispatchedMessages.remove(dispatch);
+        synchronized(&this->internal->deliveredMessages) {
+            this->internal->deliveredMessages.remove(dispatch);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -1368,10 +1420,10 @@ void ActiveMQConsumerKernel::acknowledge() {
 
     try {
 
-        clearDispatchList();
+        clearDeliveredList();
         this->internal->waitForRedeliveries();
 
-        synchronized(&this->internal->dispatchedMessages) {
+        synchronized(&this->internal->deliveredMessages) {
 
             // Acknowledge all messages so far.
             Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
@@ -1390,11 +1442,11 @@ void ActiveMQConsumerKernel::acknowledge() {
             session->sendAck(ack);
 
             // Adjust the counters
-            this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->dispatchedMessages.size());
-            this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
+            this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->deliveredMessages.size());
+            this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
 
             if (!session->isTransacted()) {
-                this->internal->dispatchedMessages.clear();
+                this->internal->deliveredMessages.clear();
             }
         }
     }
@@ -1404,8 +1456,9 @@ void ActiveMQConsumerKernel::acknowledge() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::commit() {
 
-    synchronized(&(this->internal->dispatchedMessages)) {
-        this->internal->dispatchedMessages.clear();
+    synchronized(&(this->internal->deliveredMessages)) {
+        this->internal->deliveredMessages.clear();
+        this->internal->clearPreviouslyDelivered();
     }
     this->internal->redeliveryDelay = 0;
 }
@@ -1413,31 +1466,31 @@ void ActiveMQConsumerKernel::commit() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::rollback() {
 
-    clearDispatchList();
+    clearDeliveredList();
     synchronized(this->internal->unconsumedMessages.get()) {
 
         if (this->internal->optimizeAcknowledge) {
             // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
             if (!this->consumerInfo->isBrowser()) {
-                synchronized(&this->internal->dispatchedMessages) {
-                    for (int i = 0; (i < this->internal->dispatchedMessages.size()) &&
+                synchronized(&this->internal->deliveredMessages) {
+                    for (int i = 0; (i < this->internal->deliveredMessages.size()) &&
                                     (i < this->internal->ackCounter); i++) {
                         // ensure we don't filter this as a duplicate
-                        Pointer<MessageDispatch> md = this->internal->dispatchedMessages.removeLast();
+                        Pointer<MessageDispatch> md = this->internal->deliveredMessages.removeLast();
                         session->getConnection()->rollbackDuplicate(this, md->getMessage());
                     }
                 }
             }
         }
 
-        synchronized(&this->internal->dispatchedMessages) {
+        synchronized(&this->internal->deliveredMessages) {
             this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
-            if (this->internal->dispatchedMessages.isEmpty()) {
+            if (this->internal->deliveredMessages.isEmpty()) {
                 return;
             }
 
             // Only increase the redelivery delay after the first redelivery..
-            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
+            Pointer<MessageDispatch> lastMsg = this->internal->deliveredMessages.getFirst();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if (currentRedeliveryCount > 0) {
                 this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
@@ -1445,9 +1498,9 @@ void ActiveMQConsumerKernel::rollback() {
                 this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
             }
 
-            Pointer<MessageId> firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
+            Pointer<MessageId> firstMsgId = this->internal->deliveredMessages.getLast()->getMessage()->getMessageId();
 
-            Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
+            Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->deliveredMessages.iterator());
             while (iter->hasNext()) {
                 Pointer<Message> message = iter->next()->getMessage();
                 message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
@@ -1461,25 +1514,28 @@ void ActiveMQConsumerKernel::rollback() {
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
                 Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
-                                        this->internal->dispatchedMessages.size()));
+                                        this->internal->deliveredMessages.size()));
                 ack->setFirstMessageId(firstMsgId);
-                Pointer<BrokerError> cause(new BrokerError);
-                ack->setPoisonCause(cause);
+                // TODO - Add cause to the message.
+                std::string message = "Exceeded redelivery policy limit:" +
+                                       Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
+                                       //", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
+                ack->setPoisonCause(internal->createBrokerError(message));
                 session->sendAck(ack, true);
                 // Adjust the window size.
                 this->internal->additionalWindowSize = Math::max(0,
-                    this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
+                    this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
                 this->internal->redeliveryDelay = 0;
 
-                this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
-                this->internal->dispatchedMessages.clear();
+                this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+                this->internal->deliveredMessages.clear();
 
             } else {
 
                 // only redelivery_ack after first delivery
                 if (currentRedeliveryCount > 0) {
                     Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
-                                            this->internal->dispatchedMessages.size()));
+                                            this->internal->deliveredMessages.size()));
                     ack->setFirstMessageId(firstMsgId);
                     session->sendAck(ack);
                 }
@@ -1493,8 +1549,8 @@ void ActiveMQConsumerKernel::rollback() {
                         NonBlockingRedeliveryTask* redeliveryTask =
                             new NonBlockingRedeliveryTask(session, self, this->internal);
 
-                        this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
-                        this->internal->dispatchedMessages.clear();
+                        this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+                        this->internal->deliveredMessages.clear();
 
                         this->session->getScheduler()->executeAfterDelay(
                             redeliveryTask, this->internal->redeliveryDelay);
@@ -1504,13 +1560,13 @@ void ActiveMQConsumerKernel::rollback() {
                     this->internal->unconsumedMessages->stop();
 
                     std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
-                        this->internal->dispatchedMessages.iterator());
+                        this->internal->deliveredMessages.iterator());
                     while (iter->hasNext()) {
                         this->internal->unconsumedMessages->enqueueFirst(iter->next());
                     }
 
-                    this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
-                    this->internal->dispatchedMessages.clear();
+                    this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+                    this->internal->deliveredMessages.clear();
 
                     if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
                         Pointer<ActiveMQConsumerKernel> self =
@@ -1536,7 +1592,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
     try {
 
         clearMessagesInProgress();
-        clearDispatchList();
+        clearDeliveredList();
 
         synchronized(this->internal->unconsumedMessages.get()) {
 
@@ -1547,6 +1603,13 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
                     synchronized(&this->internal->listenerMutex) {
 
                         if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
+                            if (this->internal->redeliveryExceeded(dispatch)) {
+                                internal->posionAck(dispatch,
+                                                    "dispatch to " + getConsumerId()->toString() +
+                                                    " exceeds redelivery policy limit:" +
+                                                    Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
+                                return;
+                            }
                             Pointer<cms::Message> message = createCMSMessage(dispatch);
                             beforeMessageIsConsumed(dispatch);
                             try {
@@ -1577,42 +1640,27 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
                         }
                     }
                 } else {
-                    if (!session->isTransacted()) {
-                        Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
-                        session->sendAck(ack);
-                    } else {
-                        bool needsPoisonAck = false;
-                        synchronized (&this->internal->dispatchedMessages) {
-                            if (this->internal->previouslyDeliveredMessages != NULL) {
-                                this->internal->previouslyDeliveredMessages->put(
-                                        dispatch->getMessage()->getMessageId(), true);
-                            } else {
-                                // delivery while pending redelivery to another consumer on the same
-                                // connection not waiting for redelivery will help here
-                                needsPoisonAck = true;
-                            }
-                        }
-                        if (needsPoisonAck) {
-                            Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
-                            poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
-                            Pointer<BrokerError> cause(new BrokerError);
-                            cause->setExceptionClass("javax.jms.JMSException");
-                            cause->setMessage(std::string() + "Duplicate dispatch with transacted " +
-                                              "redeliver pending on another consumer, connection: " +
-                                              this->session->getConnection()->getConnectionInfo().getConnectionId()->toString());
-                            poisonAck->setPoisonCause(cause);
-                            session->sendAck(poisonAck);
+                    // deal with duplicate delivery
+                    if (this->internal->redeliveryExpectedInCurrentTransaction(dispatch, true)) {
+                        if (this->internal->transactedIndividualAck) {
+                            immediateIndividualTransactedAck(dispatch);
                         } else {
-                            if (this->internal->transactedIndividualAck) {
-                                immediateIndividualTransactedAck(dispatch);
-                            } else {
-                                Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
-                                session->sendAck(ack);
-                            }
+                            Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
+                            internal->session->sendAck(ack);
                         }
+                    } else if ((internal->redeliveryPendingInCompetingTransaction(dispatch))) {
+                        internal->session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
+                        this->dispatch(dispatch);
+                    } else {
+                        internal->posionAck(dispatch,
+                            std::string("Suppressing duplicate delivery on connection, consumer ") + getConsumerId()->toString());
                     }
                 }
             }
+            if (++internal->dispatchedCount % 1000 == 0) {
+                internal->dispatchedCount = 0;
+                Thread::yield();
+            }
         }
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
@@ -1673,8 +1721,7 @@ Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDi
 void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
 
     try {
-
-        clearDispatchList();
+        this->internal->clearDeliveredList();
 
         // There are still local message, consume them first.
         if (!this->internal->unconsumedMessages->isEmpty()) {
@@ -1699,7 +1746,7 @@ void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::checkClosed() const {
     if (this->isClosed()) {
-        throw ActiveMQException(__FILE__, __LINE__, "Consumer Already Closed" );
+        throw cms::IllegalStateException("Consumer Already Closed" );
     }
 }
 
@@ -1731,16 +1778,21 @@ bool ActiveMQConsumerKernel::iterate() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::inProgressClearRequired() {
-    this->internal->inProgressClearRequiredFlag = true;
+    this->internal->inProgressClearRequiredFlag.incrementAndGet();
     // Clears dispatched messages async to avoid lock contention with inprogress acks.
-    this->internal->clearDispatchList = true;
+    this->internal->isClearDeliveredList = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::clearDeliveredList() {
+    this->internal->clearDeliveredList();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::clearMessagesInProgress() {
-    if (this->internal->inProgressClearRequiredFlag) {
+    if (this->internal->inProgressClearRequiredFlag.get() > 0) {
         synchronized(this->internal->unconsumedMessages.get()) {
-            if (this->internal->inProgressClearRequiredFlag) {
+            if (this->internal->inProgressClearRequiredFlag.get() > 0) {
 
                 // ensure messages that were not yet consumed are rolled back up front as they
                 // may get redelivered to another consumer by the Broker.
@@ -1756,7 +1808,7 @@ void ActiveMQConsumerKernel::clearMessagesInProgress() {
 
                 // allow dispatch on this connection to resume
                 this->session->getConnection()->setTransportInterruptionProcessingComplete();
-                this->internal->inProgressClearRequiredFlag = false;
+                this->internal->inProgressClearRequiredFlag.decrementAndGet();
 
                 // Wake up any blockers and allow them to recheck state.
                 this->internal->unconsumedMessages->notifyAll();
@@ -1766,11 +1818,6 @@ void ActiveMQConsumerKernel::clearMessagesInProgress() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::clearDispatchList() {
-    this->internal->doClearDispatchList();
-}
-
-////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
     return this->session->isAutoAcknowledge() ||
            (this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
@@ -2018,3 +2065,8 @@ bool ActiveMQConsumerKernel::isConsumerExpiryCheckEnabled() {
 void ActiveMQConsumerKernel::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
     this->internal->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isRedeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch) const {
+    return this->internal->redeliveryExpectedInCurrentTransaction(dispatch, false);
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
index b77e2a5..dc5d1c8 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
@@ -368,6 +368,16 @@ namespace kernels {
          */
         void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
 
+        /**
+         * Returns true if the given MessageDispatch is expected to be redelivered in the
+         * currently open transaction.  This would be true for any message that was previously
+         * delivered in a transaction and a failover occurred prior to the transaction being
+         * completed.
+         *
+         * @return true if the given dispatch needs to be delivered to this consumer to recover.
+         */
+        bool isRedeliveryExpectedInCurrentTransaction(Pointer<commands::MessageDispatch> dispatch) const;
+
     protected:
 
         /**
@@ -424,7 +434,7 @@ namespace kernels {
 
         void registerSync();
 
-        void clearDispatchList();
+        void clearDeliveredList();
 
     };
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index aaf0a38..e6d2760 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -1523,3 +1523,18 @@ bool ActiveMQSessionKernel::isSessionAsyncDispatch() const {
 void ActiveMQSessionKernel::setSessionAsyncDispatch(bool sessionAsyncDispatch) {
     this->config->sessionAsyncDispatch = sessionAsyncDispatch;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > ActiveMQSessionKernel::getConsumers() const {
+    ArrayList< Pointer<ActiveMQConsumerKernel> > result;
+    this->config->consumerLock.readLock().lock();
+    try {
+        result.addAll(this->config->consumers);
+        this->config->consumerLock.readLock().unlock();
+    } catch (Exception& ex) {
+        this->config->consumerLock.readLock().unlock();
+        throw;
+    }
+
+    return result;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
index d62beb5..5247e7c 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
@@ -41,6 +41,7 @@
 #include <activemq/threads/Scheduler.h>
 
 #include <decaf/lang/Pointer.h>
+#include <decaf/util/ArrayList.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
@@ -584,6 +585,15 @@ namespace kernels {
          */
         void setSessionAsyncDispatch(bool sessionAsyncDispatch);
 
+        /**
+         * Returns an ArrayList containing a copy of all consumers currently in
+         * use on this Session.  Since this list is copied from the main consumers
+         * list the usage is thread safe after return.
+         *
+         * @return a list containing a pointer to each consumer active in this session.
+         */
+        decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > getConsumers() const;
+
    private:
 
        /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index a1244ce..2df8e3a 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -71,7 +71,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriori
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
+//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireRedeliveryPolicyTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp b/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
index abe9973..d079493 100644
--- a/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
+++ b/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
@@ -30,7 +30,7 @@ IntegrationCommon::IntegrationCommon() : urlCommon(), stompURL(), openwireURL()
 
     this->urlCommon = "tcp://localhost:";
     this->stompURL = this->urlCommon + "61613?wireFormat=stomp";
-    this->openwireURL = this->urlCommon + "61616?wireFormat=openwire";
+    this->openwireURL = this->urlCommon + "61616?transport.trace=false";
 }
 
 ////////////////////////////////////////////////////////////////////////////////