You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/16 23:46:11 UTC
svn commit: r1326811 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main:
activemq/core/kernels/ActiveMQSessionKernel.cpp
decaf/util/concurrent/atomic/AtomicBoolean.h
Author: tabish
Date: Mon Apr 16 21:46:11 2012
New Revision: 1326811
URL: http://svn.apache.org/viewvc?rev=1326811&view=rev
Log:
Allow session to delay close if in XA transaction.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1326811&r1=1326810&r2=1326811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Apr 16 21:46:11 2012
@@ -55,6 +55,7 @@
#include <decaf/lang/Long.h>
#include <decaf/lang/Math.h>
#include <decaf/util/Queue.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
@@ -68,11 +69,41 @@ using namespace activemq::exceptions;
using namespace activemq::threads;
using namespace decaf::util;
using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-namespace {
+namespace activemq{
+namespace core{
+namespace kernels{
+
+ class CloseSynhcronization;
+
+ class SessionConfig {
+ public:
+
+ typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
+ Pointer<ActiveMQConsumerKernel>,
+ commands::ConsumerId::COMPARATOR> ConsumersMap;
+
+ private:
+
+ SessionConfig(const SessionConfig&);
+ SessionConfig& operator=(const SessionConfig&);
+
+ public:
+
+ AtomicBoolean synchronizationRegistered;
+ decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
+ Pointer<Scheduler> scheduler;
+ Pointer<CloseSynhcronization> closeSync;
+
+ public:
+
+ SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync() {}
+ ~SessionConfig() {}
+ };
/**
* Class used to clear a Consumer's dispatch queue asynchronously from the
@@ -114,6 +145,7 @@ namespace {
private:
ActiveMQSessionKernel* session;
+ SessionConfig* config;
private:
@@ -122,9 +154,10 @@ namespace {
public:
- CloseSynhcronization(ActiveMQSessionKernel* session) : Synchronization(), session(session) {
+ CloseSynhcronization(ActiveMQSessionKernel* session, SessionConfig* config) :
+ Synchronization(), session(session), config(config) {
- if (session == NULL) {
+ if (session == NULL || config == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Session.");
}
@@ -136,43 +169,17 @@ namespace {
}
virtual void afterCommit() {
+ config->closeSync.release();
session->doClose();
+ config->synchronizationRegistered.set(false);
}
virtual void afterRollback() {
+ config->closeSync.release();
session->doClose();
+ config->synchronizationRegistered.set(false);
}
};
-}
-
-////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace core{
-namespace kernels{
-
- class SessionConfig {
- public:
-
- typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
- Pointer<ActiveMQConsumerKernel>,
- commands::ConsumerId::COMPARATOR> ConsumersMap;
-
- private:
-
- SessionConfig(const SessionConfig&);
- SessionConfig& operator=(const SessionConfig&);
-
- public:
-
- bool synchronizationRegistered;
- decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
- Pointer<Scheduler> scheduler;
-
- public:
-
- SessionConfig() : synchronizationRegistered(false), producers(), scheduler() {}
- ~SessionConfig() {}
- };
}}}
@@ -205,7 +212,7 @@ ActiveMQSessionKernel::ActiveMQSessionKe
this->connection->oneway(this->sessionInfo);
- this->closed = false;
+ this->closed.set(false);
this->lastDeliveredSequenceId = -1;
// Create a Transaction objet
@@ -250,22 +257,16 @@ void ActiveMQSessionKernel::close() {
return;
}
- if (this->transaction->isInXATransaction()) {
-
- // TODO - Right now we don't have a safe way of dealing with this case
- // since the session might be deleted before the XA Transaction is finalized
- // registering a Synchronization could result in an segmentation fault.
- //
- // For now we just close badly and throw an exception.
- doClose();
-
- throw UnsupportedOperationException(
- __FILE__, __LINE__,
- "The Consumer is still in an Active XA Transaction, commit it first." );
- }
-
try {
- doClose();
+
+ if (this->transaction->isInXATransaction()) {
+ if (!this->config->synchronizationRegistered.compareAndSet(false, true)) {
+ this->config->closeSync.reset(new CloseSynhcronization(this, this->config));
+ this->transaction->addSynchronization(this->config->closeSync);
+ }
+ } else {
+ doClose();
+ }
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -290,6 +291,11 @@ void ActiveMQSessionKernel::doClose() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQSessionKernel::dispose() {
+ // Prevent Dispose loop if transaction has a close synchronization registered.
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
class Finalizer {
private:
@@ -315,7 +321,6 @@ void ActiveMQSessionKernel::dispose() {
session.release();
}
session.release();
- this->session->closed = true;
}
};
@@ -633,8 +638,7 @@ cms::QueueBrowser* ActiveMQSessionKernel
}
////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue,
- const std::string& selector) {
+cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue, const std::string& selector) {
try {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h?rev=1326811&r1=1326810&r2=1326811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/atomic/AtomicBoolean.h Mon Apr 16 21:46:11 2012
@@ -47,7 +47,7 @@ namespace atomic {
* Creates a new AtomicBoolean with the initial value.
* @param initialValue - The initial value of this boolean.
*/
- AtomicBoolean( bool initialValue );
+ AtomicBoolean(bool initialValue);
virtual ~AtomicBoolean() {}
@@ -63,7 +63,7 @@ namespace atomic {
* Unconditionally sets to the given value.
* @param newValue - the new value
*/
- void set( bool newValue ) {
+ void set(bool newValue) {
this->value = newValue ? 1 : 0;
}
@@ -76,7 +76,7 @@ namespace atomic {
* @returns true if successful. False return indicates that the actual value
* was not equal to the expected value.
*/
- bool compareAndSet( bool expect, bool update );
+ bool compareAndSet(bool expect, bool update);
/**
* Atomically sets to the given value and returns the previous value.
@@ -84,7 +84,7 @@ namespace atomic {
* @param newValue - the new value
* @returns the previous value
*/
- bool getAndSet( bool newValue );
+ bool getAndSet(bool newValue);
/**
* Returns the String representation of the current value.