You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/29 23:51:45 UTC
[2/2] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-532
https://issues.apache.org/jira/browse/AMQCPP-532
Add some additional safety measures to try and prevent message available
callback while destroying a browser object.
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/d96e7616
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/d96e7616
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/d96e7616
Branch: refs/heads/master
Commit: d96e76169265269ad2a52041ec5c1a41549d3299
Parents: 32bad60
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 29 17:51:09 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jul 29 17:51:09 2015 -0400
----------------------------------------------------------------------
.../main/activemq/core/ActiveMQQueueBrowser.cpp | 53 +++++++++-----------
.../main/activemq/core/ActiveMQQueueBrowser.h | 2 +-
.../test/openwire/OpenwireQueueBrowserTest.h | 2 +-
3 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
index 837db95..c4fe43a 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
@@ -63,13 +63,15 @@ namespace core{
const std::string& name, const std::string& selector,
int prefetch, int maxPendingMessageCount, bool noLocal,
bool browser, bool dispatchAsync,
- cms::MessageListener* listener ) :
+ cms::MessageListener* listener) :
ActiveMQConsumerKernel(session, id, destination, name, selector, prefetch,
maxPendingMessageCount, noLocal, browser, dispatchAsync,
listener), parent(parent) {
}
+ virtual ~Browser() {}
+
virtual void dispatch(const Pointer<MessageDispatch>& dispatched) {
try{
@@ -82,9 +84,9 @@ namespace core{
this->parent->notifyMessageAvailable();
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
};
}}
@@ -94,19 +96,19 @@ ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
const Pointer<commands::ConsumerId>& consumerId,
const Pointer<commands::ActiveMQDestination>& destination,
const std::string& selector,
- bool dispatchAsync ) : cms::QueueBrowser(),
- cms::MessageEnumeration(),
- session(session),
- consumerId(consumerId),
- destination(destination),
- selector(selector),
- dispatchAsync(dispatchAsync),
- queue(NULL),
- closed(false),
- mutex(),
- wait(),
- browseDone(),
- browser(NULL) {
+ bool dispatchAsync) : cms::QueueBrowser(),
+ cms::MessageEnumeration(),
+ session(session),
+ consumerId(consumerId),
+ destination(destination),
+ selector(selector),
+ dispatchAsync(dispatchAsync),
+ queue(NULL),
+ closed(false),
+ mutex(),
+ wait(),
+ browseDone(),
+ browser(NULL) {
if (session == NULL) {
throw ActiveMQException(__FILE__, __LINE__, "Session instance provided was NULL.");
@@ -122,7 +124,6 @@ ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
// Cache the Queue instance for faster retreival.
this->queue = destination.dynamicCast<cms::Queue>().get();
- this->closed = false;
}
////////////////////////////////////////////////////////////////////////////////
@@ -159,14 +160,10 @@ cms::MessageEnumeration* ActiveMQQueueBrowser::getEnumeration() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQQueueBrowser::close() {
try {
-
- if (this->closed) {
- return;
- }
-
- synchronized(&mutex) {
- destroyConsumer();
- this->closed = true;
+ if (closed.compareAndSet(false, true)) {
+ synchronized(&mutex) {
+ destroyConsumer();
+ }
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -237,7 +234,6 @@ cms::Message* ActiveMQQueueBrowser::nextMessage() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQQueueBrowser::notifyMessageAvailable() {
-
synchronized(&wait) {
wait.notifyAll();
}
@@ -287,6 +283,7 @@ void ActiveMQQueueBrowser::destroyConsumer() {
session->commit();
}
+ this->browser->stop();
this->browser->close();
this->browser.reset(NULL);
}
@@ -295,7 +292,7 @@ void ActiveMQQueueBrowser::destroyConsumer() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQQueueBrowser::checkClosed() {
- if (closed) {
+ if (closed.get()) {
throw ActiveMQException(__FILE__, __LINE__, "The QueueBrowser is closed.");
}
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
index e964766..3b5ed07 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
@@ -55,7 +55,7 @@ namespace kernels {
std::string selector;
bool dispatchAsync;
cms::Queue* queue;
- volatile bool closed;
+ decaf::util::concurrent::atomic::AtomicBoolean closed;
mutable decaf::util::concurrent::Mutex mutex;
mutable decaf::util::concurrent::Mutex wait;
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
index 3cfc121..6a4a050 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
@@ -32,7 +32,7 @@ namespace openwire {
CPPUNIT_TEST( testBrowseReceive );
CPPUNIT_TEST( testQueueBrowserWith2Consumers );
CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroy );
- // TODO - CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+ CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
CPPUNIT_TEST( testBrowsingExpirationIsIgnored );
CPPUNIT_TEST_SUITE_END();