You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/04/05 18:13:25 UTC

svn commit: r1737852 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/ src/qpid/client/ src/qpid/ha/ src/qpid/messaging/ src/qpid/messaging/amqp/

Author: aconway
Date: Tue Apr  5 16:13:24 2016
New Revision: 1737852

URL: http://svn.apache.org/viewvc?rev=1737852&view=rev
Log:
QPID-7149: Active HA broker memory leak

The leak was caused by the sys::Poller. When two pollers run in the same process
and one is idle, PollerHandles accumulate on the idle poller threads and are
never released. The HA broker uses the qpid::messaging API to get initial status from
other brokers. The messaging API creates a separate Poller from the broker.

The fix is to shut down the qpid::messaging poller as soon as initial status
checks are complete so it does not interfere with the broker's Poller.

Added:
    qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp
Modified:
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp

Added: qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h?rev=1737852&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h Tue Apr  5 16:13:24 2016
@@ -0,0 +1,38 @@
+#ifndef QPID_MESSAGING_SHUTDOWN_H
+#define QPID_MESSAGING_SHUTDOWN_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/messaging/ImportExport.h"
+
+namespace qpid {
+namespace messaging {
+
+/** Shut down the qpid::messaging library, clean up resources and stop background threads.
+ * Note you cannot use any of the qpid::messaging classes or functions after calling this.
+ *
+ * It is not normally necessary to call this; the library cleans up automatically on process exit.
+ * You can use it to clean up resources early in unusual situations.
+ */
+QPID_MESSAGING_EXTERN void shutdown();
+
+}}
+
+#endif // QPID_MESSAGING_SHUTDOWN_H

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Apr  5 16:13:24 2016
@@ -964,6 +964,7 @@ set (qpidmessaging_SOURCES
      qpid/messaging/ReceiverImpl.h
      qpid/messaging/SessionImpl.h
      qpid/messaging/SenderImpl.h
+     qpid/messaging/shutdown.cpp
      qpid/client/amqp0_10/AcceptTracker.h
      qpid/client/amqp0_10/AcceptTracker.cpp
      qpid/client/amqp0_10/AddressResolution.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Apr  5 16:13:24 2016
@@ -448,5 +448,8 @@ std::ostream& operator<<(std::ostream& o
         return o << "Connection <not connected>";
 }
 
+void shutdown() {
+    theIO().poller()->shutdown();
+}
 
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Tue Apr  5 16:13:24 2016
@@ -98,6 +98,9 @@ class ConnectionImpl : public Bounds,
   friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&);
 };
 
+// Shut down the poller early. Internal use only.
+void shutdown();
+
 }}
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Tue Apr  5 16:13:24 2016
@@ -23,6 +23,7 @@
 #include "HaBroker.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/log/Statement.h"
+#include "qpid/messaging/shutdown.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Connection.h"
 #include "qpid/messaging/Message.h"
@@ -97,14 +98,17 @@ void StatusCheckThread::run() {
             string status = details["status"].getString();
             QPID_LOG(debug, logPrefix << status);
             if (status != "joining") {
-                statusCheck.setPromote(false);
+                statusCheck.noPromote();
                 QPID_LOG(info, logPrefix << "Joining established cluster");
             }
         }
         else
             QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
-    } catch(...) {}
+    } catch(const std::exception& e) {
+        QPID_LOG(info, logPrefix << e.what());
+    }
     try { c.close(); } catch(...) {}
+    statusCheck.endThread();
     delete this;
 }
 
@@ -117,16 +121,25 @@ StatusCheck::StatusCheck(HaBroker& hb) :
 {}
 
 StatusCheck::~StatusCheck() {
-    // Join any leftovers
+    // In case canPromote was never called.
     for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
 }
 
 void StatusCheck::setUrl(const Url& url) {
     Mutex::ScopedLock l(lock);
+    threadCount = url.size();
     for (size_t i = 0; i < url.size(); ++i)
         threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
 }
 
+void StatusCheck::endThread() {
+    // Shut down the client poller ASAP to avoid conflict with the broker's poller.
+    // See https://issues.apache.org/jira/browse/QPID-7149
+    if (--threadCount == 0) {
+        messaging::shutdown();
+    }
+}
+
 bool StatusCheck::canPromote() {
     Mutex::ScopedLock l(lock);
     while (!threads.empty()) {
@@ -138,9 +151,9 @@ bool StatusCheck::canPromote() {
     return promote;
 }
 
-void StatusCheck::setPromote(bool p) {
+void StatusCheck::noPromote() {
     Mutex::ScopedLock l(lock);
-    promote = p;
+    promote = false;
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Tue Apr  5 16:13:24 2016
@@ -25,6 +25,7 @@
 #include "BrokerInfo.h"
 #include "Settings.h"
 #include "qpid/Url.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/Runnable.h"
@@ -60,10 +61,12 @@ class StatusCheck
     bool canPromote();
 
   private:
-    void setPromote(bool p);
+    void noPromote();
+    void endThread();
 
     sys::Mutex lock;
     std::vector<sys::Thread> threads;
+    sys::AtomicValue<int> threadCount;
     bool promote;
     const Settings settings;
     const sys::Duration heartbeat;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp Tue Apr  5 16:13:24 2016
@@ -47,9 +47,11 @@ void DriverImpl::start()
 void DriverImpl::stop()
 {
     QPID_LOG(debug, "Driver stopped");
-    poller->shutdown();
-    thread.join();
-    timer->stop();
+    if (!poller->hasShutdown()) {
+        poller->shutdown();
+        thread.join();
+        timer->stop();
+    }
 }
 
 boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)

Added: qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp?rev=1737852&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp Tue Apr  5 16:13:24 2016
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/messaging/shutdown.h>
+#include "../client/ConnectionImpl.h"
+#include "amqp/DriverImpl.h"
+
+namespace qpid {
+namespace messaging {
+
+void shutdown() {
+    amqp::DriverImpl::getDefault()->stop();
+    qpid::client::shutdown();
+}
+
+}}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org