You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/03 22:47:02 UTC

svn commit: r1464200 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp

Author: tabish
Date: Wed Apr  3 20:47:01 2013
New Revision: 1464200

URL: http://svn.apache.org/r1464200
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1464200&r1=1464199&r2=1464200&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Wed Apr  3 20:47:01 2013
@@ -340,6 +340,7 @@ namespace {
     private:
 
         ActiveMQConsumerKernel* consumer;
+        ActiveMQConsumerKernelConfig* impl;
 
     private:
 
@@ -348,7 +349,9 @@ namespace {
 
     public:
 
-        TransactionSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+        TransactionSynhcronization(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+            Synchronization(), consumer(consumer), impl(impl) {
+
             if (consumer == NULL) {
                 throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
             }
@@ -358,12 +361,12 @@ namespace {
 
         virtual void beforeEnd() {
 
-            if (false) { // TODO transactedIndividualAck) {
-//                consumer->clearDispatchList();
-//                consumer->waitForRedeliveries();
-//                synchronized(deliveredMessages) {
-//                    consumer->rollbackOnFailedRecoveryRedelivery();
-//                }
+            if (impl->transactedIndividualAck) {
+                impl->doClearDispatchList();
+                impl->waitForRedeliveries();
+                synchronized(&impl->dispatchedMessages) {
+                    impl->rollbackOnFailedRecoveryRedelivery();
+                }
             } else {
                 consumer->acknowledge();
             }
@@ -518,8 +521,8 @@ namespace {
                     this->consumer->start();
                 }
             } catch(cms::CMSException& ex) {
-                // TODO
-                // this->session->getConnection()->onAsyncException(ex);
+                Exception wrapper(ex.clone());
+                this->session->getConnection()->onAsyncException(wrapper);
             }
         }
     };
@@ -1112,15 +1115,8 @@ void ActiveMQConsumerKernel::immediateIn
     // acks accumulate on the broker pending transaction completion to indicate delivery status
     registerSync();
 
-    Pointer<MessageAck> ack(new MessageAck);
-
-    ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
-    ack->setConsumerId(dispatch->getConsumerId());
-    ack->setDestination(dispatch->getDestination());
-    ack->setLastMessageId(dispatch->getMessage()->getMessageId());
-    ack->setMessageCount(1);
+    Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
     ack->setTransactionId(this->session->getTransactionContext()->getTransactionId());
-
     this->session->syncRequest(ack);
 }
 
@@ -1129,7 +1125,7 @@ void ActiveMQConsumerKernel::registerSyn
     this->session->doStartTransaction();
     if (!this->internal->synchronizationRegistered) {
         this->internal->synchronizationRegistered = true;
-        Pointer<Synchronization> sync(new TransactionSynhcronization(this));
+        Pointer<Synchronization> sync(new TransactionSynhcronization(this, this->internal));
         this->session->getTransactionContext()->addSynchronization(sync);
     }
 }
@@ -1141,8 +1137,8 @@ void ActiveMQConsumerKernel::afterMessag
 
         if (this->internal->unconsumedMessages->isClosed()) {
             return;
-        } else if (messageExpired == true) {
-            ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
+        } else if (messageExpired) {
+            acknowledge(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
             return;
         } else if (session->isTransacted()) {
             return;
@@ -1262,12 +1258,7 @@ void ActiveMQConsumerKernel::ackLater(Po
     this->internal->deliveredCounter++;
 
     Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
-    this->internal->pendingAck.reset(new MessageAck());
-    this->internal->pendingAck->setConsumerId(dispatch->getConsumerId());
-    this->internal->pendingAck->setAckType((unsigned char) ackType);
-    this->internal->pendingAck->setDestination(dispatch->getDestination());
-    this->internal->pendingAck->setLastMessageId(dispatch->getMessage()->getMessageId());
-    this->internal->pendingAck->setMessageCount(internal->deliveredCounter);
+    this->internal->pendingAck.reset(new MessageAck(dispatch, ackType, internal->deliveredCounter));
 
     if (oldPendingAck == NULL) {
         this->internal->pendingAck->setFirstMessageId(this->internal->pendingAck->getLastMessageId());
@@ -1303,13 +1294,7 @@ Pointer<MessageAck> ActiveMQConsumerKern
         if (!this->internal->dispatchedMessages.isEmpty()) {
 
             Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
-
-            Pointer<MessageAck> ack(new MessageAck());
-            ack->setAckType((unsigned char) type);
-            ack->setConsumerId(dispatched->getConsumerId());
-            ack->setDestination(dispatched->getDestination());
-            ack->setMessageCount((int) this->internal->dispatchedMessages.size());
-            ack->setLastMessageId(dispatched->getMessage()->getMessageId());
+            Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->dispatchedMessages.size()));
             ack->setFirstMessageId(this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId());
 
             return ack;
@@ -1328,17 +1313,8 @@ void ActiveMQConsumerKernel::acknowledge
 void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType) {
 
     try {
-
-        Pointer<MessageAck> ack(new MessageAck());
-        ack->setAckType((unsigned char) ackType);
-        ack->setConsumerId(this->consumerInfo->getConsumerId());
-        ack->setDestination(this->consumerInfo->getDestination());
-        ack->setMessageCount(1);
-        ack->setLastMessageId(dispatch->getMessage()->getMessageId());
-        ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
-
+        Pointer<MessageAck> ack(new MessageAck(dispatch, ackType, 1));
         session->sendAck(ack);
-
         synchronized(&this->internal->dispatchedMessages) {
             this->internal->dispatchedMessages.remove(dispatch);
         }
@@ -1442,15 +1418,11 @@ void ActiveMQConsumerKernel::rollback() 
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
-                Pointer<MessageAck> ack(new MessageAck());
-                ack->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
-                ack->setConsumerId(this->consumerInfo->getConsumerId());
-                ack->setDestination(lastMsg->getDestination());
-                ack->setMessageCount((int) this->internal->dispatchedMessages.size());
-                ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+                Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
+                                        this->internal->dispatchedMessages.size()));
                 ack->setFirstMessageId(firstMsgId);
-                // TODO - ack->setPoisonCause()
-
+                Pointer<BrokerError> cause(new BrokerError);
+                ack->setPoisonCause(cause);
                 session->sendAck(ack, true);
                 // Adjust the window size.
                 this->internal->additionalWindowSize = Math::max(0,
@@ -1461,14 +1433,9 @@ void ActiveMQConsumerKernel::rollback() 
 
                 // only redelivery_ack after first delivery
                 if (currentRedeliveryCount > 0) {
-                    Pointer<MessageAck> ack(new MessageAck());
-                    ack->setAckType(ActiveMQConstants::ACK_TYPE_REDELIVERED);
-                    ack->setConsumerId(this->consumerInfo->getConsumerId());
-                    ack->setDestination(lastMsg->getDestination());
-                    ack->setMessageCount((int) this->internal->dispatchedMessages.size());
-                    ack->setLastMessageId(lastMsg->getMessage()->getMessageId());
+                    Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
+                                            this->internal->dispatchedMessages.size()));
                     ack->setFirstMessageId(firstMsgId);
-
                     session->oneway(ack);
                 }
 
@@ -1539,7 +1506,7 @@ void ActiveMQConsumerKernel::dispatch(co
                             } catch (RuntimeException& e) {
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
                                     // Schedule redelivery and possible DLQ processing
-                                    // dispatch->setRollbackCause(e); // TODO
+                                    dispatch->setRollbackCause(e);
                                     rollback();
                                 } else {
                                     // Transacted or Client ack: Deliver the next message.
@@ -1559,13 +1526,7 @@ void ActiveMQConsumerKernel::dispatch(co
                     }
                 } else {
                     if (!session->isTransacted()) {
-                        Pointer<MessageAck> ack(new MessageAck());
-                        ack->setAckType(ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
-                        ack->setConsumerId(this->consumerInfo->getConsumerId());
-                        ack->setDestination(dispatch->getDestination());
-                        ack->setMessageCount(1);
-                        ack->setLastMessageId(dispatch->getMessage()->getMessageId());
-                        ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+                        Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
                         session->sendAck(ack);
                     } else {
                         bool needsPoisonAck = false;
@@ -1574,34 +1535,26 @@ void ActiveMQConsumerKernel::dispatch(co
                                 this->internal->previouslyDeliveredMessages->put(
                                         dispatch->getMessage()->getMessageId(), true);
                             } else {
-                                // delivery while pending redelivery to another consumer on the same connection
-                                // not waiting for redelivery will help here
+                                // delivery while pending redelivery to another consumer on the same
+                                // connection not waiting for redelivery will help here
                                 needsPoisonAck = true;
                             }
                         }
                         if (needsPoisonAck) {
-                            Pointer<MessageAck> poisonAck(new MessageAck());
-                            poisonAck->setAckType(ActiveMQConstants::ACK_TYPE_POISON);
-                            poisonAck->setConsumerId(this->consumerInfo->getConsumerId());
-                            poisonAck->setDestination(dispatch->getDestination());
-                            poisonAck->setMessageCount(1);
-                            poisonAck->setLastMessageId(dispatch->getMessage()->getMessageId());
+                            Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
                             poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
-// TODO
-//                            poisonAck.setPoisonCause(new JMSException("Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
-//                                    + session.getConnection().getConnectionInfo().getConnectionId()));
+                            Pointer<BrokerError> cause(new BrokerError);
+                            cause->setExceptionClass("javax.jms.JMSException");
+                            cause->setMessage(std::string() + "Duplicate dispatch with transacted " +
+                                              "redeliver pending on another consumer, connection: " +
+                                              this->session->getConnection()->getConnectionInfo().getConnectionId()->toString());
+                            poisonAck->setPoisonCause(cause);
                             session->sendAck(poisonAck);
                         } else {
                             if (this->internal->transactedIndividualAck) {
                                 immediateIndividualTransactedAck(dispatch);
                             } else {
-                                Pointer<MessageAck> ack(new MessageAck());
-                                ack->setAckType(ActiveMQConstants::ACK_TYPE_DELIVERED);
-                                ack->setConsumerId(this->consumerInfo->getConsumerId());
-                                ack->setDestination(dispatch->getDestination());
-                                ack->setMessageCount(1);
-                                ack->setLastMessageId(dispatch->getMessage()->getMessageId());
-                                ack->setFirstMessageId(dispatch->getMessage()->getMessageId());
+                                Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
                                 session->sendAck(ack);
                             }
                         }