You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/01 23:56:01 UTC

svn commit: r1463311 [2/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ main/activemq/core/kernels/ test/activemq/core/

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Apr  1 21:56:01 2013
@@ -25,6 +25,9 @@
 #include <decaf/lang/Boolean.h>
 #include <decaf/lang/Integer.h>
 #include <decaf/lang/Long.h>
+#include <decaf/util/HashMap.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+#include <decaf/util/concurrent/Executors.h>
 #include <activemq/util/Config.h>
 #include <activemq/util/CMSExceptionSupport.h>
 #include <activemq/util/ActiveMQProperties.h>
@@ -63,10 +66,22 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
+namespace activemq {
 namespace core {
 namespace kernels {
 
+    class PreviouslyDeliveredMap : public HashMap<Pointer<MessageId>, bool> {  // MessageId -> flag; false entries are counted as "not replayed" by the helpers in this file
+    public:
+        // Transaction id supplied at construction; read when the rollback message is built.
+        Pointer<TransactionId> transactionId;
+        // Stores a copy of the given transaction id alongside the inherited map.
+        PreviouslyDeliveredMap(Pointer<TransactionId> transactionId) :
+            transactionId(transactionId) {
+        }
+
+        virtual ~PreviouslyDeliveredMap() {}
+    };
+
     class ActiveMQConsumerKernelConfig {
     private:
 
@@ -95,6 +110,21 @@ namespace kernels {
         Pointer<Exception> failureError;
         Pointer<Scheduler> scheduler;
         int hashCode;
+        Pointer<PreviouslyDeliveredMap> previouslyDeliveredMessages;
+        long long failoverRedeliveryWaitPeriod;
+        bool transactedIndividualAck;
+        bool nonBlockingRedelivery;
+        bool optimizeAcknowledge;
+        long long optimizeAckTimestamp;
+        long long optimizeAcknowledgeTimeOut;
+        long long optimizedAckScheduledAckInterval;
+        Runnable* optimizedAckTask;
+        int ackCounter;
+        int dispatchedCount;
+        Pointer<ExecutorService> executor;
+        ActiveMQSessionKernel* session;
+        ActiveMQConsumerKernel* parent;
+        Pointer<ConsumerInfo> info;
 
         ActiveMQConsumerKernelConfig() : listener(NULL),
                                          messageAvailableListener(NULL),
@@ -115,8 +145,185 @@ namespace kernels {
                                          redeliveryPolicy(),
                                          failureError(),
                                          scheduler(),
-                                         hashCode() {
+                                         hashCode(),
+                                         previouslyDeliveredMessages(),
+                                         failoverRedeliveryWaitPeriod(0),
+                                         transactedIndividualAck(false),
+                                         nonBlockingRedelivery(false),
+                                         optimizeAcknowledge(false),
+                                         optimizeAckTimestamp(System::currentTimeMillis()),
+                                         optimizeAcknowledgeTimeOut(),
+                                         optimizedAckScheduledAckInterval(),
+                                         optimizedAckTask(),
+                                         ackCounter(),
+                                         dispatchedCount(),
+                                         executor(),
+                                         session(),
+                                         parent(),
+                                         info() {
+        }
+
+        bool isTimeForOptimizedAck(int prefetchSize) const {  // true once the count threshold or the ack timeout is reached
+            if (ackCounter + deliveredCounter >= (prefetchSize * 0.65)) {
+                return true;
+            }
+            // Count threshold not met: fall back to the time-based trigger.
+            long long nextAckTime = optimizeAckTimestamp + optimizeAcknowledgeTimeOut;
+            // A timeout of zero or less never fires the time-based trigger.
+            if (optimizeAcknowledgeTimeOut > 0 && System::currentTimeMillis() >= nextAckTime) {
+                return true;
+            }
+
+            return false;
+        }
+
+        void doClearMessagesInProgress() {  // no-op unless inProgressClearRequiredFlag is set
+            if (this->inProgressClearRequiredFlag) {
+                synchronized(this->unconsumedMessages.get()) {
+                    if (this->inProgressClearRequiredFlag) {
+                        // The flag is tested again here, now that the unconsumedMessages lock is held.
+                        // ensure messages that were not yet consumed are rolled back up front as they
+                        // may get redelivered to another consumer by the Broker.
+                        std::vector< Pointer<MessageDispatch> > list = this->unconsumedMessages->removeAll();
+                        if (!this->info->isBrowser()) {
+                            std::vector<Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+                            // Report each drained message to the connection via rollbackDuplicate.
+                            for (; iter != list.end(); ++iter) {
+                                Pointer<MessageDispatch> md = *iter;
+                                this->session->getConnection()->rollbackDuplicate(this->parent, md->getMessage());
+                            }
+                        }
+
+                        // allow dispatch on this connection to resume
+                        this->session->getConnection()->setTransportInterruptionProcessingComplete();
+                        this->inProgressClearRequiredFlag = false;
+
+                        // Wake up any blockers and allow them to recheck state.
+                        this->unconsumedMessages->notifyAll();
+                    }
+                }
+            }
+        }
+
+        void doClearDispatchList() {  // no-op unless clearDispatchList is set
+            if (clearDispatchList) {
+                synchronized (&this->dispatchedMessages) {
+                    if (clearDispatchList) {  // tested again with the dispatchedMessages lock held
+                        if (!dispatchedMessages.isEmpty()) {
+                            if (session->isTransacted()) {  // transacted: keep the list and record its message ids
+                                if (previouslyDeliveredMessages == NULL) {
+                                    previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap(
+                                        session->getTransactionContext()->getTransactionId()));
+                                }
+
+                                Pointer<Iterator<Pointer<MessageDispatch> > > iter(dispatchedMessages.iterator());
+
+                                while (iter->hasNext()) {  // every dispatched message id is stored with value false
+                                    Pointer<MessageDispatch> dispatch = iter->next();
+                                    previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false);
+                                }
+                            } else {  // not transacted: drop the dispatched list and any pending ack
+                                dispatchedMessages.clear();
+                                pendingAck.reset(NULL);
+                            }
+                        }
+                        clearDispatchList = false;
+                    }
+                }
+            }
+        }
+
+        void doClearPreviouslyDelivered() {  // empties the map and releases the pointer, if a map exists
+            if (previouslyDeliveredMessages != NULL) {
+                previouslyDeliveredMessages->clear();
+                previouslyDeliveredMessages.reset(NULL);
+            }
         }
+
+        // Rolls back and removes the first entry of dispatchedMessages whose id equals key; called with deliveredMessages locked
+        void removeFromDeliveredMessages(Pointer<MessageId> key) {  // NOTE(review): "deliveredMessages" presumably means dispatchedMessages - confirm
+            Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->dispatchedMessages.iterator());
+            while (iter->hasNext()) {
+                Pointer<MessageDispatch> candidate = iter->next();
+                if (key->equals(candidate->getMessage()->getMessageId().get())) {
+                    session->getConnection()->rollbackDuplicate(this->parent, candidate->getMessage());
+                    iter->remove();
+                    break;  // stop at the first match
+                }
+            }
+        }
+
+        // Called with unconsumedMessages && deliveredMessages locked. Removes any message that was
+        // not re-delivered, as it can't be replayed to this consumer on rollback.
+        void rollbackPreviouslyDeliveredAndNotRedelivered() {
+            if (previouslyDeliveredMessages != NULL) {
+                // Entries still mapped to false are rolled back and removed from the dispatched list.
+                Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+                Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+                while (iter->hasNext()) {
+                    MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+                    if (!entry.getValue()) {
+                        removeFromDeliveredMessages(entry.getKey());
+                    }
+                }
+
+                doClearPreviouslyDelivered();
+            }
+        }
+
+        void rollbackOnFailedRecoveryRedelivery() {  // throws cms::TransactionRolledBackException if any entry is still false
+            if (previouslyDeliveredMessages != NULL) {
+                // if any previously delivered message was not re-delivered, the transaction is invalid
+                // and must roll back as messages have been dispatched elsewhere.
+                int numberNotReplayed = 0;
+                Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+                Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+                while (iter->hasNext()) {
+                    MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+                    if (!entry.getValue()) {
+                        numberNotReplayed++;
+                    }
+                }
+                if (numberNotReplayed > 0) {  // the message names the transaction, the count and the consumer id
+                    std::string message = std::string("rolling back transaction (") +
+                        previouslyDeliveredMessages->transactionId->toString() +
+                        ") post failover recovery. " + Integer::toString(numberNotReplayed) +
+                        " previously delivered message(s) not replayed to consumer: " +
+                        info->getConsumerId()->toString();
+                    throw cms::TransactionRolledBackException(message);
+                }
+            }
+        }
+
+        void waitForRedeliveries() {  // polls until no entry is flagged "not replayed" or the wait period ends
+            if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != NULL) {
+                long long expiry = System::currentTimeMillis() + failoverRedeliveryWaitPeriod;  // plain long can be 32-bit and truncate
+                int numberNotReplayed;
+                do {
+                    numberNotReplayed = 0;
+                    synchronized (&this->dispatchedMessages) {  // count the entries still mapped to false
+                        if (previouslyDeliveredMessages != NULL) {
+                            Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+                            Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+                            while (iter->hasNext()) {
+                                MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+                                if (!entry.getValue()) {
+                                    numberNotReplayed++;
+                                }
+                            }
+                        }
+                    }
+                    if (numberNotReplayed > 0) {  // sleep outside the lock before polling again
+                        try {
+                            Thread::sleep(Math::max(500LL, failoverRedeliveryWaitPeriod/4));
+                        } catch (InterruptedException& ex) {
+                            break;  // an interrupt abandons the wait
+                        }
+                    }
+                } while (numberNotReplayed > 0 && System::currentTimeMillis() < expiry);  // keep polling only until the deadline
+            }
+        }
+
     };
 
 }}}
