You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/02 02:25:27 UTC

svn commit: r1498746 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/kernels/ test/activemq/core/

Author: tabish
Date: Tue Jul  2 00:25:26 2013
New Revision: 1498746

URL: http://svn.apache.org/r1498746
Log:
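Fix for https://issues.apache.org/jira/browse/AMQCPP-494: the transaction and close synchronizations now hold a Pointer<ActiveMQConsumerKernel> looked up from the session instead of a raw pointer, release it after commit/rollback, and close() registers a CloseSynhcronization at most once.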

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Tue Jul  2 00:25:26 2013
@@ -97,6 +97,7 @@ namespace kernels {
         decaf::util::concurrent::Mutex listenerMutex;
         AtomicBoolean deliveringAcks;
         AtomicBoolean started;
+        AtomicBoolean closeSyncRegistered;
         Pointer<MessageDispatchChannel> unconsumedMessages;
         decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
         long long lastDeliveredSequenceId;
@@ -133,6 +134,7 @@ namespace kernels {
                                          listenerMutex(),
                                          deliveringAcks(),
                                          started(),
+                                         closeSyncRegistered(),
                                          unconsumedMessages(),
                                          dispatchedMessages(),
                                          lastDeliveredSequenceId(0),
@@ -340,7 +342,7 @@ namespace {
     class TransactionSynhcronization : public Synchronization {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
         ActiveMQConsumerKernelConfig* impl;
 
     private:
@@ -350,7 +352,7 @@ namespace {
 
     public:
 
-        TransactionSynhcronization(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+        TransactionSynhcronization(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
             Synchronization(), consumer(consumer), impl(impl) {
 
             if (consumer == NULL) {
@@ -377,11 +379,13 @@ namespace {
         virtual void afterCommit() {
             consumer->commit();
             consumer->setSynchronizationRegistered(false);
+            consumer.reset(NULL);
         }
 
         virtual void afterRollback() {
             consumer->rollback();
             consumer->setSynchronizationRegistered(false);
+            consumer.reset(NULL);
         }
     };
 
@@ -393,7 +397,7 @@ namespace {
     class CloseSynhcronization : public Synchronization {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
 
     private:
 
@@ -402,7 +406,7 @@ namespace {
 
     public:
 
-        CloseSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+        CloseSynhcronization(Pointer<ActiveMQConsumerKernel> consumer) : consumer(consumer) {
             if (consumer == NULL) {
                 throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
             }
@@ -415,10 +419,12 @@ namespace {
 
         virtual void afterCommit() {
             consumer->doClose();
+            consumer.reset(NULL);
         }
 
         virtual void afterRollback() {
             consumer->doClose();
+            consumer.reset(NULL);
         }
     };
 
@@ -787,9 +793,12 @@ void ActiveMQConsumerKernel::close() {
         if (!this->isClosed()) {
 
             if (this->session->getTransactionContext() != NULL &&
-                this->session->getTransactionContext()->isInTransaction()) {
+                this->session->getTransactionContext()->isInTransaction() &&
+                this->internal->closeSyncRegistered.compareAndSet(false, true)) {
 
-                Pointer<Synchronization> sync(new CloseSynhcronization(this));
+                Pointer<ActiveMQConsumerKernel> self =
+                    this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+                Pointer<Synchronization> sync(new CloseSynhcronization(self));
                 this->session->getTransactionContext()->addSynchronization(sync);
             } else {
                 doClose();
@@ -1134,7 +1143,9 @@ void ActiveMQConsumerKernel::registerSyn
     this->session->doStartTransaction();
     if (!this->internal->synchronizationRegistered) {
         this->internal->synchronizationRegistered = true;
-        Pointer<Synchronization> sync(new TransactionSynhcronization(this, this->internal));
+        Pointer<ActiveMQConsumerKernel> self =
+            this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+        Pointer<Synchronization> sync(new TransactionSynhcronization(self, this->internal));
         this->session->getTransactionContext()->addSynchronization(sync);
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Tue Jul  2 00:25:26 2013
@@ -529,7 +529,7 @@ namespace kernels {
         Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
 
         /**
-         * @returns a Pointer to an ActiveMQProducerKernel using its ProducerId, or NULL.
+         * @returns a Pointer to an ActiveMQConsumerKernel using its ConsumerId, or NULL.
          */
         Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Jul  2 00:25:26 2013
@@ -645,9 +645,10 @@ void ActiveMQSessionTest::testTransactio
     }
 
     msgListener1.asyncWaitForMessages(MSG_COUNT);
-    CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int )msgListener1.messages.size());
+    CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int)msgListener1.messages.size());
 
     consumer1->close();
+    consumer1.reset();
     session->commit();
 
     Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Tue Jul  2 00:25:26 2013
@@ -36,8 +36,8 @@
 namespace activemq{
 namespace core{
 
-    class ActiveMQSessionTest : public CppUnit::TestFixture
-    {
+    class ActiveMQSessionTest : public CppUnit::TestFixture {
+
         CPPUNIT_TEST_SUITE( ActiveMQSessionTest );
         CPPUNIT_TEST( testAutoAcking );
         CPPUNIT_TEST( testClientAck );