You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/07/02 02:25:27 UTC
svn commit: r1498746 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/kernels/ test/activemq/core/
Author: tabish
Date: Tue Jul 2 00:25:26 2013
New Revision: 1498746
URL: http://svn.apache.org/r1498746
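Log:
Fix for https://issues.apache.org/jira/browse/AMQCPP-494: the consumer's TransactionSynhcronization and CloseSynhcronization now hold a Pointer<ActiveMQConsumerKernel> (obtained via the session's lookupConsumerKernel and reset after commit/rollback) instead of a raw pointer, and close() guards registration of the close synchronization with a new closeSyncRegistered flag.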
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Tue Jul 2 00:25:26 2013
@@ -97,6 +97,7 @@ namespace kernels {
decaf::util::concurrent::Mutex listenerMutex;
AtomicBoolean deliveringAcks;
AtomicBoolean started;
+ AtomicBoolean closeSyncRegistered;
Pointer<MessageDispatchChannel> unconsumedMessages;
decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
long long lastDeliveredSequenceId;
@@ -133,6 +134,7 @@ namespace kernels {
listenerMutex(),
deliveringAcks(),
started(),
+ closeSyncRegistered(),
unconsumedMessages(),
dispatchedMessages(),
lastDeliveredSequenceId(0),
@@ -340,7 +342,7 @@ namespace {
class TransactionSynhcronization : public Synchronization {
private:
- ActiveMQConsumerKernel* consumer;
+ Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
private:
@@ -350,7 +352,7 @@ namespace {
public:
- TransactionSynhcronization(ActiveMQConsumerKernel* consumer, ActiveMQConsumerKernelConfig* impl) :
+ TransactionSynhcronization(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
Synchronization(), consumer(consumer), impl(impl) {
if (consumer == NULL) {
@@ -377,11 +379,13 @@ namespace {
virtual void afterCommit() {
consumer->commit();
consumer->setSynchronizationRegistered(false);
+ consumer.reset(NULL);
}
virtual void afterRollback() {
consumer->rollback();
consumer->setSynchronizationRegistered(false);
+ consumer.reset(NULL);
}
};
@@ -393,7 +397,7 @@ namespace {
class CloseSynhcronization : public Synchronization {
private:
- ActiveMQConsumerKernel* consumer;
+ Pointer<ActiveMQConsumerKernel> consumer;
private:
@@ -402,7 +406,7 @@ namespace {
public:
- CloseSynhcronization(ActiveMQConsumerKernel* consumer) : consumer(consumer) {
+ CloseSynhcronization(Pointer<ActiveMQConsumerKernel> consumer) : consumer(consumer) {
if (consumer == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
@@ -415,10 +419,12 @@ namespace {
virtual void afterCommit() {
consumer->doClose();
+ consumer.reset(NULL);
}
virtual void afterRollback() {
consumer->doClose();
+ consumer.reset(NULL);
}
};
@@ -787,9 +793,12 @@ void ActiveMQConsumerKernel::close() {
if (!this->isClosed()) {
if (this->session->getTransactionContext() != NULL &&
- this->session->getTransactionContext()->isInTransaction()) {
+ this->session->getTransactionContext()->isInTransaction() &&
+ this->internal->closeSyncRegistered.compareAndSet(false, true)) {
- Pointer<Synchronization> sync(new CloseSynhcronization(this));
+ Pointer<ActiveMQConsumerKernel> self =
+ this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+ Pointer<Synchronization> sync(new CloseSynhcronization(self));
this->session->getTransactionContext()->addSynchronization(sync);
} else {
doClose();
@@ -1134,7 +1143,9 @@ void ActiveMQConsumerKernel::registerSyn
this->session->doStartTransaction();
if (!this->internal->synchronizationRegistered) {
this->internal->synchronizationRegistered = true;
- Pointer<Synchronization> sync(new TransactionSynhcronization(this, this->internal));
+ Pointer<ActiveMQConsumerKernel> self =
+ this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+ Pointer<Synchronization> sync(new TransactionSynhcronization(self, this->internal));
this->session->getTransactionContext()->addSynchronization(sync);
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Tue Jul 2 00:25:26 2013
@@ -529,7 +529,7 @@ namespace kernels {
Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
/**
- * @returns a Pointer to an ActiveMQProducerKernel using its ProducerId, or NULL.
+ * @returns a Pointer to an ActiveMQConsumerKernel using its ConsumerId, or NULL.
*/
Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp Tue Jul 2 00:25:26 2013
@@ -645,9 +645,10 @@ void ActiveMQSessionTest::testTransactio
}
msgListener1.asyncWaitForMessages(MSG_COUNT);
- CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int )msgListener1.messages.size());
+ CPPUNIT_ASSERT_EQUAL(MSG_COUNT, (int)msgListener1.messages.size());
consumer1->close();
+ consumer1.reset();
session->commit();
Pointer<cms::TextMessage> msg1 = msgListener1.messages[0].dynamicCast<cms::TextMessage>();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h?rev=1498746&r1=1498745&r2=1498746&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.h Tue Jul 2 00:25:26 2013
@@ -36,8 +36,8 @@
namespace activemq{
namespace core{
- class ActiveMQSessionTest : public CppUnit::TestFixture
- {
+ class ActiveMQSessionTest : public CppUnit::TestFixture {
+
CPPUNIT_TEST_SUITE( ActiveMQSessionTest );
CPPUNIT_TEST( testAutoAcking );
CPPUNIT_TEST( testClientAck );