You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/04/05 18:13:25 UTC
svn commit: r1737852 - in /qpid/trunk/qpid/cpp: include/qpid/messaging/ src/
src/qpid/client/ src/qpid/ha/ src/qpid/messaging/ src/qpid/messaging/amqp/
Author: aconway
Date: Tue Apr 5 16:13:24 2016
New Revision: 1737852
URL: http://svn.apache.org/viewvc?rev=1737852&view=rev
Log:
QPID-7149: Active HA broker memory leak
The leak was caused by the sys::Poller. When two pollers run in the same process
and one is idles, PollerHandles accumulate on the idle poller threads and are
never released. The HA broker uses the qpid::messaging API to get initial status from
other brokers. The messaging API creates a separate Poller from the broker.
The fix is to shut down the qpid::messaging poller as soon as initial status
checks are complete so it does not interfere with the broker's Poller.
Added:
qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h
qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp
Modified:
qpid/trunk/qpid/cpp/src/CMakeLists.txt
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
Added: qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h?rev=1737852&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/messaging/shutdown.h Tue Apr 5 16:13:24 2016
@@ -0,0 +1,38 @@
+#ifndef QPID_MESSAGING_SHUTDOWN_H
+#define QPID_MESSAGING_SHUTDOWN_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qpid/messaging/ImportExport.h"
+
+namespace qpid {
+namespace messaging {
+
+/** Shut down the qpid::messaging library, clean up resources and stop background threads.
+ * Note you cannot use any of the qpid::messaging classes or functions after calling this.
+ *
+ * It is is not normally necessary to call this, the library cleans up automatically on process exit.
+ * You can use it to clean up resources early in unusual situations.
+ */
+QPID_MESSAGING_EXTERN void shutdown();
+
+}}
+
+#endif // QPID_MESSAGING_SHUTDOWN_H
Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Tue Apr 5 16:13:24 2016
@@ -964,6 +964,7 @@ set (qpidmessaging_SOURCES
qpid/messaging/ReceiverImpl.h
qpid/messaging/SessionImpl.h
qpid/messaging/SenderImpl.h
+ qpid/messaging/shutdown.cpp
qpid/client/amqp0_10/AcceptTracker.h
qpid/client/amqp0_10/AcceptTracker.cpp
qpid/client/amqp0_10/AddressResolution.h
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Apr 5 16:13:24 2016
@@ -448,5 +448,8 @@ std::ostream& operator<<(std::ostream& o
return o << "Connection <not connected>";
}
+void shutdown() {
+ theIO().poller()->shutdown();
+}
}} // namespace qpid::client
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Tue Apr 5 16:13:24 2016
@@ -98,6 +98,9 @@ class ConnectionImpl : public Bounds,
friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&);
};
+// Shut down the poller early. Internal use only.
+void shutdown();
+
}}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Tue Apr 5 16:13:24 2016
@@ -23,6 +23,7 @@
#include "HaBroker.h"
#include "qpid/broker/Broker.h"
#include "qpid/log/Statement.h"
+#include "qpid/messaging/shutdown.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
#include "qpid/messaging/Message.h"
@@ -97,14 +98,17 @@ void StatusCheckThread::run() {
string status = details["status"].getString();
QPID_LOG(debug, logPrefix << status);
if (status != "joining") {
- statusCheck.setPromote(false);
+ statusCheck.noPromote();
QPID_LOG(info, logPrefix << "Joining established cluster");
}
}
else
QPID_LOG(error, logPrefix << "Invalid response " << response.getContent());
- } catch(...) {}
+ } catch(const std::exception& e) {
+ QPID_LOG(info, logPrefix << e.what());
+ }
try { c.close(); } catch(...) {}
+ statusCheck.endThread();
delete this;
}
@@ -117,16 +121,25 @@ StatusCheck::StatusCheck(HaBroker& hb) :
{}
StatusCheck::~StatusCheck() {
- // Join any leftovers
+ // In case canPromote was never called.
for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
}
void StatusCheck::setUrl(const Url& url) {
Mutex::ScopedLock l(lock);
+ threadCount = url.size();
for (size_t i = 0; i < url.size(); ++i)
threads.push_back(Thread(new StatusCheckThread(*this, url[i])));
}
+void StatusCheck::endThread() {
+ // Shut down the client poller ASAP to avoid conflict with the broker's poller.
+ // See https://issues.apache.org/jira/browse/QPID-7149
+ if (--threadCount == 0) {
+ messaging::shutdown();
+ }
+}
+
bool StatusCheck::canPromote() {
Mutex::ScopedLock l(lock);
while (!threads.empty()) {
@@ -138,9 +151,9 @@ bool StatusCheck::canPromote() {
return promote;
}
-void StatusCheck::setPromote(bool p) {
+void StatusCheck::noPromote() {
Mutex::ScopedLock l(lock);
- promote = p;
+ promote = false;
}
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.h Tue Apr 5 16:13:24 2016
@@ -25,6 +25,7 @@
#include "BrokerInfo.h"
#include "Settings.h"
#include "qpid/Url.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Runnable.h"
@@ -60,10 +61,12 @@ class StatusCheck
bool canPromote();
private:
- void setPromote(bool p);
+ void noPromote();
+ void endThread();
sys::Mutex lock;
std::vector<sys::Thread> threads;
+ sys::AtomicValue<int> threadCount;
bool promote;
const Settings settings;
const sys::Duration heartbeat;
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp?rev=1737852&r1=1737851&r2=1737852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp Tue Apr 5 16:13:24 2016
@@ -47,9 +47,11 @@ void DriverImpl::start()
void DriverImpl::stop()
{
QPID_LOG(debug, "Driver stopped");
- poller->shutdown();
- thread.join();
- timer->stop();
+ if (!poller->hasShutdown()) {
+ poller->shutdown();
+ thread.join();
+ timer->stop();
+ }
}
boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
Added: qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp?rev=1737852&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/shutdown.cpp Tue Apr 5 16:13:24 2016
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/messaging/shutdown.h>
+#include "../client/ConnectionImpl.h"
+#include "amqp/DriverImpl.h"
+
+namespace qpid {
+namespace messaging {
+
+void shutdown() {
+ amqp::DriverImpl::getDefault()->stop();
+ qpid::client::shutdown();
+}
+
+}}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org