You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/03 22:47:02 UTC
svn commit: r1464200 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Author: tabish
Date: Wed Apr 3 20:47:01 2013
New Revision: 1464200
URL: http://svn.apache.org/r1464200
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1464200&r1=1464199&r2=1464200&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Wed Apr 3 20:47:01 2013
@@ -340,6 +340,7 @@ namespace {
private:
ActiveMQConsumerKernel* consumer;
+ ActiveMQConsumerKernelConfig* impl;
private:
@@ -348,7 +349,9 @@ namespace {
public:
- TransactionSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+ TransactionSynhcronization(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+ Synchronization(), consumer(consumer), impl(impl) {
+
if (consumer == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
@@ -358,12 +361,12 @@ namespace {
virtual void beforeEnd() {
- if (false) { // TODO transactedIndividualAck) {
-// consumer->clearDispatchList();
-// consumer->waitForRedeliveries();
-// synchronized(deliveredMessages) {
-// consumer->rollbackOnFailedRecoveryRedelivery();
-// }
+ if (impl->transactedIndividualAck) {
+ impl->doClearDispatchList();
+ impl->waitForRedeliveries();
+ synchronized(&impl->dispatchedMessages) {
+ impl->rollbackOnFailedRecoveryRedelivery();
+ }
} else {
consumer->acknowledge();
}
@@ -518,8 +521,8 @@ namespace {
this->consumer->start();
}
} catch(cms::CMSException& ex) {
- // TODO
- // this->session->getConnection()->onAsyncException(ex);
+ Exception wrapper(ex.clone());
+ this->session->getConnection()->onAsyncException(wrapper);
}
}
};
@@ -1112,15 +1115,8 @@ void ActiveMQConsumerKernel::immediateIn
// acks accumulate on the broker pending transaction completion to indicate delivery status
registerSync();
- Pointer<MessageAck> ack(new MessageAck);
-
- ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
- ack->setConsumerId(dispatch->getConsumerId());
- ack->setDestination(dispatch->getDestination());
- ack->setLastMessageId(dispatch->getMessage()->getMessageId());
- ack->setMessageCount(1);
+ Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
ack->setTransactionId(this->session->getTransactionContext()->getTransactionId());
-
this->session->syncRequest(ack);
}
@@ -1129,7 +1125,7 @@ void ActiveMQConsumerKernel::registerSyn
this->session->doStartTransaction();
if (!this->internal->synchronizationRegistered) {
this->internal->synchronizationRegistered = true;
- Pointer<Synchronization> sync(new TransactionSynhcronization(this));
+ Pointer<Synchronization> sync(new TransactionSynhcronization(this, this->internal));
this->session->getTransactionContext()->addSynchronization(sync);
}
}
@@ -1141,8 +1137,8 @@ void ActiveMQConsumerKernel::afterMessag
if (this->internal->unconsumedMessages->isClosed()) {
return;
- } else if (messageExpired == true) {
- ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
+ } else if (messageExpired) {
+ acknowledge(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
return;
} else if (session->isTransacted()) {
return;
@@ -1262,12 +1258,7 @@ void ActiveMQConsumerKernel::ackLater(Po
this->internal->deliveredCounter++;
Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
- this->internal->pendingAck.reset(new MessageAck());
- this->internal->pendingAck->setConsumerId(dispatch->getConsumerId());
- this->internal->pendingAck->setAckType((unsigned char) ackType);
- this->internal->pendingAck->setDestination(dispatch->getDestination());
- this->internal->pendingAck->setLastMessageId(dispatch->getMessage()->getMessageId());
- this->internal->pendingAck->setMessageCount(internal->deliveredCounter);
+ this->internal->pendingAck.reset(new MessageAck(dispatch, ackType, internal->deliveredCounter));
if (oldPendingAck == NULL) {
this->internal->pendingAck->setFirstMessageId(this->internal->pendingAck->getLastMessageId());
@@ -1303,13 +1294,7 @@ Pointer<MessageAck> ActiveMQConsumerKern
if (!this->internal->dispatchedMessages.isEmpty()) {
Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
-
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType((unsigned char) type);
- ack->setConsumerId(dispatched->getConsumerId());
- ack->setDestination(dispatched->getDestination());
- ack->setMessageCount((int) this->internal->dispatchedMessages.size());
- ack->setLastMessageId(dispatched->getMessage()->getMessageId());
+ Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->dispatchedMessages.size()));
ack->setFirstMessageId(this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId());
return ack;
@@ -1328,17 +1313,8 @@ void ActiveMQConsumerKernel::acknowledge
void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType) {
try {
-
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType((unsigned char) ackType);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(this->consumerInfo->getDestination());
- ack->setMessageCount(1);
- ack->setLastMessageId(dispatch->getMessage()->getMessageId());
- ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
-
+ Pointer<MessageAck> ack(new MessageAck(dispatch, ackType, 1));
session->sendAck(ack);
-
synchronized(&this->internal->dispatchedMessages) {
this->internal->dispatchedMessages.remove(dispatch);
}
@@ -1442,15 +1418,11 @@ void ActiveMQConsumerKernel::rollback()
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(lastMsg->getDestination());
- ack->setMessageCount((int) this->internal->dispatchedMessages.size());
- ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+ Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
+ this->internal->dispatchedMessages.size()));
ack->setFirstMessageId(firstMsgId);
- // TODO - ack->setPoisonCause()
-
+ Pointer<BrokerError> cause(new BrokerError);
+ ack->setPoisonCause(cause);
session->sendAck(ack, true);
// Adjust the window size.
this->internal->additionalWindowSize = Math::max(0,
@@ -1461,14 +1433,9 @@ void ActiveMQConsumerKernel::rollback()
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType(ActiveMQConstants::ACK_TYPE_REDELIVERED);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(lastMsg->getDestination());
- ack->setMessageCount((int) this->internal->dispatchedMessages.size());
- ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+ Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
+ this->internal->dispatchedMessages.size()));
ack->setFirstMessageId(firstMsgId);
-
session->oneway(ack);
}
@@ -1539,7 +1506,7 @@ void ActiveMQConsumerKernel::dispatch(co
} catch (RuntimeException& e) {
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
// Schedule redelivery and possible DLQ processing
- // dispatch->setRollbackCause(e); // TODO
+ dispatch->setRollbackCause(e);
rollback();
} else {
// Transacted or Client ack: Deliver the next message.
@@ -1559,13 +1526,7 @@ void ActiveMQConsumerKernel::dispatch(co
}
} else {
if (!session->isTransacted()) {
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(dispatch->getDestination());
- ack->setMessageCount(1);
- ack->setLastMessageId(dispatch->getMessage()->getMessageId());
- ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
session->sendAck(ack);
} else {
bool needsPoisonAck = false;
@@ -1574,34 +1535,26 @@ void ActiveMQConsumerKernel::dispatch(co
this->internal->previouslyDeliveredMessages->put(
dispatch->getMessage()->getMessageId(), true);
} else {
- // delivery while pending redelivery to another consumer on the same connection
- // not waiting for redelivery will help here
+ // delivery while pending redelivery to another consumer on the same
+ // connection; not waiting for redelivery will help here
needsPoisonAck = true;
}
}
if (needsPoisonAck) {
- Pointer<MessageAck> poisonAck(new MessageAck());
- poisonAck->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
- poisonAck->setConsumerId(this->consumerInfo->getConsumerId());
- poisonAck->setDestination(dispatch->getDestination());
- poisonAck->setMessageCount(1);
- poisonAck->setLastMessageId(dispatch->getMessage()->getMessageId());
+ Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
-// TODO
-// poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
-// + session.getConnection().getConnectionInfo().getConnectionId()));
+ Pointer<BrokerError> cause(new BrokerError);
+ cause->setExceptionClass("javax.jms.JMSException");
+ cause->setMessage(std::string() + "Duplicate dispatch with transacted " +
+ "redeliver pending on another consumer, connection: " +
+ this->session->getConnection()->getConnectionInfo().getConnectionId()->toString());
+ poisonAck->setPoisonCause(cause);
session->sendAck(poisonAck);
} else {
if (this->internal->transactedIndividualAck) {
immediateIndividualTransactedAck(dispatch);
} else {
- Pointer<MessageAck> ack(new MessageAck());
- ack->setAckType(ActiveMQConstants::ACK_TYPE_DELIVERED);
- ack->setConsumerId(this->consumerInfo->getConsumerId());
- ack->setDestination(dispatch->getDestination());
- ack->setMessageCount(1);
- ack->setLastMessageId(dispatch->getMessage()->getMessageId());
- ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
session->sendAck(ack);
}
}