You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/06/16 22:32:05 UTC
svn commit: r955370 - in /qpid/trunk/qpid/cpp/src: qpid/cluster/Cluster.cpp
qpid/cluster/Connection.cpp qpid/cluster/Connection.h
qpid/cluster/OutputInterceptor.cpp qpid/cluster/UpdateClient.cpp
tests/failover_soak.cpp
Author: aconway
Date: Wed Jun 16 20:32:04 2010
New Revision: 955370
URL: http://svn.apache.org/viewvc?rev=955370&view=rev
Log:
Bug 603835 - cluster_tests.test_management failing.
Clean up connections causing extra connection objects in the mangement agent map.
- update connection was not being closed.
- connections belonging to members that left the cluster were not fully cleaned up
Also fixed test errors making failover_soak fail sporadically.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jun 16 20:32:04 2010
@@ -361,7 +361,6 @@ void Cluster::erase(const ConnectionId&
// Called by Connection::deliverClose() in deliverFrameQueue thread.
void Cluster::erase(const ConnectionId& id, Lock&) {
- QPID_LOG(info, *this << " connection closed " << id);
connections.erase(id);
decoder.erase(id);
}
@@ -1024,7 +1023,7 @@ void Cluster::memberUpdate(Lock& l) {
ConnectionMap::iterator j = i++;
MemberId m = j->second->getId().getMember();
if (m != self && !map.isMember(m)) {
- j->second->getBrokerConnection().closed();
+ j->second->close();
erase(j->second->getId(), l);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jun 16 20:32:04 2010
@@ -101,19 +101,18 @@ Connection::Connection(Cluster& c, sys::
if (isLocalClient()) {
// Local clients are announced to the cluster
// and initialized when the announce is received.
- QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax); // Flow control
init();
}
else {
// Catch-up shadow connections initialized using nextShadow id.
assert(catchUp);
- QPID_LOG(info, "new catch-up connection " << *this);
- connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+ if (!updateIn.nextShadowMgmtId.empty())
+ connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
init();
}
-
+ QPID_LOG(info, "incoming connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -123,8 +122,6 @@ void Connection::setSecureConnection(bro
void Connection::init() {
connection = connectionCtor.construct();
- QPID_LOG(debug, cluster << " initialized connection: " << *this
- << " ssf=" << connection->getExternalSecuritySettings().ssf);
if (isLocalClient()) {
if (secureConnection) connection->setSecureConnection(secureConnection);
// Actively send cluster-order frames from local node
@@ -171,7 +168,6 @@ void Connection::announce(
Connection::~Connection() {
if (connection.get()) connection->setErrorListener(0);
- QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
bool Connection::doOutput() {
@@ -250,16 +246,15 @@ void Connection::deliveredFrame(const Ev
// A local connection is closed by the network layer.
void Connection::closed() {
try {
- if (catchUp) {
+ if (isUpdated()) {
+ QPID_LOG(debug, cluster << " update connection closed " << *this);
+ close();
+ }
+ else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
cluster.leave();
}
- else if (isUpdated()) {
- QPID_LOG(debug, cluster << " closed update connection " << *this);
- if (connection.get()) connection->closed();
- }
else if (isLocal()) {
- QPID_LOG(debug, cluster << " local close of replicated connection " << *this);
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
@@ -275,15 +270,20 @@ void Connection::closed() {
// Self-delivery of close message, close the connection.
void Connection::deliverClose () {
assert(!catchUp);
+ close();
+ cluster.erase(self);
+}
+
+// Close the connection
+void Connection::close() {
if (connection.get()) {
connection->closed();
// Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
- cluster.erase(self);
}
-// The connection has been killed for misbehaving
+// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
connection->abort();
@@ -424,7 +424,7 @@ void Connection::shadowReady(
uint64_t memberId, uint64_t connectionId, const string& mgmtId,
const string& username, const string& fragment, uint32_t sendMax)
{
- QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
+ QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId());
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this
<< " becomes shadow " << shadowId);
@@ -442,13 +442,19 @@ void Connection::membership(const FieldT
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
updateIn.consumerNumbering.clear();
- self.second = 0; // Mark this as completed update connection.
+ closeUpdated();
}
void Connection::retractOffer() {
QPID_LOG(info, cluster << " incoming update retracted on connection " << *this);
cluster.updateInRetracted();
- self.second = 0; // Mark this as completed update connection.
+ closeUpdated();
+}
+
+void Connection::closeUpdated() {
+ self.second = 0; // Mark this as completed update connection.
+ if (connection.get())
+ connection->close(connection::CLOSE_CODE_NORMAL, "OK");
}
bool Connection::isLocal() const {
@@ -527,7 +533,10 @@ std::ostream& operator<<(std::ostream& o
if (c.isLocal()) type = "local";
else if (c.isShadow()) type = "shadow";
else if (c.isUpdated()) type = "updated";
- return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
+ const broker::Connection* bc = c.getBrokerConnection();
+ if (bc) o << bc->getMgmtId();
+ else o << "<disconnected>";
+ return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")";
}
void Connection::txStart() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jun 16 20:32:04 2010
@@ -75,7 +75,8 @@ class Connection :
~Connection();
ConnectionId getId() const { return self; }
- broker::Connection& getBrokerConnection() { return *connection; }
+ broker::Connection* getBrokerConnection() { return connection.get(); }
+ const broker::Connection* getBrokerConnection() const { return connection.get(); }
/** Local connections may be clients or catch-up connections */
bool isLocal() const;
@@ -167,6 +168,7 @@ class Connection :
void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid,
bool nodict, const std::string& username,
const std::string& initFrames);
+ void close();
void abort();
void deliverClose();
@@ -227,6 +229,7 @@ class Connection :
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
+ void closeUpdated();
Cluster& cluster;
ConnectionId self;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Wed Jun 16 20:32:04 2010
@@ -83,7 +83,7 @@ void OutputInterceptor::deliverDoOutput(
newLimit = (sendMax + sent) / 2;
}
sent = 0;
- while (sent < limit && parent.getBrokerConnection().doOutput())
+ while (sent < limit && parent.getBrokerConnection()->doOutput())
++sent;
if (sent == limit) sendDoOutput(newLimit);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Wed Jun 16 20:32:04 2010
@@ -150,7 +150,8 @@ void UpdateClient::update() {
// longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ std::for_each(connections.begin(), connections.end(),
+ boost::bind(&UpdateClient::updateConnection, this, _1));
session.queueDelete(arg::queue=UPDATE);
session.close();
@@ -167,15 +168,18 @@ void UpdateClient::update() {
client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false);
client::ConnectionAccess::getImpl(connection)->handle(frame);
- connection.close();
- QPID_LOG(debug, updaterId << " update completed to " << updateeId
- << " at " << updateeUrl << ": " << membership);
+ // FIXME aconway 2010-06-16: Connection will be closed from the other end.
+ // connection.close();
+
// FIXME aconway 2010-03-15: This sleep avoids the race condition
// described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831.
// It allows the connection to fully close before destroying the
// Connection object. Remove when the bug is fixed.
//
- sys::usleep(10*1000); // 100ms
+ sys::usleep(10*1000);
+
+ QPID_LOG(debug, updaterId << " update completed to " << updateeId
+ << " at " << updateeUrl << ": " << membership);
}
namespace {
@@ -347,9 +351,11 @@ void UpdateClient::updateOutputTask(cons
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
-
+ assert(updateConnection->getBrokerConnection());
+ broker::Connection& bc = *updateConnection->getBrokerConnection();
+
// Send the management ID first on the main connection.
- std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
+ std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId();
ClusterConnectionProxy(session).shadowPrepare(mgmtId);
// Make sure its received before opening shadow connection
session.sync();
@@ -357,7 +363,6 @@ void UpdateClient::updateConnection(cons
// Open shadow connection and update it.
shadowConnection = catchUpConnection();
- broker::Connection& bc = updateConnection->getBrokerConnection();
connectionSettings.maxFrameSize = bc.getFrameMax();
shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
@@ -381,8 +386,7 @@ void UpdateClient::updateSession(broker:
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- QPID_LOG(debug, updaterId << " updating session " << &sh.getConnection()
- << "[" << sh.getChannel() << "] = " << ss->getId());
+ QPID_LOG(debug, updaterId << " updating session " << ss->getId());
// Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
Modified: qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp?rev=955370&r1=955369&r2=955370&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/failover_soak.cpp Wed Jun 16 20:32:04 2010
@@ -673,7 +673,7 @@ main ( int argc, char const ** argv )
// Get prefix for each queue name.
stringstream queue_prefix;
queue_prefix << "failover_soak_" << getpid();
-
+ string queue_prefix_str(queue_prefix.str());
// Run the declareQueues child.
int childStatus;
@@ -683,7 +683,7 @@ main ( int argc, char const ** argv )
declareQueuesPath,
verbosity,
durable,
- queue_prefix.str().c_str(),
+ queue_prefix_str.c_str(),
n_queues
);
if ( -1 == dqClientPid ) {
@@ -707,6 +707,7 @@ main ( int argc, char const ** argv )
stringstream queue_name;
queue_name << queue_prefix.str() << '_' << i;
+ string queue_name_str(queue_name.str());
// Receiving client ---------------------------
pid_t receivingClientPid =
@@ -715,7 +716,7 @@ main ( int argc, char const ** argv )
receiverPath,
reportFrequency,
verbosity,
- queue_name.str().c_str() );
+ queue_name_str.c_str() );
if ( -1 == receivingClientPid ) {
cerr << "END_OF_TEST ERROR_START_RECEIVER\n";
return CANT_FORK_RECEIVER;
@@ -731,7 +732,7 @@ main ( int argc, char const ** argv )
reportFrequency,
verbosity,
durable,
- queue_name.str().c_str() );
+ queue_name_str.c_str() );
if ( -1 == sendingClientPid ) {
cerr << "END_OF_TEST ERROR_START_SENDER\n";
return CANT_FORK_SENDER;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org