@@ -150,7 +357,16 @@ namespace {
         virtual ~TransactionSynhcronization() {}
 
         virtual void beforeEnd() {
-            consumer->acknowledge();
+
+            if (false) { // TODO transactedIndividualAck) {
+//                consumer->clearDispatchList();
+//                consumer->waitForRedeliveries();
+//                synchronized(deliveredMessages) {
+//                    consumer->rollbackOnFailedRecoveryRedelivery();
+//                }
+            } else {
+                consumer->acknowledge();
+            }
             consumer->setSynchronizationRegistered(false);
         }
 
@@ -276,6 +492,7 @@ namespace {
     private:
 
         Pointer<ActiveMQConsumerKernel> consumer;
+        ActiveMQSessionKernel* session;
 
     private:
 
@@ -284,7 +501,9 @@ namespace {
 
     public:
 
-        StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
+        StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQSessionKernel* session) :
+            Runnable(), consumer(consumer), session(session) {
+
             if (consumer == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
@@ -299,11 +518,105 @@ namespace {
                     this->consumer->start();
                 }
             } catch(cms::CMSException& ex) {
-                // TODO - Need Connection onAsyncException method.
+                // TODO
+                // this->session->getConnection()->onAsyncException(ex);
             }
         }
     };
 
+    class AsyncMessageAckTask : public Runnable {  // sends one MessageAck through the session, then clears deliveringAcks
+    private:
+        // session and impl are raw pointers that this class only stores and dereferences; its destructor is empty.
+        Pointer<MessageAck> ack;
+        ActiveMQSessionKernel* session;
+        ActiveMQConsumerKernelConfig* impl;
+
+    private:
+        // Copy operations are declared private with no body in this class.
+        AsyncMessageAckTask(const AsyncMessageAckTask&);
+        AsyncMessageAckTask& operator=(const AsyncMessageAckTask&);
+
+    public:
+
+        AsyncMessageAckTask(Pointer<MessageAck> ack, ActiveMQSessionKernel* session, ActiveMQConsumerKernelConfig* impl) :
+            Runnable(), ack(ack), session(session), impl(impl) {}
+        virtual ~AsyncMessageAckTask() {}
+        // Calls session->sendAck(ack, true); deliveringAcks is reset to false on success and when Exception or cms::CMSException is caught.
+        virtual void run() {
+            try {
+                this->session->sendAck(ack, true);
+                this->impl->deliveringAcks.set(false);
+            } catch(Exception& ex) {
+                this->impl->deliveringAcks.set(false);
+            } catch(cms::CMSException& ex) {
+                this->impl->deliveringAcks.set(false);
+            }
+        }
+    };
+
+    class OptimizedAckTask : public Runnable {  // task whose run() triggers consumer->deliverAcks()
+    private:
+        // Both are raw pointers that this class only stores and dereferences; its destructor is empty.
+        ActiveMQConsumerKernel* consumer;
+        ActiveMQConsumerKernelConfig* impl;
+
+    private:
+        // Copy operations are declared private with no body in this class.
+        OptimizedAckTask(const OptimizedAckTask&);
+        OptimizedAckTask& operator=(const OptimizedAckTask&);
+
+    public:
+
+        OptimizedAckTask(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+            Runnable(), consumer(consumer), impl(impl) {}
+        virtual ~OptimizedAckTask() {}
+        // Delivers acks only while optimizeAcknowledge is on and unconsumedMessages is not closed; a caught Exception is ignored.
+        virtual void run() {
+            try {
+                if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) {
+                    this->consumer->deliverAcks();
+                }
+            } catch(Exception& ex) {
+            }
+        }
+    };
+
+    class NonBlockingRedeliveryTask : public Runnable {  // re-dispatches a snapshot of the dispatched messages through the session
+    private:
+        // redeliveries holds the copy taken in the constructor; session and impl are raw pointers.
+        ActiveMQSessionKernel* session;
+        Pointer<ActiveMQConsumerKernel> consumer;
+        ActiveMQConsumerKernelConfig* impl;
+        LinkedList<Pointer<MessageDispatch> > redeliveries;
+
+    private:
+        // Copy operations are declared private with no body in this class.
+        NonBlockingRedeliveryTask(const NonBlockingRedeliveryTask&);
+        NonBlockingRedeliveryTask& operator=(const NonBlockingRedeliveryTask&);
+
+    public:
+
+        NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
+            Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
+            // Copy impl->dispatchedMessages now; run() iterates this copy, not the live list.
+            this->redeliveries.copy(impl->dispatchedMessages);
+        }
+        virtual ~NonBlockingRedeliveryTask() {}
+        // Dispatches each copied message via session->dispatch, stopping once unconsumedMessages is closed; a caught Exception goes to onAsyncException.
+        virtual void run() {
+            try {
+                if (!impl->unconsumedMessages->isClosed()) {
+                    Pointer<Iterator<Pointer<MessageDispatch> > > iter(redeliveries.iterator());
+                    while (iter->hasNext() && !impl->unconsumedMessages->isClosed()) {
+                        Pointer<MessageDispatch> dispatch = iter->next();
+                        session->dispatch(dispatch);
+                    }
+                }
+            } catch (Exception& e) {
+                session->getConnection()->onAsyncException(e);
+            }
+        }
+    };
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -356,14 +669,20 @@ ActiveMQConsumerKernel::ActiveMQConsumer
     consumerInfo->setSubscriptionName(name);
     consumerInfo->setSelector(selector);
     consumerInfo->setPrefetchSize(prefetch);
