You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/29 23:51:45 UTC

[2/2] activemq-cpp git commit: https://issues.apache.org/jira/browse/AMQCPP-532

https://issues.apache.org/jira/browse/AMQCPP-532

Add some additional safety measures to try to prevent the message-available
callback from firing while a browser object is being destroyed.

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/d96e7616
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/d96e7616
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/d96e7616

Branch: refs/heads/master
Commit: d96e76169265269ad2a52041ec5c1a41549d3299
Parents: 32bad60
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 29 17:51:09 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jul 29 17:51:09 2015 -0400

----------------------------------------------------------------------
 .../main/activemq/core/ActiveMQQueueBrowser.cpp | 53 +++++++++-----------
 .../main/activemq/core/ActiveMQQueueBrowser.h   |  2 +-
 .../test/openwire/OpenwireQueueBrowserTest.h    |  2 +-
 3 files changed, 27 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
index 837db95..c4fe43a 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
@@ -63,13 +63,15 @@ namespace core{
                 const std::string& name, const std::string& selector,
                 int prefetch, int maxPendingMessageCount, bool noLocal,
                 bool browser, bool dispatchAsync,
-                cms::MessageListener* listener ) :
+                cms::MessageListener* listener) :
             ActiveMQConsumerKernel(session, id, destination, name, selector, prefetch,
                                    maxPendingMessageCount, noLocal, browser, dispatchAsync,
                                    listener), parent(parent) {
 
         }
 
+        virtual ~Browser() {}
+
         virtual void dispatch(const Pointer<MessageDispatch>& dispatched) {
 
             try{
@@ -82,9 +84,9 @@ namespace core{
 
                 this->parent->notifyMessageAvailable();
             }
-            AMQ_CATCH_RETHROW( ActiveMQException )
-            AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-            AMQ_CATCHALL_THROW( ActiveMQException )
+            AMQ_CATCH_RETHROW(ActiveMQException)
+            AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+            AMQ_CATCHALL_THROW(ActiveMQException)
         }
     };
 }}
@@ -94,19 +96,19 @@ ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
                                            const Pointer<commands::ConsumerId>& consumerId,
                                            const Pointer<commands::ActiveMQDestination>& destination,
                                            const std::string& selector,
-                                           bool dispatchAsync ) : cms::QueueBrowser(),
-                                                                  cms::MessageEnumeration(),
-                                                                  session(session),
-                                                                  consumerId(consumerId),
-                                                                  destination(destination),
-                                                                  selector(selector),
-                                                                  dispatchAsync(dispatchAsync),
-                                                                  queue(NULL),
-                                                                  closed(false),
-                                                                  mutex(),
-                                                                  wait(),
-                                                                  browseDone(),
-                                                                  browser(NULL) {
+                                           bool dispatchAsync) : cms::QueueBrowser(),
+                                                                 cms::MessageEnumeration(),
+                                                                 session(session),
+                                                                 consumerId(consumerId),
+                                                                 destination(destination),
+                                                                 selector(selector),
+                                                                 dispatchAsync(dispatchAsync),
+                                                                 queue(NULL),
+                                                                 closed(false),
+                                                                 mutex(),
+                                                                 wait(),
+                                                                 browseDone(),
+                                                                 browser(NULL) {
 
     if (session == NULL) {
         throw ActiveMQException(__FILE__, __LINE__, "Session instance provided was NULL.");
@@ -122,7 +124,6 @@ ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
 
     // Cache the Queue instance for faster retreival.
     this->queue = destination.dynamicCast<cms::Queue>().get();
-    this->closed = false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -159,14 +160,10 @@ cms::MessageEnumeration* ActiveMQQueueBrowser::getEnumeration() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::close() {
     try {
-
-        if (this->closed) {
-            return;
-        }
-
-        synchronized(&mutex) {
-            destroyConsumer();
-            this->closed = true;
+        if (closed.compareAndSet(false, true)) {
+            synchronized(&mutex) {
+                destroyConsumer();
+            }
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -237,7 +234,6 @@ cms::Message* ActiveMQQueueBrowser::nextMessage() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::notifyMessageAvailable() {
-
     synchronized(&wait) {
         wait.notifyAll();
     }
@@ -287,6 +283,7 @@ void ActiveMQQueueBrowser::destroyConsumer() {
             session->commit();
         }
 
+        this->browser->stop();
         this->browser->close();
         this->browser.reset(NULL);
     }
@@ -295,7 +292,7 @@ void ActiveMQQueueBrowser::destroyConsumer() {
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQQueueBrowser::checkClosed() {
-    if (closed) {
+    if (closed.get()) {
         throw ActiveMQException(__FILE__, __LINE__, "The QueueBrowser is closed.");
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
index e964766..3b5ed07 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
@@ -55,7 +55,7 @@ namespace kernels {
         std::string selector;
         bool dispatchAsync;
         cms::Queue* queue;
-        volatile bool closed;
+        decaf::util::concurrent::atomic::AtomicBoolean closed;
 
         mutable decaf::util::concurrent::Mutex mutex;
         mutable decaf::util::concurrent::Mutex wait;

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/d96e7616/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
index 3cfc121..6a4a050 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
@@ -32,7 +32,7 @@ namespace openwire {
         CPPUNIT_TEST( testBrowseReceive );
         CPPUNIT_TEST( testQueueBrowserWith2Consumers );
         CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroy );
-        // TODO - CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
+        CPPUNIT_TEST( testRepeatedQueueBrowserCreateDestroyWithMessageInQueue );
         CPPUNIT_TEST( testBrowsingExpirationIsIgnored );
         CPPUNIT_TEST_SUITE_END();