You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/01 23:56:01 UTC
svn commit: r1463311 [2/2] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/
main/activemq/core/kernels/ test/activemq/core/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Mon Apr 1 21:56:01 2013
@@ -25,6 +25,9 @@
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
+#include <decaf/util/HashMap.h>
+#include <decaf/util/concurrent/ExecutorService.h>
+#include <decaf/util/concurrent/Executors.h>
#include <activemq/util/Config.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/ActiveMQProperties.h>
@@ -63,10 +66,22 @@ using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
+namespace activemq {
namespace core {
namespace kernels {
+ class PreviouslyDeliveredMap : public HashMap<Pointer<MessageId>, bool> {
+ public:
+
+ Pointer<TransactionId> transactionId;
+
+ PreviouslyDeliveredMap(Pointer<TransactionId> transactionId) :
+ transactionId(transactionId) {
+ }
+
+ virtual ~PreviouslyDeliveredMap() {}
+ };
+
class ActiveMQConsumerKernelConfig {
private:
@@ -95,6 +110,21 @@ namespace kernels {
Pointer<Exception> failureError;
Pointer<Scheduler> scheduler;
int hashCode;
+ Pointer<PreviouslyDeliveredMap> previouslyDeliveredMessages;
+ long long failoverRedeliveryWaitPeriod;
+ bool transactedIndividualAck;
+ bool nonBlockingRedelivery;
+ bool optimizeAcknowledge;
+ long long optimizeAckTimestamp;
+ long long optimizeAcknowledgeTimeOut;
+ long long optimizedAckScheduledAckInterval;
+ Runnable* optimizedAckTask;
+ int ackCounter;
+ int dispatchedCount;
+ Pointer<ExecutorService> executor;
+ ActiveMQSessionKernel* session;
+ ActiveMQConsumerKernel* parent;
+ Pointer<ConsumerInfo> info;
ActiveMQConsumerKernelConfig() : listener(NULL),
messageAvailableListener(NULL),
@@ -115,8 +145,185 @@ namespace kernels {
redeliveryPolicy(),
failureError(),
scheduler(),
- hashCode() {
+ hashCode(),
+ previouslyDeliveredMessages(),
+ failoverRedeliveryWaitPeriod(0),
+ transactedIndividualAck(false),
+ nonBlockingRedelivery(false),
+ optimizeAcknowledge(false),
+ optimizeAckTimestamp(System::currentTimeMillis()),
+ optimizeAcknowledgeTimeOut(),
+ optimizedAckScheduledAckInterval(),
+ optimizedAckTask(),
+ ackCounter(),
+ dispatchedCount(),
+ executor(),
+ session(),
+ parent(),
+ info() {
+ }
+
+ bool isTimeForOptimizedAck(int prefetchSize) const {
+ if (ackCounter + deliveredCounter >= (prefetchSize * 0.65)) {
+ return true;
+ }
+
+ long long nextAckTime = optimizeAckTimestamp + optimizeAcknowledgeTimeOut;
+
+ if (optimizeAcknowledgeTimeOut > 0 && System::currentTimeMillis() >= nextAckTime) {
+ return true;
+ }
+
+ return false;
+ }
+
+ void doClearMessagesInProgress() {
+ if (this->inProgressClearRequiredFlag) {
+ synchronized(this->unconsumedMessages.get()) {
+ if (this->inProgressClearRequiredFlag) {
+
+ // ensure messages that were not yet consumed are rolled back up front as they
+ // may get redelivered to another consumer by the Broker.
+ std::vector< Pointer<MessageDispatch> > list = this->unconsumedMessages->removeAll();
+ if (!this->info->isBrowser()) {
+ std::vector<Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+
+ for (; iter != list.end(); ++iter) {
+ Pointer<MessageDispatch> md = *iter;
+ this->session->getConnection()->rollbackDuplicate(this->parent, md->getMessage());
+ }
+ }
+
+ // allow dispatch on this connection to resume
+ this->session->getConnection()->setTransportInterruptionProcessingComplete();
+ this->inProgressClearRequiredFlag = false;
+
+ // Wake up any blockers and allow them to recheck state.
+ this->unconsumedMessages->notifyAll();
+ }
+ }
+ }
+ }
+
+ void doClearDispatchList() {
+ if (clearDispatchList) {
+ synchronized (&this->dispatchedMessages) {
+ if (clearDispatchList) {
+ if (!dispatchedMessages.isEmpty()) {
+ if (session->isTransacted()) {
+ if (previouslyDeliveredMessages == NULL) {
+ previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap(
+ session->getTransactionContext()->getTransactionId()));
+ }
+
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(dispatchedMessages.iterator());
+
+ while (iter->hasNext()) {
+ Pointer<MessageDispatch> dispatch = iter->next();
+ previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false);
+ }
+ } else {
+ dispatchedMessages.clear();
+ pendingAck.reset(NULL);
+ }
+ }
+ clearDispatchList = false;
+ }
+ }
+ }
+ }
+
+ void doClearPreviouslyDelivered() {
+ if (previouslyDeliveredMessages != NULL) {
+ previouslyDeliveredMessages->clear();
+ previouslyDeliveredMessages.reset(NULL);
+ }
}
+
+ // called with deliveredMessages locked
+ void removeFromDeliveredMessages(Pointer<MessageId> key) {
+ Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->dispatchedMessages.iterator());
+ while (iter->hasNext()) {
+ Pointer<MessageDispatch> candidate = iter->next();
+ if (key->equals(candidate->getMessage()->getMessageId().get())) {
+ session->getConnection()->rollbackDuplicate(this->parent, candidate->getMessage());
+ iter->remove();
+ break;
+ }
+ }
+ }
+
+ // called with unconsumedMessages && deliveredMessages locked remove any message
+ // not re-delivered as they can't be replayed to this consumer on rollback
+ void rollbackPreviouslyDeliveredAndNotRedelivered() {
+ if (previouslyDeliveredMessages != NULL) {
+
+ Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+ Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+ while (iter->hasNext()) {
+ MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+ if (!entry.getValue()) {
+ removeFromDeliveredMessages(entry.getKey());
+ }
+ }
+
+ doClearPreviouslyDelivered();
+ }
+ }
+
+ void rollbackOnFailedRecoveryRedelivery() {
+ if (previouslyDeliveredMessages != NULL) {
+ // if any previously delivered messages was not re-delivered, transaction is invalid
+ // and must roll back as messages have been dispatched elsewhere.
+ int numberNotReplayed = 0;
+ Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+ Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+ while (iter->hasNext()) {
+ MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+ if (!entry.getValue()) {
+ numberNotReplayed++;
+ }
+ }
+ if (numberNotReplayed > 0) {
+ std::string message = std::string("rolling back transaction (") +
+ previouslyDeliveredMessages->transactionId->toString() +
+ ") post failover recovery. " + Integer::toString(numberNotReplayed) +
+ " previously delivered message(s) not replayed to consumer: " +
+ info->getConsumerId()->toString();
+ throw cms::TransactionRolledBackException(message);
+ }
+ }
+ }
+
+ void waitForRedeliveries() {
+ if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != NULL) {
+ long expiry = System::currentTimeMillis() + failoverRedeliveryWaitPeriod;
+ int numberNotReplayed;
+ do {
+ numberNotReplayed = 0;
+ synchronized (&this->dispatchedMessages) {
+ if (previouslyDeliveredMessages != NULL) {
+ Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
+ Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
+ while (iter->hasNext()) {
+ MapEntry<Pointer<MessageId>, bool> entry = iter->next();
+ if (!entry.getValue()) {
+ numberNotReplayed++;
+ }
+ }
+ }
+ }
+ if (numberNotReplayed > 0) {
+ try {
+ Thread::sleep(Math::max(500LL, failoverRedeliveryWaitPeriod/4));
+ } catch (InterruptedException& ex) {
+ break;
+ }
+ }
+ } while (numberNotReplayed > 0 && expiry < System::currentTimeMillis());
+ }
+ }
+
};
}}}
@@ -150,7 +357,16 @@ namespace {
virtual ~TransactionSynhcronization() {}
virtual void beforeEnd() {
- consumer->acknowledge();
+
+ if (false) { // TODO transactedIndividualAck) {
+// consumer->clearDispatchList();
+// consumer->waitForRedeliveries();
+// synchronized(deliveredMessages) {
+// consumer->rollbackOnFailedRecoveryRedelivery();
+// }
+ } else {
+ consumer->acknowledge();
+ }
consumer->setSynchronizationRegistered(false);
}
@@ -276,6 +492,7 @@ namespace {
private:
Pointer<ActiveMQConsumerKernel> consumer;
+ ActiveMQSessionKernel* session;
private:
@@ -284,7 +501,9 @@ namespace {
public:
- StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
+ StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQSessionKernel* session) :
+ Runnable(), consumer(consumer), session(session) {
+
if (consumer == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
@@ -299,11 +518,105 @@ namespace {
this->consumer->start();
}
} catch(cms::CMSException& ex) {
- // TODO - Need Connection onAsyncException method.
+ // TODO
+ // this->session->getConnection()->onAsyncException(ex);
}
}
};
+ class AsyncMessageAckTask : public Runnable {
+ private:
+
+ Pointer<MessageAck> ack;
+ ActiveMQSessionKernel* session;
+ ActiveMQConsumerKernelConfig* impl;
+
+ private:
+
+ AsyncMessageAckTask(const AsyncMessageAckTask&);
+ AsyncMessageAckTask& operator=(const AsyncMessageAckTask&);
+
+ public:
+
+ AsyncMessageAckTask(Pointer<MessageAck> ack, ActiveMQSessionKernel* session, ActiveMQConsumerKernelConfig* impl) :
+ Runnable(), ack(ack), session(session), impl(impl) {}
+ virtual ~AsyncMessageAckTask() {}
+
+ virtual void run() {
+ try {
+ this->session->sendAck(ack, true);
+ this->impl->deliveringAcks.set(false);
+ } catch(Exception& ex) {
+ this->impl->deliveringAcks.set(false);
+ } catch(cms::CMSException& ex) {
+ this->impl->deliveringAcks.set(false);
+ }
+ }
+ };
+
+ class OptimizedAckTask : public Runnable {
+ private:
+
+ ActiveMQConsumerKernel* consumer;
+ ActiveMQConsumerKernelConfig* impl;
+
+ private:
+
+ OptimizedAckTask(const OptimizedAckTask&);
+ OptimizedAckTask& operator=(const OptimizedAckTask&);
+
+ public:
+
+ OptimizedAckTask(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+ Runnable(), consumer(consumer), impl(impl) {}
+ virtual ~OptimizedAckTask() {}
+
+ virtual void run() {
+ try {
+ if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) {
+ this->consumer->deliverAcks();
+ }
+ } catch(Exception& ex) {
+ }
+ }
+ };
+
+ class NonBlockingRedeliveryTask : public Runnable {
+ private:
+
+ ActiveMQSessionKernel* session;
+ Pointer<ActiveMQConsumerKernel> consumer;
+ ActiveMQConsumerKernelConfig* impl;
+ LinkedList<Pointer<MessageDispatch> > redeliveries;
+
+ private:
+
+ NonBlockingRedeliveryTask(const NonBlockingRedeliveryTask&);
+ NonBlockingRedeliveryTask& operator=(const NonBlockingRedeliveryTask&);
+
+ public:
+
+ NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
+ Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
+
+ this->redeliveries.copy(impl->dispatchedMessages);
+ }
+ virtual ~NonBlockingRedeliveryTask() {}
+
+ virtual void run() {
+ try {
+ if (!impl->unconsumedMessages->isClosed()) {
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(redeliveries.iterator());
+ while (iter->hasNext() && !impl->unconsumedMessages->isClosed()) {
+ Pointer<MessageDispatch> dispatch = iter->next();
+ session->dispatch(dispatch);
+ }
+ }
+ } catch (Exception& e) {
+ session->getConnection()->onAsyncException(e);
+ }
+ }
+ };
}
////////////////////////////////////////////////////////////////////////////////
@@ -356,14 +669,20 @@ ActiveMQConsumerKernel::ActiveMQConsumer
consumerInfo->setSubscriptionName(name);
consumerInfo->setSelector(selector);
consumerInfo->setPrefetchSize(prefetch);
+ consumerInfo->setCurrentPrefetchSize(prefetch);
consumerInfo->setMaximumPendingMessageLimit(maxPendingMessageCount);
consumerInfo->setBrowser(browser);
consumerInfo->setDispatchAsync(dispatchAsync);
consumerInfo->setNoLocal(noLocal);
+ consumerInfo->setExclusive(session->getConnection()->isExclusiveConsumer());
+ consumerInfo->setRetroactive(session->getConnection()->isUseRetroactiveConsumer());
// Initialize Consumer Data
this->session = session;
this->consumerInfo = consumerInfo;
+ this->internal->session = session;
+ this->internal->parent = this;
+ this->internal->info = consumerInfo;
this->internal->hashCode = id->getHashCode();
this->internal->lastDeliveredSequenceId = -1;
this->internal->synchronizationRegistered = false;
@@ -388,6 +707,23 @@ ActiveMQConsumerKernel::ActiveMQConsumer
applyDestinationOptions(this->consumerInfo);
+ if (session->getConnection()->isOptimizeAcknowledge() && session->isAutoAcknowledge() && !consumerInfo->isBrowser()) {
+ this->internal->optimizeAcknowledge = true;
+ }
+
+ if (this->internal->optimizeAcknowledge) {
+ this->internal->optimizeAcknowledgeTimeOut = session->getConnection()->getOptimizeAcknowledgeTimeOut();
+ this->setOptimizedAckScheduledAckInterval(
+ session->getConnection()->getOptimizedAckScheduledAckInterval());
+ }
+
+ consumerInfo->setOptimizedAcknowledge(this->internal->optimizeAcknowledge);
+ this->internal->failoverRedeliveryWaitPeriod =
+ session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod();
+ this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery();
+ this->internal->transactedIndividualAck =
+ session->getConnection()->isTransactedIndividualAck() || this->internal->nonBlockingRedelivery;
+
if (this->consumerInfo->getPrefetchSize() < 0) {
delete this->internal;
throw IllegalArgumentException(
@@ -423,7 +759,7 @@ void ActiveMQConsumerKernel::start() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::stop() {
- this->internal->started.set( false );
+ this->internal->started.set(false);
this->internal->unconsumedMessages->stop();
}
@@ -443,8 +779,6 @@ void ActiveMQConsumerKernel::close() {
Pointer<Synchronization> sync(new CloseSynhcronization(this));
this->session->getTransactionContext()->addSynchronization(sync);
-
- doClose();
} else {
doClose();
}
@@ -457,6 +791,9 @@ void ActiveMQConsumerKernel::close() {
void ActiveMQConsumerKernel::doClose() {
try {
+ // Store interrupted state and clear so that Transport operations don't
+ // throw InterruptedException and we ensure that resources are clened up.
+ bool interrupted = Thread::interrupted();
dispose();
// Remove at the Broker Side, consumer has been removed from the local
@@ -466,6 +803,9 @@ void ActiveMQConsumerKernel::doClose() {
info->setObjectId(this->consumerInfo->getConsumerId());
info->setLastDeliveredSequenceId(this->internal->lastDeliveredSequenceId);
this->session->oneway(info);
+ if (interrupted) {
+ Thread::currentThread()->interrupt();
+ }
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -481,10 +821,40 @@ void ActiveMQConsumerKernel::dispose() {
if (!session->isTransacted()) {
deliverAcks();
+ if (isAutoAcknowledgeBatch()) {
+ acknowledge();
+ }
}
this->internal->started.set(false);
+ if (this->internal->executor != NULL) {
+ this->internal->executor->shutdown();
+ this->internal->executor->awaitTermination(60, TimeUnit::SECONDS);
+ this->internal->executor.reset(NULL);
+ }
+
+ if (this->internal->optimizedAckTask != NULL) {
+ this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
+ this->internal->optimizedAckTask = NULL;
+ }
+
+ if (session->isClientAcknowledge()) {
+ if (!this->consumerInfo->isBrowser()) {
+ // roll back duplicates that aren't acknowledged
+ ArrayList< Pointer<MessageDispatch> > tmp;
+ synchronized(&this->internal->dispatchedMessages) {
+ tmp.copy(this->internal->dispatchedMessages);
+ }
+ Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
+ while (iter->hasNext()) {
+ Pointer<MessageDispatch> msg = iter->next();
+ this->session->getConnection()->rollbackDuplicate(this, msg->getMessage());
+ }
+ tmp.clear();
+ }
+ }
+
// Identifies any errors encountered during shutdown.
bool haveException = false;
ActiveMQException error;
@@ -553,8 +923,6 @@ decaf::lang::Pointer<MessageDispatch> Ac
try {
- this->checkClosed();
-
// Calculate the deadline
long long deadline = 0;
if (timeout > 0) {
@@ -566,7 +934,6 @@ decaf::lang::Pointer<MessageDispatch> Ac
Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
if (dispatch == NULL) {
-
if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
} else {
@@ -576,13 +943,9 @@ decaf::lang::Pointer<MessageDispatch> Ac
return Pointer<MessageDispatch> ();
}
}
-
} else if (dispatch->getMessage() == NULL) {
-
return Pointer<MessageDispatch> ();
-
} else if (dispatch->getMessage()->isExpired()) {
-
beforeMessageIsConsumed(dispatch);
afterMessageIsConsumed(dispatch, true);
if (timeout > 0) {
@@ -597,6 +960,9 @@ decaf::lang::Pointer<MessageDispatch> Ac
}
return Pointer<MessageDispatch>();
+ } catch (InterruptedException& ex) {
+ Thread::currentThread()->interrupt();
+ throw CMSExceptionSupport::create(ex);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -607,6 +973,7 @@ cms::Message* ActiveMQConsumerKernel::re
try {
this->checkClosed();
+ this->checkMessageListener();
// Send a request for a new message if needed
this->sendPullRequest(0);
@@ -633,6 +1000,7 @@ cms::Message* ActiveMQConsumerKernel::re
try {
this->checkClosed();
+ this->checkMessageListener();
// Send a request for a new message if needed
this->sendPullRequest(millisecs);
@@ -659,6 +1027,7 @@ cms::Message* ActiveMQConsumerKernel::re
try {
this->checkClosed();
+ this->checkMessageListener();
// Send a request for a new message if needed
this->sendPullRequest(-1);
@@ -718,8 +1087,7 @@ void ActiveMQConsumerKernel::setMessageL
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::beforeMessageIsConsumed(const Pointer<MessageDispatch>& dispatch) {
-
+void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer<MessageDispatch> dispatch) {
this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
@@ -730,54 +1098,100 @@ void ActiveMQConsumerKernel::beforeMessa
}
if (this->session->isTransacted()) {
- ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
+ if (this->internal->transactedIndividualAck) {
+ immediateIndividualTransactedAck(dispatch);
+ } else {
+ ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
+ }
}
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::afterMessageIsConsumed(const Pointer<MessageDispatch>& message, bool messageExpired ) {
+void ActiveMQConsumerKernel::immediateIndividualTransactedAck(Pointer<MessageDispatch> dispatch) {
+ // acks accumulate on the broker pending transaction completion to indicate delivery status
+ registerSync();
+
+ Pointer<MessageAck> ack(new MessageAck);
+
+ ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
+ ack->setConsumerId(dispatch->getConsumerId());
+ ack->setDestination(dispatch->getDestination());
+ ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+ ack->setMessageCount(1);
+ ack->setTransactionId(this->session->getTransactionContext()->getTransactionId());
+
+ this->session->syncRequest(ack);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::registerSync() {
+ this->session->doStartTransaction();
+ if (!this->internal->synchronizationRegistered) {
+ this->internal->synchronizationRegistered = true;
+ Pointer<Synchronization> sync(new TransactionSynhcronization(this));
+ this->session->getTransactionContext()->addSynchronization(sync);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> message, bool messageExpired ) {
try {
if (this->internal->unconsumedMessages->isClosed()) {
return;
- }
-
- if (messageExpired == true) {
- synchronized(&this->internal->dispatchedMessages) {
- this->internal->dispatchedMessages.remove(message);
- }
+ } else if (messageExpired == true) {
ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
return;
- }
-
- if (session->isTransacted()) {
+ } else if (session->isTransacted()) {
return;
- } else if (isAutoAcknowledgeEach()) {
+ }
+ if (isAutoAcknowledgeEach()) {
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
-
synchronized(&this->internal->dispatchedMessages) {
if (!this->internal->dispatchedMessages.isEmpty()) {
- Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+ if (this->internal->optimizeAcknowledge) {
- if (ack != NULL) {
- this->internal->dispatchedMessages.clear();
- session->oneway(ack);
+ this->internal->ackCounter++;
+ if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) {
+ Pointer<MessageAck> ack =
+ makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+ if (ack != NULL) {
+ this->internal->dispatchedMessages.clear();
+ this->internal->ackCounter = 0;
+ this->session->sendAck(ack);
+ this->internal->optimizeAckTimestamp = System::currentTimeMillis();
+ }
+
+ // As further optimization send ack for expired messages when there
+ // are any. This resets the deliveredCounter to 0 so that we won't
+ // send standard acks with every message just because the deliveredCounter
+ // just below 0.5 * prefetch as used in ackLater()
+ if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) {
+ this->session->sendAck(this->internal->pendingAck);
+ this->internal->pendingAck.reset(NULL);
+ this->internal->deliveredCounter = 0;
+ }
+ }
+ } else {
+ Pointer<MessageAck> ack =
+ makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
+ if (ack != NULL) {
+ this->internal->dispatchedMessages.clear();
+ session->oneway(ack);
+ }
}
}
}
this->internal->deliveringAcks.set(false);
}
-
} else if (isAutoAcknowledgeBatch()) {
ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
} else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
-
bool messageUnackedByConsumer = false;
-
synchronized(&this->internal->dispatchedMessages) {
messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
}
@@ -785,9 +1199,8 @@ void ActiveMQConsumerKernel::afterMessag
if (messageUnackedByConsumer) {
this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
}
-
} else {
- throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
+ throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State");
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
@@ -801,34 +1214,30 @@ void ActiveMQConsumerKernel::deliverAcks
try {
Pointer<MessageAck> ack;
-
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
if (isAutoAcknowledgeEach()) {
-
synchronized(&this->internal->dispatchedMessages) {
-
ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
-
if (ack != NULL) {
this->internal->dispatchedMessages.clear();
+ this->internal->ackCounter = 0;
} else {
ack.swap(internal->pendingAck);
}
}
-
- } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
-
+ } else if (this->internal->pendingAck != NULL &&
+ this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
ack.swap(this->internal->pendingAck);
}
if (ack != NULL) {
-
- try {
- this->session->oneway(ack);
- } catch (...) {
+ if (this->internal->executor == NULL) {
+ this->internal->executor.reset(Executors::newSingleThreadExecutor());
}
+ this->internal->executor->submit(
+ new AsyncMessageAckTask(ack, this->session, this->internal), true);
} else {
this->internal->deliveringAcks.set(false);
}
@@ -840,18 +1249,12 @@ void ActiveMQConsumerKernel::deliverAcks
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::ackLater(const Pointer<MessageDispatch>& dispatch, int ackType) {
+void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ackType) {
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message to expand the pre-fetch window
if (session->isTransacted()) {
- session->doStartTransaction();
- if (!this->internal->synchronizationRegistered) {
- this->internal->synchronizationRegistered = true;
-
- Pointer<Synchronization> sync(new TransactionSynhcronization(this));
- this->session->getTransactionContext()->addSynchronization(sync);
- }
+ registerSync();
}
// The delivered message list is only needed for the recover method
@@ -882,8 +1285,10 @@ void ActiveMQConsumerKernel::ackLater(co
this->internal->pendingAck->setTransactionId(this->session->getTransactionContext()->getTransactionId());
}
- if ((0.5 * this->consumerInfo->getPrefetchSize()) <= (internal->deliveredCounter - internal->additionalWindowSize)) {
- session->oneway(this->internal->pendingAck);
+ // Need to evaluate both expired and normal messages as otherwise consumer may get stalled
+ int pendingAcks = (internal->deliveredCounter + internal->ackCounter) - internal->additionalWindowSize;
+ if ((0.5 * this->consumerInfo->getPrefetchSize()) <= pendingAcks) {
+ session->sendAck(this->internal->pendingAck);
this->internal->pendingAck.reset(NULL);
this->internal->deliveredCounter = 0;
this->internal->additionalWindowSize = 0;
@@ -915,36 +1320,27 @@ Pointer<MessageAck> ActiveMQConsumerKern
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::acknowledge(const Pointer<commands::MessageDispatch>& dispatch) {
-
- try {
-
- this->checkClosed();
+void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch) {
+ this->acknowledge(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
+}
- if (this->session->isIndividualAcknowledge()) {
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType) {
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(this->consumerInfo->getDestination());
- ack->setMessageCount(1);
- ack->setLastMessageId(dispatch->getMessage()->getMessageId());
- ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ try {
- session->oneway(ack);
+ Pointer<MessageAck> ack(new MessageAck());
+ ack->setAckType((unsigned char) ackType);
+ ack->setConsumerId(this->consumerInfo->getConsumerId());
+ ack->setDestination(this->consumerInfo->getDestination());
+ ack->setMessageCount(1);
+ ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+ ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
- synchronized(&this->internal->dispatchedMessages) {
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
- while(iter->hasNext()) {
- if (iter->next() == dispatch) {
- iter->remove();
- break;
- }
- }
- }
+ session->sendAck(ack);
- } else {
- throw IllegalStateException(__FILE__, __LINE__, "Session is not in IndividualAcknowledge mode." );
+ synchronized(&this->internal->dispatchedMessages) {
+ this->internal->dispatchedMessages.remove(dispatch);
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -955,6 +1351,9 @@ void ActiveMQConsumerKernel::acknowledge
try {
+ clearDispatchList();
+ this->internal->waitForRedeliveries();
+
synchronized(&this->internal->dispatchedMessages) {
// Acknowledge all messages so far.
@@ -965,12 +1364,13 @@ void ActiveMQConsumerKernel::acknowledge
}
if (session->isTransacted()) {
+ this->internal->rollbackOnFailedRecoveryRedelivery();
session->doStartTransaction();
ack->setTransactionId(session->getTransactionContext()->getTransactionId());
}
- session->oneway(ack);
this->internal->pendingAck.reset(NULL);
+ session->sendAck(ack);
// Adjust the counters
this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->dispatchedMessages.size());
@@ -998,7 +1398,22 @@ void ActiveMQConsumerKernel::rollback()
synchronized(this->internal->unconsumedMessages.get()) {
+ if (this->internal->optimizeAcknowledge) {
+ // remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
+ if (!this->consumerInfo->isBrowser()) {
+ synchronized(&this->internal->dispatchedMessages) {
+ for (int i = 0; (i < this->internal->dispatchedMessages.size()) &&
+ (i < this->internal->ackCounter); i++) {
+ // ensure we don't filter this as a duplicate
+ Pointer<MessageDispatch> md = this->internal->dispatchedMessages.removeLast();
+ session->getConnection()->rollbackDuplicate(this, md->getMessage());
+ }
+ }
+ }
+ }
+
synchronized(&this->internal->dispatchedMessages) {
+ this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
if (this->internal->dispatchedMessages.isEmpty()) {
return;
}
@@ -1014,15 +1429,16 @@ void ActiveMQConsumerKernel::rollback()
Pointer<MessageId> firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
- std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
-
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
while (iter->hasNext()) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
+ // ensure we don't filter this as a duplicate
+ session->getConnection()->rollbackDuplicate(this, message);
}
- if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES
- && lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
+ if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+ lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
@@ -1033,8 +1449,9 @@ void ActiveMQConsumerKernel::rollback()
ack->setMessageCount((int) this->internal->dispatchedMessages.size());
ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
ack->setFirstMessageId(firstMsgId);
+ // TODO - ack->setPoisonCause()
- session->oneway(ack);
+ session->sendAck(ack, true);
// Adjust the window size.
this->internal->additionalWindowSize = Math::max(0,
this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
@@ -1055,22 +1472,33 @@ void ActiveMQConsumerKernel::rollback()
session->oneway(ack);
}
- // stop the delivery of messages.
- this->internal->unconsumedMessages->stop();
-
- std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
- this->internal->dispatchedMessages.iterator());
- while (iter->hasNext()) {
- this->internal->unconsumedMessages->enqueueFirst(iter->next());
- }
+ if (this->internal->nonBlockingRedelivery) {
- if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
- Pointer<ActiveMQConsumerKernel> self =
- this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
- this->internal->scheduler->executeAfterDelay(
- new StartConsumerTask(self), internal->redeliveryDelay);
+ if (!this->internal->unconsumedMessages->isClosed()) {
+ Pointer<ActiveMQConsumerKernel> self =
+ this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+ this->session->getScheduler()->executeAfterDelay(
+ new NonBlockingRedeliveryTask(session, self, this->internal),
+ this->internal->redeliveryDelay);
+ }
} else {
- start();
+ // stop the delivery of messages.
+ this->internal->unconsumedMessages->stop();
+
+ std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
+ this->internal->dispatchedMessages.iterator());
+ while (iter->hasNext()) {
+ this->internal->unconsumedMessages->enqueueFirst(iter->next());
+ }
+
+ if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
+ Pointer<ActiveMQConsumerKernel> self =
+ this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+ this->internal->scheduler->executeAfterDelay(
+ new StartConsumerTask(self, session), internal->redeliveryDelay);
+ } else {
+ start();
+ }
}
}
this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
@@ -1088,40 +1516,94 @@ void ActiveMQConsumerKernel::dispatch(co
try {
- synchronized(this->internal->unconsumedMessages.get()) {
+ clearMessagesInProgress();
+ clearDispatchList();
- clearMessagesInProgress();
- if (this->internal->clearDispatchList) {
- // we are reconnecting so lets flush the in progress messages
- this->internal->clearDispatchList = false;
- this->internal->unconsumedMessages->clear();
- }
+ synchronized(this->internal->unconsumedMessages.get()) {
if (!this->internal->unconsumedMessages->isClosed()) {
- // Don't dispatch expired messages, ack it and then destroy it
- if (dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired()) {
- this->ackLater(dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED);
- return;
- }
-
- synchronized(&this->internal->listenerMutex) {
+ if (this->consumerInfo->isBrowser() || !session->getConnection()->isDuplicate(this, dispatch->getMessage())) {
- // If we have a listener, send the message.
- if (this->internal->listener != NULL && internal->unconsumedMessages->isRunning()) {
-
- Pointer<cms::Message> message = createCMSMessage(dispatch);
- beforeMessageIsConsumed(dispatch);
- this->internal->listener->onMessage(message.get());
- afterMessageIsConsumed(dispatch, false);
+ synchronized(&this->internal->listenerMutex) {
+ if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
+ Pointer<cms::Message> message = createCMSMessage(dispatch);
+ beforeMessageIsConsumed(dispatch);
+ try {
+ bool expired = dispatch->getMessage()->isExpired();
+ if (!expired) {
+ this->internal->listener->onMessage(message.get());
+ }
+ afterMessageIsConsumed(dispatch, expired);
+ } catch (RuntimeException& e) {
+ if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
+ // Schedule redelivery and possible DLQ processing
+ // dispatch->setRollbackCause(e); // TODO
+ rollback();
+ } else {
+ // Transacted or Client ack: Deliver the next message.
+ afterMessageIsConsumed(dispatch, false);
+ }
+ }
+ } else {
+ if (!this->internal->unconsumedMessages->isRunning()) {
+ // delayed redelivery, ensure it can be re delivered
+ session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
+ }
+ this->internal->unconsumedMessages->enqueue(dispatch);
+ if (this->internal->messageAvailableListener != NULL) {
+ this->internal->messageAvailableListener->onMessageAvailable(this);
+ }
+ }
+ }
+ } else {
+ if (!session->isTransacted()) {
+ Pointer<MessageAck> ack(new MessageAck());
+ ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
+ ack->setConsumerId(this->consumerInfo->getConsumerId());
+ ack->setDestination(dispatch->getDestination());
+ ack->setMessageCount(1);
+ ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+ ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ session->sendAck(ack);
} else {
-
- // No listener, add it to the unconsumed messages list it will get pushed on the
- // next receive call or when a new listener is added.
- this->internal->unconsumedMessages->enqueue(dispatch);
- if (this->internal->messageAvailableListener != NULL) {
- this->internal->messageAvailableListener->onMessageAvailable(this);
+ bool needsPoisonAck = false;
+ synchronized (&this->internal->dispatchedMessages) {
+ if (this->internal->previouslyDeliveredMessages != NULL) {
+ this->internal->previouslyDeliveredMessages->put(
+ dispatch->getMessage()->getMessageId(), true);
+ } else {
+ // delivery while pending redelivery to another consumer on the same connection
+ // not waiting for redelivery will help here
+ needsPoisonAck = true;
+ }
+ }
+ if (needsPoisonAck) {
+ Pointer<MessageAck> poisonAck(new MessageAck());
+ poisonAck->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
+ poisonAck->setConsumerId(this->consumerInfo->getConsumerId());
+ poisonAck->setDestination(dispatch->getDestination());
+ poisonAck->setMessageCount(1);
+ poisonAck->setLastMessageId(dispatch->getMessage()->getMessageId());
+ poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
+// TODO
+// poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
+// + session.getConnection().getConnectionInfo().getConnectionId()));
+ session->sendAck(poisonAck);
+ } else {
+ if (this->internal->transactedIndividualAck) {
+ immediateIndividualTransactedAck(dispatch);
+ } else {
+ Pointer<MessageAck> ack(new MessageAck());
+ ack->setAckType(ActiveMQConstants::ACK_TYPE_DELIVERED);
+ ack->setConsumerId(this->consumerInfo->getConsumerId());
+ ack->setDestination(dispatch->getDestination());
+ ack->setMessageCount(1);
+ ack->setLastMessageId(dispatch->getMessage()->getMessageId());
+ ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ session->sendAck(ack);
+ }
}
}
}
@@ -1187,7 +1669,7 @@ void ActiveMQConsumerKernel::sendPullReq
try {
- this->checkClosed();
+ clearDispatchList();
// There are still local message, consume them first.
if (!this->internal->unconsumedMessages->isEmpty()) {
@@ -1217,6 +1699,16 @@ void ActiveMQConsumerKernel::checkClosed
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::checkMessageListener() const {
+ if (this->internal->listener != NULL) {
+ throw cms::IllegalStateException(
+ "Cannot synchronously receive a message when a MessageListener is set");
+ }
+
+ this->session->checkMessageListener();
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::iterate() {
synchronized(&this->internal->listenerMutex) {
@@ -1234,7 +1726,6 @@ bool ActiveMQConsumerKernel::iterate() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::inProgressClearRequired() {
-
this->internal->inProgressClearRequiredFlag = true;
// Clears dispatched messages async to avoid lock contention with inprogress acks.
this->internal->clearDispatchList = true;
@@ -1246,17 +1737,35 @@ void ActiveMQConsumerKernel::clearMessag
synchronized(this->internal->unconsumedMessages.get()) {
if (this->internal->inProgressClearRequiredFlag) {
- // TODO - Rollback duplicates.
+ // ensure messages that were not yet consumed are rolled back up front as they
+ // may get redelivered to another consumer by the Broker.
+ std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
+ if (!this->consumerInfo->isBrowser()) {
+ std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+
+ for (; iter != list.end(); ++iter) {
+ Pointer<MessageDispatch> md = *iter;
+ this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
+ }
+ }
// allow dispatch on this connection to resume
this->session->getConnection()->setTransportInterruptionProcessingComplete();
this->internal->inProgressClearRequiredFlag = false;
+
+ // Wake up any blockers and allow them to recheck state.
+ this->internal->unconsumedMessages->notifyAll();
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::clearDispatchList() {
+ this->internal->doClearDispatchList();
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
return this->session->isAutoAcknowledge() ||
(this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
@@ -1273,12 +1782,11 @@ int ActiveMQConsumerKernel::getMessageAv
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::applyDestinationOptions(const Pointer<ConsumerInfo>& info) {
+void ActiveMQConsumerKernel::applyDestinationOptions(Pointer<ConsumerInfo> info) {
decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
- // Get any options specified in the destination and apply them to the
- // ConsumerInfo object.
+ // Get any options specified in the destination and apply them to the ConsumerInfo object.
const ActiveMQProperties& options = amqDestination->getOptions();
std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
@@ -1307,7 +1815,6 @@ void ActiveMQConsumerKernel::applyDestin
}
std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
-
if (options.hasProperty(maxPendingMsgLimitStr)) {
info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
}
@@ -1322,11 +1829,10 @@ void ActiveMQConsumerKernel::applyDestin
info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
}
- std::string networkSubscriptionStr = "consumer.networkSubscription";
-
- if (options.hasProperty(networkSubscriptionStr)) {
- info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
- }
+ this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
+ options.getProperty("consumer.nonBlockingRedelivery", "false"));
+ this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
+ options.getProperty("consumer.transactedIndividualAck", "false"));
}
////////////////////////////////////////////////////////////////////////////////
@@ -1372,7 +1878,7 @@ bool ActiveMQConsumerKernel::isSynchroni
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setSynchronizationRegistered( bool value ) {
+void ActiveMQConsumerKernel::setSynchronizationRegistered(bool value) {
this->internal->synchronizationRegistered = value;
}
@@ -1382,11 +1888,31 @@ long long ActiveMQConsumerKernel::getLas
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::setLastDeliveredSequenceId( long long value ) {
+void ActiveMQConsumerKernel::setLastDeliveredSequenceId(long long value) {
this->internal->lastDeliveredSequenceId = value;
}
////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isTransactedIndividualAck() const {
+ return this->internal->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setTransactedIndividualAck(bool value) {
+ this->internal->transactedIndividualAck = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod() const {
+ return this->internal->failoverRedeliveryWaitPeriod;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod(long long value) {
+ this->internal->failoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setFailureError(decaf::lang::Exception* error) {
if (error != NULL) {
this->internal->failureError.reset(error->clone());
@@ -1427,3 +1953,49 @@ cms::MessageAvailableListener* ActiveMQC
int ActiveMQConsumerKernel::getHashCode() const {
return this->internal->hashCode;
}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumerKernel::getOptimizedAckScheduledAckInterval() const {
+ return this->internal->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setOptimizedAckScheduledAckInterval(long long value) {
+ this->internal->optimizedAckScheduledAckInterval = value;
+
+ if (this->internal->optimizedAckTask != NULL) {
+ try {
+ this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
+ this->internal->optimizedAckTask = NULL;
+ } catch (Exception& e) {
+ this->internal->optimizedAckTask = NULL;
+ throw CMSExceptionSupport::create(e);
+ }
+ }
+
+ // Should we periodically send out all outstanding acks.
+ if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) {
+ this->internal->optimizedAckTask = new OptimizedAckTask(this, this->internal);
+
+ try {
+ this->session->getScheduler()->executePeriodically(
+ this->internal->optimizedAckTask, this->internal->optimizedAckScheduledAckInterval);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isOptimizeAcknowledge() const {
+ return this->internal->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) {
+ if (this->internal->optimizeAcknowledge && !value) {
+ deliverAcks();
+ }
+
+ this->internal->optimizeAcknowledge = value;
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Mon Apr 1 21:56:01 2013
@@ -109,8 +109,6 @@ namespace kernels {
virtual std::string getMessageSelector() const;
- virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);
-
virtual void setMessageTransformer(cms::MessageTransformer* transformer);
virtual cms::MessageTransformer* getMessageTransformer() const;
@@ -131,6 +129,20 @@ namespace kernels {
void acknowledge();
/**
+ * Method called to acknowledge the Message contained in the given MessageDispatch
+ *
+ * @throw CMSException if an error occurs while ack'ing the message.
+ */
+ void acknowledge(Pointer<commands::MessageDispatch> dispatch);
+
+ /**
+ * Method called to acknowledge the Message contained in the given MessageDispatch
+ *
+ * @throw CMSException if an error occurs while ack'ing the message.
+ */
+ void acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType);
+
+ /**
* Called to Commit the current set of messages in this Transaction
*
* @throw ActiveMQException if an error occurs while performing the operation.
@@ -220,6 +232,37 @@ namespace kernels {
long long getLastDeliveredSequenceId() const;
/**
+ * Will Message's in a transaction be acknowledged using the Individual Acknowledge mode.
+ *
+ * @return true if individual transacted acknowledge is enabled.
+ */
+ bool isTransactedIndividualAck() const;
+
+ /**
+ * Set if Message's in a transaction be acknowledged using the Individual Acknowledge mode.
+ *
+ * @param value
+ * True if individual transacted acknowledge is enabled.
+ */
+ void setTransactedIndividualAck(bool value);
+
+ /**
+ * Returns the delay after a failover before Message redelivery starts.
+ *
+ * @returns time in milliseconds to wait after failover.
+ */
+ long long setFailoverRedeliveryWaitPeriod() const;
+
+ /**
+ * Sets the time in milliseconds to delay after failover before starting
+ * message redelivery.
+ *
+ * @param value
+ * Time in milliseconds to delay after failover for redelivery start.
+ */
+ void setFailoverRedeliveryWaitPeriod(long long value);
+
+ /**
* Sets the value of the Last Delivered Sequence Id
*
* @param value
@@ -280,6 +323,37 @@ namespace kernels {
*/
bool isInUse(Pointer<commands::ActiveMQDestination> destination) const;
+ /**
+ * Time in Milliseconds before an automatic acknowledge is done for any outstanding
+ * delivered Messages. A value less than one means no task is scheduled.
+ *
+ * @returns time in milliseconds for the scheduled ack task.
+ */
+ long long getOptimizedAckScheduledAckInterval() const;
+
+ /**
+ * Sets the time in Milliseconds to schedule an automatic acknowledge of outstanding
+ * messages when optimize acknowledge is enabled. A value less than one means disable
+ * any scheduled tasks.
+ *
+ * @param value
+ * The time interval to send scheduled acks.
+ */
+ void setOptimizedAckScheduledAckInterval(long long value);
+
+ /**
+ * @returns true if this consumer is using optimize acknowledge mode.
+ */
+ bool isOptimizeAcknowledge() const;
+
+ /**
+ * Enable or disable optimized acknowledge for this consumer.
+ *
+ * @param value
+ * True if optimize acknowledge is enabled, false otherwise.
+ */
+ void setOptimizeAcknowledge(bool value);
+
protected:
/**
@@ -303,50 +377,41 @@ namespace kernels {
* Pre-consume processing
* @param dispatch - the message being consumed.
*/
- void beforeMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch);
+ void beforeMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch);
/**
* Post-consume processing
* @param dispatch - the consumed message
* @param messageExpired - flag indicating if the message has expired.
*/
- void afterMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired);
+ void afterMessageIsConsumed(Pointer<commands::MessageDispatch> dispatch, bool messageExpired);
private:
- // Creates a deliverable cms::Message from a received MessageDispatch, transforming if needed
- // and configuring appropriate ack handlers.
Pointer<cms::Message> createCMSMessage(Pointer<commands::MessageDispatch> dispatch);
- // Using options from the Destination URI override any settings that are
- // defined for this consumer.
- void applyDestinationOptions(const Pointer<commands::ConsumerInfo>& info);
-
- // If supported sends a message pull request to the service provider asking
- // for the delivery of a new message. This is used in the case where the
- // service provider has been configured with a zero prefetch or is only
- // capable of delivering messages on a pull basis. No request is made if
- // there are already messages in the unconsumed queue since there's no need
- // for a server round-trip in that instance.
+ void applyDestinationOptions(Pointer<commands::ConsumerInfo> info);
+
void sendPullRequest(long long timeout);
- // Checks for the closed state and throws if so.
void checkClosed() const;
- // Sends an ack as needed in order to keep them coming in if the current
- // ack mode allows the consumer to receive up to the prefetch limit before
- // an real ack is sent.
- void ackLater(const Pointer<commands::MessageDispatch>& message, int ackType);
+ void checkMessageListener() const;
+
+ void ackLater(Pointer<commands::MessageDispatch> message, int ackType);
+
+ void immediateIndividualTransactedAck(Pointer<commands::MessageDispatch> dispatch);
- // Create an Ack Message that acks all messages that have been delivered so far.
Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
- // Should Acks be sent on each dispatched message
bool isAutoAcknowledgeEach() const;
- // Can Acks be batched for less network overhead.
bool isAutoAcknowledgeBatch() const;
+ void registerSync();
+
+ void clearDispatchList();
+
};
}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Apr 1 21:56:01 2013
@@ -105,7 +105,8 @@ namespace kernels{
SessionConfig() : synchronizationRegistered(false),
producerLock(), producers(), consumerLock(), consumers(),
- scheduler(), closeSync(), sendMutex(), transformer(NULL), hashCode() {}
+ scheduler(), closeSync(), sendMutex(), transformer(NULL),
+ hashCode() {}
~SessionConfig() {}
};
@@ -1455,3 +1456,32 @@ Pointer<commands::ProducerId> ActiveMQSe
int ActiveMQSessionKernel::getHashCode() const {
return this->config->hashCode;
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::checkMessageListener() const {
+
+ this->config->consumerLock.readLock().lock();
+ try {
+ Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator());
+ while (iter->hasNext()) {
+ Pointer<ActiveMQConsumerKernel> consumer = iter->next();
+ if (consumer->getMessageListener() != NULL) {
+ throw cms::IllegalStateException(
+ "Cannot synchronously receive a message when a MessageListener is set");
+ }
+ }
+ this->config->consumerLock.readLock().unlock();
+ } catch (Exception& ex) {
+ this->config->consumerLock.readLock().unlock();
+ throw;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::sendAck(Pointer<MessageAck> ack, bool async) {
+ if (async || this->connection->isSendAcksAsync() || this->isTransacted()) {
+ this->connection->oneway(ack);
+ } else {
+ this->connection->syncRequest(ack);
+ }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Mon Apr 1 21:56:01 2013
@@ -29,6 +29,7 @@
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/commands/Response.h>
+#include <activemq/commands/MessageAck.h>
#include <activemq/commands/SessionInfo.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/ConsumerId.h>
@@ -542,8 +543,31 @@ namespace kernels {
*/
bool iterateConsumers();
+ /**
+ * Checks if any MessageConsumer owned by this Session has a set MessageListener
+ * and throws an exception if so. This enforces the rule that the MessageConsumers
+ * belonging to a Session either operate in sync or async receive as a group.
+ */
+ void checkMessageListener() const;
+
+ /**
+ * Returns a Hash Code for this Session based on its SessionId.
+ *
+ * @returns an int hash code based on the string balue of SessionId.
+ */
virtual int getHashCode() const;
+ /**
+ * Sends the given MessageAck command to the Broker either via Synchronous call or
+ * an Asynchronous call depending on the value of the async parameter.
+ *
+ * @param ack
+ * The MessageAck command to send.
+ * @param async
+ * True if the command can be sent asynchronously.
+ */
+ void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
+
private:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp Mon Apr 1 21:56:01 2013
@@ -18,6 +18,7 @@
#include "ConnectionAuditTest.h"
#include <activemq/core/ConnectionAudit.h>
+#include <activemq/core/Dispatcher.h>
#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/util/IdGenerator.h>
#include <activemq/commands/Message.h>
@@ -104,7 +105,7 @@ void ConnectionAuditTest::testIsDuplicat
list.add(id);
message->setMessageId(id);
- CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+ CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message));
}
int index = list.size() -1 -audit.getAuditDepth();
@@ -112,7 +113,7 @@ void ConnectionAuditTest::testIsDuplicat
Pointer<MessageId> id = list.get(index);
message->setMessageId(id);
CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
- audit.isDuplicate(dispatcher, message));
+ audit.isDuplicate(dispatcher.get(), message));
}
}
@@ -140,7 +141,7 @@ void ConnectionAuditTest::testRollbackDu
list.add(id);
message->setMessageId(id);
- CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher, message));
+ CPPUNIT_ASSERT(!audit.isDuplicate(dispatcher.get(), message));
}
int index = list.size() -1 -audit.getAuditDepth();
@@ -148,9 +149,9 @@ void ConnectionAuditTest::testRollbackDu
Pointer<MessageId> id = list.get(index);
message->setMessageId(id);
CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
- audit.isDuplicate(dispatcher, message));
- audit.rollbackDuplicate(dispatcher, message);
+ audit.isDuplicate(dispatcher.get(), message));
+ audit.rollbackDuplicate(dispatcher.get(), message);
CPPUNIT_ASSERT_MESSAGE(std::string() + "duplicate msg:" + id->toString(),
- !audit.isDuplicate(dispatcher, message));
+ !audit.isDuplicate(dispatcher.get(), message));
}
}