+    consumerInfo->setCurrentPrefetchSize(prefetch);
     consumerInfo->setMaximumPendingMessageLimit(maxPendingMessageCount);
     consumerInfo->setBrowser(browser);
     consumerInfo->setDispatchAsync(dispatchAsync);
     consumerInfo->setNoLocal(noLocal);
+    consumerInfo->setExclusive(session->getConnection()->isExclusiveConsumer());
+    consumerInfo->setRetroactive(session->getConnection()->isUseRetroactiveConsumer());
 
     // Initialize Consumer Data
     this->session = session;
     this->consumerInfo = consumerInfo;
+    this->internal->session = session;
+    this->internal->parent = this;
+    this->internal->info = consumerInfo;
     this->internal->hashCode = id->getHashCode();
     this->internal->lastDeliveredSequenceId = -1;
     this->internal->synchronizationRegistered = false;
@@ -388,6 +707,23 @@ ActiveMQConsumerKernel::ActiveMQConsumer
 
     applyDestinationOptions(this->consumerInfo);
 
+    if (session->getConnection()->isOptimizeAcknowledge() && session->isAutoAcknowledge() && !consumerInfo->isBrowser()) {
+        this->internal->optimizeAcknowledge = true;
+    }
+
+    if (this->internal->optimizeAcknowledge) {
+        this->internal->optimizeAcknowledgeTimeOut = session->getConnection()->getOptimizeAcknowledgeTimeOut();
+        this->setOptimizedAckScheduledAckInterval(
+            session->getConnection()->getOptimizedAckScheduledAckInterval());
+    }
+
+    consumerInfo->setOptimizedAcknowledge(this->internal->optimizeAcknowledge);
+    this->internal->failoverRedeliveryWaitPeriod =
+        session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod();
+    this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery();
+    this->internal->transactedIndividualAck =
+        session->getConnection()->isTransactedIndividualAck() || this->internal->nonBlockingRedelivery;
+
     if (this->consumerInfo->getPrefetchSize() < 0) {
         delete this->internal;
         throw IllegalArgumentException(
@@ -423,7 +759,7 @@ void ActiveMQConsumerKernel::start() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::stop() {
-    this->internal->started.set( false );
+    this->internal->started.set(false);
     this->internal->unconsumedMessages->stop();
 }
 
@@ -443,8 +779,6 @@ void ActiveMQConsumerKernel::close() {
 
                 Pointer<Synchronization> sync(new CloseSynhcronization(this));
                 this->session->getTransactionContext()->addSynchronization(sync);
-
-                doClose();
             } else {
                 doClose();
             }
@@ -457,6 +791,9 @@ void ActiveMQConsumerKernel::close() {
 void ActiveMQConsumerKernel::doClose() {
 
     try {
+        // Store interrupted state and clear so that Transport operations don't
+        // throw InterruptedException and we ensure that resources are cleaned up.
+        bool interrupted = Thread::interrupted();
 
         dispose();
         // Remove at the Broker Side, consumer has been removed from the local
@@ -466,6 +803,9 @@ void ActiveMQConsumerKernel::doClose() {
         info->setObjectId(this->consumerInfo->getConsumerId());
         info->setLastDeliveredSequenceId(this->internal->lastDeliveredSequenceId);
         this->session->oneway(info);
+        if (interrupted) {
+            Thread::currentThread()->interrupt();
+        }
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -481,10 +821,40 @@ void ActiveMQConsumerKernel::dispose() {
 
             if (!session->isTransacted()) {
                 deliverAcks();
+                if (isAutoAcknowledgeBatch()) {
+                    acknowledge();
+                }
             }
 
             this->internal->started.set(false);
 
+            if (this->internal->executor != NULL) {
+                this->internal->executor->shutdown();
+                this->internal->executor->awaitTermination(60, TimeUnit::SECONDS);
+                this->internal->executor.reset(NULL);
+            }
+
+            if (this->internal->optimizedAckTask != NULL) {
+                this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
+                this->internal->optimizedAckTask = NULL;
+            }
+
+            if (session->isClientAcknowledge()) {
+                if (!this->consumerInfo->isBrowser()) {
+                    // roll back duplicates that aren't acknowledged
+                    ArrayList< Pointer<MessageDispatch> > tmp;
+                    synchronized(&this->internal->dispatchedMessages) {
+                        tmp.copy(this->internal->dispatchedMessages);
+                    }
+                    Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
+                    while (iter->hasNext()) {
+                        Pointer<MessageDispatch> msg = iter->next();
+                        this->session->getConnection()->rollbackDuplicate(this, msg->getMessage());
+                    }
+                    tmp.clear();
+                }
+            }
+
             // Identifies any errors encountered during shutdown.
             bool haveException = false;
             ActiveMQException error;
@@ -553,8 +923,6 @@ decaf::lang::Pointer<MessageDispatch> Ac
 
     try {
 
-        this->checkClosed();
-
         // Calculate the deadline
         long long deadline = 0;
         if (timeout > 0) {
@@ -566,7 +934,6 @@ decaf::lang::Pointer<MessageDispatch> Ac
 
             Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
             if (dispatch == NULL) {
-
                 if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
                     timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
                 } else {
@@ -576,13 +943,9 @@ decaf::lang::Pointer<MessageDispatch> Ac
                         return Pointer<MessageDispatch> ();
                     }
                 }
-
             } else if (dispatch->getMessage() == NULL) {
-
                 return Pointer<MessageDispatch> ();
-
             } else if (dispatch->getMessage()->isExpired()) {
-
                 beforeMessageIsConsumed(dispatch);
                 afterMessageIsConsumed(dispatch, true);
                 if (timeout > 0) {
@@ -597,6 +960,9 @@ decaf::lang::Pointer<MessageDispatch> Ac
         }
 
         return Pointer<MessageDispatch>();
+    } catch (InterruptedException& ex) {
+        Thread::currentThread()->interrupt();
+        throw CMSExceptionSupport::create(ex);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -607,6 +973,7 @@ cms::Message* ActiveMQConsumerKernel::re
     try {
 
         this->checkClosed();
+        this->checkMessageListener();
 
         // Send a request for a new message if needed
         this->sendPullRequest(0);
@@ -633,6 +1000,7 @@ cms::Message* ActiveMQConsumerKernel::re
     try {
 
         this->checkClosed();
+        this->checkMessageListener();
 
         // Send a request for a new message if needed
         this->sendPullRequest(millisecs);
@@ -659,6 +1027,7 @@ cms::Message* ActiveMQConsumerKernel::re
     try {
 
         this->checkClosed();
+        this->checkMessageListener();
 
         // Send a request for a new message if needed
         this->sendPullRequest(-1);
@@ -718,8 +1087,7 @@ void ActiveMQConsumerKernel::setMessageL
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer<MessageDispatch>& dispatch) {
-
+void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer<MessageDispatch> dispatch) {
     this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
 
     if (!isAutoAcknowledgeBatch()) {
@@ -730,54 +1098,100 @@ void ActiveMQConsumerKernel::beforeMessa
         }
 
         if (this->session->isTransacted()) {
-            ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
+            if (this->internal->transactedIndividualAck) {
+                immediateIndividualTransactedAck(dispatch);
+            } else {
+                ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
+            }
         }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::afterMessageIsConsumed(const Pointer<MessageDispatch>& message, bool messageExpired ) {
+void ActiveMQConsumerKernel::immediateIndividualTransactedAck(Pointer<MessageDispatch> dispatch) {
+    // acks accumulate on the broker pending transaction completion to indicate delivery status
+    registerSync();
+    // Build an ACK_TYPE_INDIVIDUAL ack for this one dispatched message, tagged with the current transaction id.
+    Pointer<MessageAck> ack(new MessageAck);
+
+    ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
+    ack->setConsumerId(dispatch->getConsumerId());
+    ack->setDestination(dispatch->getDestination());
+    ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+    ack->setMessageCount(1);
+    ack->setTransactionId(this->session->getTransactionContext()->getTransactionId());
+
+    this->session->syncRequest(ack);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::registerSync() {  // starts the session transaction and registers a TransactionSynhcronization once
+    this->session->doStartTransaction();
+    if (!this->internal->synchronizationRegistered) {  // skip if a synchronization is already registered
+        this->internal->synchronizationRegistered = true;
+        Pointer<Synchronization> sync(new TransactionSynhcronization(this));
+        this->session->getTransactionContext()->addSynchronization(sync);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> message, bool messageExpired ) {
 
     try {
 
         if (this->internal->unconsumedMessages->isClosed()) {
             return;
-        }
-
-        if (messageExpired == true) {
-            synchronized(&this->internal->dispatchedMessages) {
-                this->internal->dispatchedMessages.remove(message);
-            }
+        } else if (messageExpired == true) {
             ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
             return;
-        }
-
-        if (session->isTransacted()) {
+        } else if (session->isTransacted()) {
             return;
-        } else if (isAutoAcknowledgeEach()) {
+        }
 
+        if (isAutoAcknowledgeEach()) {
             if (this->internal->deliveringAcks.compareAndSet(false, true)) {
-
                 synchronized(&this->internal->dispatchedMessages) {
                     if (!this->internal->dispatchedMessages.isEmpty()) {
-                        Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+                        if (this->internal->optimizeAcknowledge) {
 
-                        if (ack != NULL) {
-                            this->internal->dispatchedMessages.clear();
-                            session->oneway(ack);
+                            this->internal->ackCounter++;
+                            if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) {
+                                Pointer<MessageAck> ack =
+                                    makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+                                if (ack != NULL) {
+                                    this->internal->dispatchedMessages.clear();
+                                    this->internal->ackCounter = 0;
+                                    this->session->sendAck(ack);
+                                    this->internal->optimizeAckTimestamp = System::currentTimeMillis();
+                                }
+
+                                // As further optimization send ack for expired messages when there
+                                // are any. This resets the deliveredCounter to 0 so that we won't
+                                // send standard acks with every message just because the deliveredCounter
+                                // is just below 0.5 * prefetch as used in ackLater()
+                                if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) {
+                                    this->session->sendAck(this->internal->pendingAck);
+                                    this->internal->pendingAck.reset(NULL);
+                                    this->internal->deliveredCounter = 0;
+                                }
+                            }
+                        } else {
+                            Pointer<MessageAck> ack =
+                                makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+                            if (ack != NULL) {
+                                this->internal->dispatchedMessages.clear();
+                                session->oneway(ack);
+                            }
                         }
                     }
                 }
 
                 this->internal->deliveringAcks.set(false);
             }
-
         } else if (isAutoAcknowledgeBatch()) {
             ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
         } else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
-
             bool messageUnackedByConsumer = false;
-
             synchronized(&this->internal->dispatchedMessages) {
                 messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
             }
@@ -785,9 +1199,8 @@ void ActiveMQConsumerKernel::afterMessag
             if (messageUnackedByConsumer) {
                 this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
             }
-
         } else {
-            throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
+            throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State");
         }
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
@@ -801,34 +1214,30 @@ void ActiveMQConsumerKernel::deliverAcks
     try {
 
         Pointer<MessageAck> ack;
-
         if (this->internal->deliveringAcks.compareAndSet(false, true)) {
 
             if (isAutoAcknowledgeEach()) {
-
                 synchronized(&this->internal->dispatchedMessages) {
-
                     ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
-
                     if (ack != NULL) {
                         this->internal->dispatchedMessages.clear();
+                        this->internal->ackCounter = 0;
                     } else {
                         ack.swap(internal->pendingAck);
                     }
                 }
-
-            } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
-
+            } else if (this->internal->pendingAck != NULL &&
+                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
                 ack.swap(this->internal->pendingAck);
             }
 
             if (ack != NULL) {
-
-                try {
-                    this->session->oneway(ack);
-                } catch (...) {
+                if (this->internal->executor == NULL) {
+                    this->internal->executor.reset(Executors::newSingleThreadExecutor());
                 }
 
+                this->internal->executor->submit(
+                    new AsyncMessageAckTask(ack, this->session, this->internal), true);
             } else {
                 this->internal->deliveringAcks.set(false);
             }
@@ -840,18 +1249,12 @@ void ActiveMQConsumerKernel::deliverAcks
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::ackLater(const Pointer<MessageDispatch>& dispatch, int ackType) {
+void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ackType) {
 
     // Don't acknowledge now, but we may need to let the broker know the
     // consumer got the message to expand the pre-fetch window
     if (session->isTransacted()) {
-        session->doStartTransaction();
-        if (!this->internal->synchronizationRegistered) {
-            this->internal->synchronizationRegistered = true;
-
-            Pointer<Synchronization> sync(new TransactionSynhcronization(this));
-            this->session->getTransactionContext()->addSynchronization(sync);
-        }
+        registerSync();
     }
 
     // The delivered message list is only needed for the recover method
@@ -882,8 +1285,10 @@ void ActiveMQConsumerKernel::ackLater(co
         this->internal->pendingAck->setTransactionId(this->session->getTransactionContext()->getTransactionId());
     }
 
-    if ((0.5 * this->consumerInfo->getPrefetchSize()) <= (internal->deliveredCounter - internal->additionalWindowSize)) {
-        session->oneway(this->internal->pendingAck);
+    // Need to evaluate both expired and normal messages as otherwise consumer may get stalled
+    int pendingAcks = (internal->deliveredCounter + internal->ackCounter) - internal->additionalWindowSize;
+    if ((0.5 * this->consumerInfo->getPrefetchSize()) <= pendingAcks) {
+        session->sendAck(this->internal->pendingAck);
         this->internal->pendingAck.reset(NULL);
         this->internal->deliveredCounter = 0;
         this->internal->additionalWindowSize = 0;
@@ -915,36 +1320,27 @@ Pointer<MessageAck> ActiveMQConsumerKern
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::acknowledge(const Pointer<commands::MessageDispatch>& dispatch) {
-
-    try {
-
-        this->checkClosed();
+void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch) {  // one-argument overload: acks a single dispatch
+    this->acknowledge(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL);  // forwards to the two-argument overload with ACK_TYPE_INDIVIDUAL
+}
 
-        if (this->session->isIndividualAcknowledge()) {
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType) {
 
-            Pointer<MessageAck> ack(new MessageAck());
-            ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED);
-            ack->setConsumerId(this->consumerInfo->getConsumerId());
-            ack->setDestination(this->consumerInfo->getDestination());
-            ack->setMessageCount(1);
-            ack->setLastMessageId(dispatch->getMessage()->getMessageId());
-            ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+    try {
 
-            session->oneway(ack);
+        Pointer<MessageAck> ack(new MessageAck());
+        ack->setAckType((unsigned char) ackType);
+        ack->setConsumerId(this->consumerInfo->getConsumerId());
+        ack->setDestination(this->consumerInfo->getDestination());
+        ack->setMessageCount(1);
+        ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+        ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
 
-            synchronized(&this->internal->dispatchedMessages) {
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
-                while(iter->hasNext()) {
-                    if (iter->next() == dispatch) {
-                        iter->remove();
-                        break;
-                    }
-                }
-            }
+        session->sendAck(ack);
 
-        } else {
-            throw IllegalStateException(__FILE__, __LINE__, "Session is not in IndividualAcknowledge mode." );
+        synchronized(&this->internal->dispatchedMessages) {
+            this->internal->dispatchedMessages.remove(dispatch);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -955,6 +1351,9 @@ void ActiveMQConsumerKernel::acknowledge
 
     try {
 
+        clearDispatchList();
+        this->internal->waitForRedeliveries();
+
         synchronized(&this->internal->dispatchedMessages) {
 
             // Acknowledge all messages so far.
@@ -965,12 +1364,13 @@ void ActiveMQConsumerKernel::acknowledge
             }
 
             if (session->isTransacted()) {
+                this->internal->rollbackOnFailedRecoveryRedelivery();
                 session->doStartTransaction();
                 ack->setTransactionId(session->getTransactionContext()->getTransactionId());
             }
 
-            session->oneway(ack);
             this->internal->pendingAck.reset(NULL);
+            session->sendAck(ack);
 
             // Adjust the counters
             this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->dispatchedMessages.size());
@@ -998,7 +1398,22 @@ void ActiveMQConsumerKernel::rollback() 
 
     synchronized(this->internal->unconsumedMessages.get()) {
 
+        if (this->internal->optimizeAcknowledge) {
+            // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
+            if (!this->consumerInfo->isBrowser()) {
+                synchronized(&this->internal->dispatchedMessages) {
+                    for (int i = 0; (i < this->internal->dispatchedMessages.size()) &&
+                                    (i < this->internal->ackCounter); i++) {
+                        // ensure we don't filter this as a duplicate
+                        Pointer<MessageDispatch> md = this->internal->dispatchedMessages.removeLast();
+                        session->getConnection()->rollbackDuplicate(this, md->getMessage());
+                    }
+                }
+            }
+        }
+
         synchronized(&this->internal->dispatchedMessages) {
+            this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
             if (this->internal->dispatchedMessages.isEmpty()) {
                 return;
             }
@@ -1014,15 +1429,16 @@ void ActiveMQConsumerKernel::rollback() 
 
             Pointer<MessageId> firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
 
-            std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
-
+            Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
             while (iter->hasNext()) {
                 Pointer<Message> message = iter->next()->getMessage();
                 message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
+                // ensure we don't filter this as a duplicate
+                session->getConnection()->rollbackDuplicate(this, message);
             }
 
-            if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
-                && lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
+            if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+                lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
@@ -1033,8 +1449,9 @@ void ActiveMQConsumerKernel::rollback() 
                 ack->setMessageCount((int) this->internal->dispatchedMessages.size());
                 ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
                 ack->setFirstMessageId(firstMsgId);
+                // TODO - ack->setPoisonCause()
 
-                session->oneway(ack);
+                session->sendAck(ack, true);
                 // Adjust the window size.
                 this->internal->additionalWindowSize = Math::max(0,
                     this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
@@ -1055,22 +1472,33 @@ void ActiveMQConsumerKernel::rollback() 
                     session->oneway(ack);
                 }
 
-                // stop the delivery of messages.
-                this->internal->unconsumedMessages->stop();
-
-                std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
-                    this->internal->dispatchedMessages.iterator());
-                while (iter->hasNext()) {
-                    this->internal->unconsumedMessages->enqueueFirst(iter->next());
-                }
+                if (this->internal->nonBlockingRedelivery) {
 
-                if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
-                    Pointer<ActiveMQConsumerKernel> self =
-                        this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
-                    this->internal->scheduler->executeAfterDelay(
-                        new StartConsumerTask(self), internal->redeliveryDelay);
+                    if (!this->internal->unconsumedMessages->isClosed()) {
+                        Pointer<ActiveMQConsumerKernel> self =
+                            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+                        this->session->getScheduler()->executeAfterDelay(
+                            new NonBlockingRedeliveryTask(session, self, this->internal),
+                            this->internal->redeliveryDelay);
+                    }
                 } else {
-                    start();
+                    // stop the delivery of messages.
+                    this->internal->unconsumedMessages->stop();
+
+                    std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
+                        this->internal->dispatchedMessages.iterator());
+                    while (iter->hasNext()) {
+                        this->internal->unconsumedMessages->enqueueFirst(iter->next());
+                    }
+
+                    if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
+                        Pointer<ActiveMQConsumerKernel> self =
+                            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+                        this->internal->scheduler->executeAfterDelay(
+                            new StartConsumerTask(self, session), internal->redeliveryDelay);
+                    } else {
+                        start();
+                    }
                 }
             }
             this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
@@ -1088,40 +1516,94 @@ void ActiveMQConsumerKernel::dispatch(co
 
     try {
 
-        synchronized(this->internal->unconsumedMessages.get()) {
+        clearMessagesInProgress();
+        clearDispatchList();
 
-            clearMessagesInProgress();
-            if (this->internal->clearDispatchList) {
-                // we are reconnecting so lets flush the in progress messages
-                this->internal->clearDispatchList = false;
-                this->internal->unconsumedMessages->clear();
-            }
+        synchronized(this->internal->unconsumedMessages.get()) {
 
             if (!this->internal->unconsumedMessages->isClosed()) {
 
-                // Don't dispatch expired messages, ack it and then destroy it
-                if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()) {
-                    this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED);
-                    return;
-                }
-
-                synchronized(&this->internal->listenerMutex) {
+                if (this->consumerInfo->isBrowser() || !session->getConnection()->isDuplicate(this, dispatch->getMessage())) {
 
-                    // If we have a listener, send the message.
-                    if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning()) {
-
-                        Pointer<cms::Message> message = createCMSMessage(dispatch);
-                        beforeMessageIsConsumed(dispatch);
-                        this->internal->listener->onMessage(message.get());
-                        afterMessageIsConsumed(dispatch, false);
+                    synchronized(&this->internal->listenerMutex) {
 
+                        if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
+                            Pointer<cms::Message> message = createCMSMessage(dispatch);
+                            beforeMessageIsConsumed(dispatch);
+                            try {
+                                bool expired = dispatch->getMessage()->isExpired();
+                                if (!expired) {
+                                    this->internal->listener->onMessage(message.get());
+                                }
+                                afterMessageIsConsumed(dispatch, expired);
+                            } catch (RuntimeException& e) {
+                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
+                                    // Schedule redelivery and possible DLQ processing
+                                    // dispatch->setRollbackCause(e); // TODO
+                                    rollback();
+                                } else {
+                                    // Transacted or Client ack: Deliver the next message.
+                                    afterMessageIsConsumed(dispatch, false);
+                                }
+                            }
+                        } else {
+                            if (!this->internal->unconsumedMessages->isRunning()) {
+                                // delayed redelivery, ensure it can be re delivered
+                                session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
+                            }
+                            this->internal->unconsumedMessages->enqueue(dispatch);
+                            if (this->internal->messageAvailableListener != NULL) {
+                                this->internal->messageAvailableListener->onMessageAvailable(this);
+                            }
+                        }
+                    }
+                } else {
+                    if (!session->isTransacted()) {
+                        Pointer<MessageAck> ack(new MessageAck());
+                        ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
+                        ack->setConsumerId(this->consumerInfo->getConsumerId());
+                        ack->setDestination(dispatch->getDestination());
+                        ack->setMessageCount(1);
+                        ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+                        ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+                        session->sendAck(ack);
                     } else {
-
-                        // No listener, add it to the unconsumed messages list it will get pushed on the
-                        // next receive call or when a new listener is added.
-                        this->internal->unconsumedMessages->enqueue(dispatch);
-                        if (this->internal->messageAvailableListener != NULL) {
-                            this->internal->messageAvailableListener->onMessageAvailable(this);
+                        bool needsPoisonAck = false;
+                        synchronized (&this->internal->dispatchedMessages) {
+                            if (this->internal->previouslyDeliveredMessages != NULL) {
+                                this->internal->previouslyDeliveredMessages->put(
+                                        dispatch->getMessage()->getMessageId(), true);
+                            } else {
+                                // delivery while pending redelivery to another consumer on the same connection
+                                // not waiting for redelivery will help here
+                                needsPoisonAck = true;
+                            }
+                        }
+                        if (needsPoisonAck) {
+                            Pointer<MessageAck> poisonAck(new MessageAck());
+                            poisonAck->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
+                            poisonAck->setConsumerId(this->consumerInfo->getConsumerId());
+                            poisonAck->setDestination(dispatch->getDestination());
+                            poisonAck->setMessageCount(1);
+                            poisonAck->setLastMessageId(dispatch->getMessage()->getMessageId());
+                            poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
+// TODO
+//                            poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+//                                    + session.getConnection().getConnectionInfo().getConnectionId()));
+                            session->sendAck(poisonAck);
+                        } else {
+                            if (this->internal->transactedIndividualAck) {
+                                immediateIndividualTransactedAck(dispatch);
+                            } else {
+                                Pointer<MessageAck> ack(new MessageAck());
+                                ack->setAckType(ActiveMQConstants::ACK_TYPE_DELIVERED);
+                                ack->setConsumerId(this->consumerInfo->getConsumerId());
+                                ack->setDestination(dispatch->getDestination());
+                                ack->setMessageCount(1);
+                                ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+                                ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+                                session->sendAck(ack);
+                            }
                         }
                     }
                 }
@@ -1187,7 +1669,7 @@ void ActiveMQConsumerKernel::sendPullReq
 
     try {
 
-        this->checkClosed();
+        clearDispatchList();
 
         // There are still local message, consume them first.
         if (!this->internal->unconsumedMessages->isEmpty()) {
@@ -1217,6 +1699,16 @@ void ActiveMQConsumerKernel::checkClosed
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::checkMessageListener() const {  // guard: throws when a listener is installed on this consumer
+    if (this->internal->listener != NULL) {  // a MessageListener is currently set
+        throw cms::IllegalStateException(
+            "Cannot synchronously receive a message when a MessageListener is set");
+    }
+
+    this->session->checkMessageListener();  // then runs the session's own check of the same name
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumerKernel::iterate() {
 
     synchronized(&this->internal->listenerMutex) {
@@ -1234,7 +1726,6 @@ bool ActiveMQConsumerKernel::iterate() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::inProgressClearRequired() {
-
     this->internal->inProgressClearRequiredFlag = true;
     // Clears dispatched messages async to avoid lock contention with inprogress acks.
     this->internal->clearDispatchList = true;
@@ -1246,17 +1737,35 @@ void ActiveMQConsumerKernel::clearMessag
         synchronized(this->internal->unconsumedMessages.get()) {
             if (this->internal->inProgressClearRequiredFlag) {
 
-                // TODO - Rollback duplicates.
+                // ensure messages that were not yet consumed are rolled back up front as they
+                // may get redelivered to another consumer by the Broker.
+                std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
+                if (!this->consumerInfo->isBrowser()) {
+                    std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+
+                    for (; iter != list.end(); ++iter) {
+                        Pointer<MessageDispatch> md = *iter;
+                        this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
+                    }
+                }
 
                 // allow dispatch on this connection to resume
                 this->session->getConnection()->setTransportInterruptionProcessingComplete();
                 this->internal->inProgressClearRequiredFlag = false;
+
+                // Wake up any blockers and allow them to recheck state.
+                this->internal->unconsumedMessages->notifyAll();
             }
         }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::clearDispatchList() {  // thin forwarder, no logic of its own
+    this->internal->doClearDispatchList();  // the actual work is done by internal->doClearDispatchList()
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
     return this->session->isAutoAcknowledge() ||
            (this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
@@ -1273,12 +1782,11 @@ int ActiveMQConsumerKernel::getMessageAv
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::applyDestinationOptions(const Pointer<ConsumerInfo>& info) {
+void ActiveMQConsumerKernel::applyDestinationOptions(Pointer<ConsumerInfo> info) {
 
     decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
 
-    // Get any options specified in the destination and apply them to the
-    // ConsumerInfo object.
+    // Get any options specified in the destination and apply them to the ConsumerInfo object.
     const ActiveMQProperties& options = amqDestination->getOptions();
 
     std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
@@ -1307,7 +1815,6 @@ void ActiveMQConsumerKernel::applyDestin
     }
 
     std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
-
     if (options.hasProperty(maxPendingMsgLimitStr)) {
         info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
     }
@@ -1322,11 +1829,10 @@ void ActiveMQConsumerKernel::applyDestin
         info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
     }
 
-    std::string networkSubscriptionStr = "consumer.networkSubscription";
-
-    if (options.hasProperty(networkSubscriptionStr)) {
-        info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
-    }
+    this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
+        options.getProperty("consumer.nonBlockingRedelivery", "false"));
+    this->internal->transactedIndividualAck = Boolean::parseBoolean(  // own flag; must not overwrite nonBlockingRedelivery
+        options.getProperty("consumer.transactedIndividualAck", "false"));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1372,7 +1878,7 @@ bool ActiveMQConsumerKernel::isSynchroni
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setSynchronizationRegistered( bool value ) {
+void ActiveMQConsumerKernel::setSynchronizationRegistered(bool value) {  // mutator: stores value in internal->synchronizationRegistered
     this->internal->synchronizationRegistered = value;
 }
 
@@ -1382,11 +1888,31 @@ long long ActiveMQConsumerKernel::getLas
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setLastDeliveredSequenceId( long long value ) {
+void ActiveMQConsumerKernel::setLastDeliveredSequenceId(long long value) {  // mutator: stores value in internal->lastDeliveredSequenceId
     this->internal->lastDeliveredSequenceId = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isTransactedIndividualAck() const {  // accessor for the transactedIndividualAck flag
+    return this->internal->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setTransactedIndividualAck(bool value) {  // mutator for the transactedIndividualAck flag
+    this->internal->transactedIndividualAck = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod() const {  // NOTE(review): this reads the value despite the "set" prefix -- name looks like a typo for get...; confirm before renaming, the header declares it the same way
+    return this->internal->failoverRedeliveryWaitPeriod;  // returns the stored wait period unchanged
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod(long long value) {  // mutator: overwrites the stored wait period
+    this->internal->failoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumerKernel::setFailureError(decaf::lang::Exception* error) {
     if (error != NULL) {
         this->internal->failureError.reset(error->clone());
@@ -1427,3 +1953,49 @@ cms::MessageAvailableListener* ActiveMQC
 int ActiveMQConsumerKernel::getHashCode() const {
     return this->internal->hashCode;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumerKernel::getOptimizedAckScheduledAckInterval() const {  // accessor for the scheduled-ack interval
+    return this->internal->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setOptimizedAckScheduledAckInterval(long long value) {  // stores the interval, cancels any existing ack task, then schedules a new one if enabled
+    this->internal->optimizedAckScheduledAckInterval = value;
+
+    if (this->internal->optimizedAckTask != NULL) {  // a task from an earlier call is still recorded
+        try {
+            this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
+            this->internal->optimizedAckTask = NULL;
+        } catch (Exception& e) {
+            this->internal->optimizedAckTask = NULL;  // the pointer is dropped even when cancel() throws
+            throw CMSExceptionSupport::create(e);  // rethrown as whatever CMSExceptionSupport::create builds from e
+        }
+    }
+
+    // Schedule a periodic OptimizedAckTask only when optimize acknowledge is on and the interval is positive.
+    if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) {
+        this->internal->optimizedAckTask = new OptimizedAckTask(this, this->internal);  // NOTE(review): who deletes this raw pointer is not visible here -- presumably the scheduler; confirm
+
+        try {
+            this->session->getScheduler()->executePeriodically(
+                this->internal->optimizedAckTask, this->internal->optimizedAckScheduledAckInterval);
+        } catch (Exception& e) {
+            throw CMSExceptionSupport::create(e);  // NOTE(review): optimizedAckTask is left set on this failure path
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isOptimizeAcknowledge() const {  // accessor for the optimizeAcknowledge flag
+    return this->internal->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) {  // updates the optimizeAcknowledge flag
+    if (this->internal->optimizeAcknowledge && !value) {  // only on a switch from enabled to disabled
+        deliverAcks();  // called while the flag still holds its old value
+    }
+
+    this->internal->optimizeAcknowledge = value;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Apr  1 21:56:01 2013
@@ -109,8 +109,6 @@ namespace kernels {
 
         virtual std::string getMessageSelector() const;
 
-        virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);
-
         virtual void setMessageTransformer(cms::MessageTransformer* transformer);
 
         virtual cms::MessageTransformer* getMessageTransformer() const;
@@ -131,6 +129,20 @@ namespace kernels {
         void acknowledge();
 
         /**
+         * Method called to acknowledge the Message contained in the given MessageDispatch
+         *
+         * @throw CMSException if an error occurs while ack'ing the message.
+         */
+        void acknowledge(Pointer<commands::MessageDispatch> dispatch);
+
+        /**
+         * Method called to acknowledge the Message contained in the given MessageDispatch using the given ack type
+         *
+         * @throw CMSException if an error occurs while ack'ing the message.
+         */
+        void acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType);
+
+        /**
          * Called to Commit the current set of messages in this Transaction
          *
          * @throw ActiveMQException if an error occurs while performing the operation.
@@ -220,6 +232,37 @@ namespace kernels {
         long long getLastDeliveredSequenceId() const;
 
         /**
+         * Indicates whether Messages in a transaction are acknowledged using the Individual Acknowledge mode.
+         *
+         * @return true if individual transacted acknowledge is enabled.
+         */
+        bool isTransactedIndividualAck() const;
+
+        /**
+         * Sets whether Messages in a transaction are acknowledged using the Individual Acknowledge mode.
+         *
+         * @param value
+         *      True if individual transacted acknowledge is enabled.
+         */
+        void setTransactedIndividualAck(bool value);
+
+        /**
+         * Returns the delay after a failover before Message redelivery starts (a getter, despite its "set" prefix).
+         *
+         * @returns time in milliseconds to wait after failover.
+         */
+        long long setFailoverRedeliveryWaitPeriod() const;
+
+        /**
+         * Sets the time in milliseconds to delay after failover before starting
+         * message redelivery.
+         *
+         * @param value
+         *      Time in milliseconds to delay after failover for redelivery start.
+         */
+        void setFailoverRedeliveryWaitPeriod(long long value);
+
+        /**
          * Sets the value of the Last Delivered Sequence Id
          *
          * @param value
@@ -280,6 +323,37 @@ namespace kernels {
          */
         bool isInUse(Pointer<commands::ActiveMQDestination> destination) const;
 
+        /**
+         * Time in Milliseconds before an automatic acknowledge is done for any outstanding
+         * delivered Messages.  A value less than one means no task is scheduled.
+         *
+         * @returns time in milliseconds for the scheduled ack task.
+         */
+        long long getOptimizedAckScheduledAckInterval() const;
+
+        /**
+         * Sets the time in Milliseconds to schedule an automatic acknowledge of outstanding
+         * messages when optimize acknowledge is enabled.  A value less than one means disable
+         * any scheduled tasks.
+         *
+         * @param value
+         *      The time interval to send scheduled acks.
+         */
+        void setOptimizedAckScheduledAckInterval(long long value);
+
+        /**
+         * @returns true if this consumer is using optimize acknowledge mode.
+         */
+        bool isOptimizeAcknowledge() const;
+
+        /**
+         * Enable or disable optimized acknowledge for this consumer.
+         *
+         * @param value
+         *      True if optimize acknowledge is enabled, false otherwise.
+         */
+        void setOptimizeAcknowledge(bool value);
+
     protected:
 
         /**
@@ -303,50 +377,41 @@ namespace kernels {
          * Pre-consume processing
          * @param dispatch - the message being consumed.
          */
-        void beforeMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch);
+        void beforeMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch);
 
         /**
          * Post-consume processing
          * @param dispatch - the consumed message
          * @param messageExpired - flag indicating if the message has expired.
          */
-        void afterMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired);
+        void afterMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch, bool messageExpired);
 
     private:
 
-        // Creates a deliverable cms::Message from a received MessageDispatch, transforming if needed
-        // and configuring appropriate ack handlers.
         Pointer<cms::Message> createCMSMessage(Pointer<commands::MessageDispatch> dispatch);
 
-        // Using options from the Destination URI override any settings that are
-        // defined for this consumer.
-        void applyDestinationOptions(const Pointer<commands::ConsumerInfo>& info);
-
-        // If supported sends a message pull request to the service provider asking
-        // for the delivery of a new message.  This is used in the case where the
-        // service provider has been configured with a zero prefetch or is only
-        // capable of delivering messages on a pull basis.  No request is made if
-        // there are already messages in the unconsumed queue since there's no need
-        // for a server round-trip in that instance.
+        void applyDestinationOptions(Pointer<commands::ConsumerInfo> info);
+
         void sendPullRequest(long long timeout);
 
-        // Checks for the closed state and throws if so.
         void checkClosed() const;
 
-        // Sends an ack as needed in order to keep them coming in if the current
-        // ack mode allows the consumer to receive up to the prefetch limit before
-        // an real ack is sent.
-        void ackLater(const Pointer<commands::MessageDispatch>& message, int ackType);
+        void checkMessageListener() const;
+
+        void ackLater(Pointer<commands::MessageDispatch> message, int ackType);
+
+        void immediateIndividualTransactedAck(Pointer<commands::MessageDispatch> dispatch);
 
-        // Create an Ack Message that acks all messages that have been delivered so far.
         Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
 
-        // Should Acks be sent on each dispatched message
         bool isAutoAcknowledgeEach() const;
 
-        // Can Acks be batched for less network overhead.
         bool isAutoAcknowledgeBatch() const;
 
+        void registerSync();
+
+        void clearDispatchList();
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Apr  1 21:56:01 2013
@@ -105,7 +105,8 @@ namespace kernels{
 
         SessionConfig() : synchronizationRegistered(false),
                           producerLock(), producers(), consumerLock(), consumers(),
-                          scheduler(), closeSync(), sendMutex(), transformer(NULL), hashCode() {}
+                          scheduler(), closeSync(), sendMutex(), transformer(NULL),
+                          hashCode() {}
         ~SessionConfig() {}
     };
 
@@ -1455,3 +1456,32 @@ Pointer<commands::ProducerId> ActiveMQSe
 int ActiveMQSessionKernel::getHashCode() const {
     return this->config->hashCode;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::checkMessageListener() const {
+    // Throws cms::IllegalStateException if any consumer of this session has a MessageListener set.
+    this->config->consumerLock.readLock().lock();
+    try {
+        Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+        while (iter->hasNext()) {
+            Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+            if (consumer->getMessageListener() != NULL) {
+                throw cms::IllegalStateException(
+                    "Cannot synchronously receive a message when a MessageListener is set");
+            }
+        }
+        this->config->consumerLock.readLock().unlock();
+    } catch (...) {  // any exception type (incl. the cms one thrown above) must release the read lock
+        this->config->consumerLock.readLock().unlock();
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::sendAck(Pointer<MessageAck> ack, bool async) {
+    if (async || this->connection->isSendAcksAsync() || this->isTransacted()) {  // any one of these selects the one-way send
+        this->connection->oneway(ack);
+    } else {
+        this->connection->syncRequest(ack);  // otherwise the ack goes out via syncRequest
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Mon Apr  1 21:56:01 2013
@@ -29,6 +29,7 @@
 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
 #include <activemq/commands/ActiveMQTempDestination.h>
 #include <activemq/commands/Response.h>
+#include <activemq/commands/MessageAck.h>
 #include <activemq/commands/SessionInfo.h>
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/ConsumerId.h>
@@ -542,8 +543,31 @@ namespace kernels {
          */
         bool iterateConsumers();
 
+        /**
+         * Checks if any MessageConsumer owned by this Session has a set MessageListener
+         * and throws an exception if so.  This enforces the rule that the MessageConsumers
+         * belonging to a Session either operate in sync or async receive as a group.
+         */
+        void checkMessageListener() const;
+
+        /**
+         * Returns a Hash Code for this Session based on its SessionId.
+         *
+         * @returns an int hash code based on the string value of SessionId.
+         */
         virtual int getHashCode() const;
 
+        /**
+         * Sends the given MessageAck command to the Broker either via Synchronous call or
+         * an Asynchronous call depending on the value of the async parameter.
+         *
+         * @param ack
+         *      The MessageAck command to send.
+         * @param async
+         *      True if the command can be sent asynchronously.
+         */
+        void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
+
    private:
 
        /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp Mon Apr  1 21:56:01 2013
@@ -18,6 +18,7 @@
 #include "ConnectionAuditTest.h"
 
 #include <activemq/core/ConnectionAudit.h>
+#include <activemq/core/Dispatcher.h>
 #include <activemq/core/ActiveMQMessageAudit.h>
 #include <activemq/util/IdGenerator.h>
 #include <activemq/commands/Message.h>
@@ -104,7 +105,7 @@ void ConnectionAuditTest::testIsDuplicat
         list.add(id);
 
         message->setMessageId(id);
-        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message));
     }
 
     int index = list.size() -1 -audit.getAuditDepth();
@@ -112,7 +113,7 @@ void ConnectionAuditTest::testIsDuplicat
         Pointer<MessageId> id = list.get(index);
         message->setMessageId(id);
         CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
-                               audit.isDuplicate(dispatcher, message));
+                               audit.isDuplicate(dispatcher.get(), message));
     }
 }
 
@@ -140,7 +141,7 @@ void ConnectionAuditTest::testRollbackDu
         list.add(id);
 
         message->setMessageId(id);
-        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+        CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message));
     }
 
     int index = list.size() -1 -audit.getAuditDepth();
@@ -148,9 +149,9 @@ void ConnectionAuditTest::testRollbackDu
         Pointer<MessageId> id = list.get(index);
         message->setMessageId(id);
         CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
-                               audit.isDuplicate(dispatcher, message));
-        audit.rollbackDuplicate(dispatcher, message);
+                               audit.isDuplicate(dispatcher.get(), message));
+        audit.rollbackDuplicate(dispatcher.get(), message);
         CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
-                               !audit.isDuplicate(dispatcher, message));
+                               !audit.isDuplicate(dispatcher.get(), message));
     }
 }