You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/16 23:46:11 UTC

svn commit: r1326811 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/kernels/ActiveMQSessionKernel.cpp decaf/util/concurrent/atomic/AtomicBoolean.h

Author: tabish
Date: Mon Apr 16 21:46:11 2012
New Revision: 1326811

URL: http://svn.apache.org/viewvc?rev=1326811&view=rev
Log:
Allow a session to defer its close until the active XA transaction completes (after commit or rollback).

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1326811&r1=1326810&r2=1326811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Apr 16 21:46:11 2012
@@ -55,6 +55,7 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 
@@ -68,11 +69,41 @@ using namespace activemq::exceptions;
 using namespace activemq::threads;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace {
+namespace activemq{
+namespace core{
+namespace kernels{
+
+    class CloseSynhcronization;
+
+    class SessionConfig {
+    public:
+
+        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
+                                     Pointer<ActiveMQConsumerKernel>,
+                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
+
+    private:
+
+        SessionConfig(const SessionConfig&);
+        SessionConfig& operator=(const SessionConfig&);
+
+    public:
+
+        AtomicBoolean synchronizationRegistered;
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
+        Pointer<Scheduler> scheduler;
+        Pointer<CloseSynhcronization> closeSync;
+
+    public:
+
+        SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync() {}
+        ~SessionConfig() {}
+    };
 
     /**
      * Class used to clear a Consumer's dispatch queue asynchronously from the
@@ -114,6 +145,7 @@ namespace {
     private:
 
         ActiveMQSessionKernel* session;
+        SessionConfig* config;
 
     private:
 
@@ -122,9 +154,10 @@ namespace {
 
     public:
 
-        CloseSynhcronization(ActiveMQSessionKernel* session) : Synchronization(), session(session) {
+        CloseSynhcronization(ActiveMQSessionKernel* session, SessionConfig* config) :
+            Synchronization(), session(session), config(config) {
 
-            if (session == NULL) {
+            if (session == NULL || config == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Synchronization Created with NULL Session.");
             }
@@ -136,43 +169,17 @@ namespace {
         }
 
         virtual void afterCommit() {
+            config->closeSync.release();
             session->doClose();
+            config->synchronizationRegistered.set(false);
         }
 
         virtual void afterRollback() {
+            config->closeSync.release();
             session->doClose();
+            config->synchronizationRegistered.set(false);
         }
     };
-}
-
-////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace core{
-namespace kernels{
-
-    class SessionConfig {
-    public:
-
-        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     Pointer<ActiveMQConsumerKernel>,
-                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
-
-    private:
-
-        SessionConfig(const SessionConfig&);
-        SessionConfig& operator=(const SessionConfig&);
-
-    public:
-
-        bool synchronizationRegistered;
-        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
-        Pointer<Scheduler> scheduler;
-
-    public:
-
-        SessionConfig() : synchronizationRegistered(false), producers(), scheduler() {}
-        ~SessionConfig() {}
-    };
 
 }}}
 
@@ -205,7 +212,7 @@ ActiveMQSessionKernel::ActiveMQSessionKe
 
     this->connection->oneway(this->sessionInfo);
 
-    this->closed = false;
+    this->closed.set(false);
     this->lastDeliveredSequenceId = -1;
 
     // Create a Transaction object
@@ -250,22 +257,16 @@ void ActiveMQSessionKernel::close() {
         return;
     }
 
-    if (this->transaction->isInXATransaction()) {
-
-        // TODO - Right now we don't have a safe way of dealing with this case
-        // since the session might be deleted before the XA Transaction is finalized
-        // registering a Synchronization could result in an segmentation fault.
-        //
-        // For now we just close badly and throw an exception.
-        doClose();
-
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__,
-            "The Consumer is still in an Active XA Transaction, commit it first." );
-    }
-
     try {
-        doClose();
+
+        if (this->transaction->isInXATransaction()) {
+            if (!this->config->synchronizationRegistered.compareAndSet(false, true)) {
+                this->config->closeSync.reset(new CloseSynhcronization(this, this->config));
+                this->transaction->addSynchronization(this->config->closeSync);
+            }
+        } else {
+            doClose();
+        }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -290,6 +291,11 @@ void ActiveMQSessionKernel::doClose() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::dispose() {
 
+    // Prevent Dispose loop if transaction has a close synchronization registered.
+    if (!closed.compareAndSet(false, true)) {
+        return;
+    }
+
     class Finalizer {
     private:
 
@@ -315,7 +321,6 @@ void ActiveMQSessionKernel::dispose() {
                 session.release();
             }
             session.release();
-            this->session->closed = true;
         }
     };
 
@@ -633,8 +638,7 @@ cms::QueueBrowser* ActiveMQSessionKernel
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue,
-                                                  const std::string& selector) {
+cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue, const std::string& selector) {
 
     try {
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h?rev=1326811&r1=1326810&r2=1326811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h Mon Apr 16 21:46:11 2012
@@ -47,7 +47,7 @@ namespace atomic {
          * Creates a new AtomicBoolean with the initial value.
          * @param initialValue - The initial value of this boolean.
          */
-        AtomicBoolean( bool initialValue );
+        AtomicBoolean(bool initialValue);
 
         virtual ~AtomicBoolean() {}
 
@@ -63,7 +63,7 @@ namespace atomic {
          * Unconditionally sets to the given value.
          * @param newValue - the new value
          */
-        void set( bool newValue ) {
+        void set(bool newValue) {
             this->value = newValue ? 1 : 0;
         }
 
@@ -76,7 +76,7 @@ namespace atomic {
          * @returns true if successful. False return indicates that the actual value
          * was not equal to the expected value.
          */
-        bool compareAndSet( bool expect, bool update );
+        bool compareAndSet(bool expect, bool update);
 
         /**
          * Atomically sets to the given value and returns the previous value.
@@ -84,7 +84,7 @@ namespace atomic {
          * @param newValue - the new value
          * @returns the previous value
          */
-        bool getAndSet( bool newValue );
+        bool getAndSet(bool newValue);
 
         /**
          * Returns the String representation of the current value.