You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/06/08 22:23:19 UTC
svn commit: r782766 - in /qpid/trunk/qpid/cpp: examples/qmf-console/ping.cpp
src/qpid/console/Broker.cpp src/qpid/console/Broker.h
src/qpid/console/SessionManager.cpp src/qpid/console/SessionManager.h
Author: tross
Date: Mon Jun 8 20:23:19 2009
New Revision: 782766
URL: http://svn.apache.org/viewvc?rev=782766&view=rev
Log:
Bug fixes in the C++ console API:
- Connection threads now shut down cleanly
- get-query timeouts now work properly
- waitForStable now only waits for connected brokers
The ping example was also improved: it now handles connection loss and reconnection more cleanly.
Modified:
qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp
qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/console/Broker.h
qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h
Modified: qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp (original)
+++ qpid/trunk/qpid/cpp/examples/qmf-console/ping.cpp Mon Jun 8 20:23:19 2009
@@ -43,6 +43,7 @@
//
SessionManager::Settings smSettings;
smSettings.methodTimeout = 2;
+ smSettings.getTimeout = 2;
//
// Declare the console session manager. With a null listener argument, it defaults to
@@ -58,18 +59,21 @@
uint32_t count = 5; // The number of echo requests we will send to the broker.
Object::Vector list; // A container for holding objects retrieved from the broker.
- //
- // Query for a list of 'broker' objects from the Management Database
- //
- sm.getObjects(list, "broker");
-
- //
- // We expect one object (since we are connected to only one broker)
- //
- if (list.size() == 1) {
- Object& brokerObject = *(list.begin());
+ for (uint32_t iter = 0; iter < count; iter++) {
+ cout << "Ping Broker: " << broker->getUrl() << "... ";
+ cout.flush();
+
+ //
+ // Query for a list of 'broker' objects from the Management Database
+ //
+ sm.getObjects(list, "broker");
+
+ //
+ // We expect one object (since we are connected to only one broker)
+ //
+ if (list.size() == 1) {
+ Object& brokerObject = *(list.begin());
- for (uint32_t iter = 0; iter < count; iter++) {
//
// Declare a container for arguments to be sent with the "echo" method
// that we will invoke on the remote "broker" object.
@@ -87,9 +91,6 @@
args.addUint("sequence", iter);
args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ");
- cout << "Ping Broker: " << broker->getUrl() << "... ";
- cout.flush();
-
//
// Invoke the method. This is a synchronous operation that will block until
// the method completes and returns a result.
@@ -109,6 +110,9 @@
if (result.code == 0 && iter < count - 1)
qpid::sys::sleep(1);
+ } else {
+ cout << "Disconnected..." << endl;
+ qpid::sys::sleep(1);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/Broker.cpp Mon Jun 8 20:23:19 2009
@@ -57,6 +57,8 @@
Broker::~Broker()
{
+ connThreadBody.shutdown();
+ connThread.join();
}
string Broker::getUrl() const
@@ -184,6 +186,8 @@
subscriptions->setFlowControl(dest, FlowControl::unlimited());
{
Mutex::ScopedLock _lock(connLock);
+ if (shuttingDown)
+ return;
operational = true;
broker.resetAgents();
broker.connected = true;
@@ -199,16 +203,26 @@
broker.sessionManager.handleBrokerDisconnect(&broker);
}
delay = delayMin;
+ connection.close();
delete subscriptions;
subscriptions = 0;
- session.close();
} catch (std::exception &e) {
QPID_LOG(debug, " outer exception: " << e.what());
if (delay < delayMax)
delay *= delayFactor;
}
- ::sleep(delay);
+ {
+ Mutex::ScopedLock _lock(connLock);
+ if (shuttingDown)
+ return;
+ {
+ Mutex::ScopedUnlock _unlock(connLock);
+ ::sleep(delay);
+ }
+ if (shuttingDown)
+ return;
+ }
}
}
@@ -253,6 +267,16 @@
arg::bindingKey=key);
}
+void Broker::ConnectionThread::shutdown()
+{
+ {
+ Mutex::ScopedLock _lock(connLock);
+ shuttingDown = true;
+ }
+ if (subscriptions)
+ subscriptions->stop();
+}
+
void Broker::waitForStable()
{
Mutex::ScopedLock l(lock);
Modified: qpid/trunk/qpid/cpp/src/qpid/console/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/Broker.h?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/Broker.h Mon Jun 8 20:23:19 2009
@@ -73,7 +73,6 @@
SessionManager& sessionManager;
AgentMap agents;
- client::SubscriptionManager* subscription;
bool connected;
std::string error;
std::string amqpSessionId;
@@ -88,25 +87,27 @@
friend class ConnectionThread;
class ConnectionThread : public sys::Runnable {
- bool operational;
- Broker& broker;
- framing::Uuid sessionId;
- client::Connection connection;
- client::Session session;
- client::SubscriptionManager* subscriptions;
- std::stringstream queueName;
- sys::Mutex connLock;
- void run();
- public:
+ bool operational;
+ bool shuttingDown;
+ Broker& broker;
+ framing::Uuid sessionId;
+ client::Connection connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
+ std::stringstream queueName;
+ sys::Mutex connLock;
+ void run();
+ public:
ConnectionThread(Broker& _broker) :
- operational(false), broker(_broker), subscriptions(0) {}
- ~ConnectionThread();
- void sendBuffer(qpid::framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange = "qpid.management",
- const std::string& routingKey = "broker");
- void bindExchange(const std::string& exchange, const std::string& key);
- };
+ operational(false), shuttingDown(false), broker(_broker), subscriptions(0) {}
+ ~ConnectionThread();
+ void sendBuffer(qpid::framing::Buffer& buf,
+ uint32_t length,
+ const std::string& exchange = "qpid.management",
+ const std::string& routingKey = "broker");
+ void bindExchange(const std::string& exchange, const std::string& key);
+ void shutdown();
+ };
ConnectionThread connThreadBody;
sys::Thread connThread;
Modified: qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.cpp Mon Jun 8 20:23:19 2009
@@ -41,6 +41,13 @@
bindingKeys();
}
+SessionManager::~SessionManager()
+{
+ for (vector<Broker*>::iterator iter = brokers.begin();
+ iter != brokers.end(); iter++)
+ delete *iter;
+}
+
Broker* SessionManager::addBroker(client::ConnectionSettings& settings)
{
Broker* broker(new Broker(*this, settings));
@@ -58,6 +65,7 @@
iter != brokers.end(); iter++)
if (*iter == broker) {
brokers.erase(iter);
+ delete broker;
return;
}
}
@@ -171,6 +179,11 @@
syncSequenceList.clear();
error = string();
+ if (agentList.empty()) {
+ objects = getResult;
+ return;
+ }
+
for (Agent::Vector::iterator iter = agentList.begin(); iter != agentList.end(); iter++) {
Agent* agent = *iter;
char rawbuffer[512];
@@ -191,8 +204,12 @@
{
Mutex::ScopedLock _lock(lock);
+ sys::AbsTime startTime = sys::now();
while (!syncSequenceList.empty() && error.empty()) {
cv.wait(lock, AbsTime(now(), settings.getTimeout * TIME_SEC));
+ sys::AbsTime currTime = sys::now();
+ if (sys::Duration(startTime, currTime) > settings.getTimeout * TIME_SEC)
+ break;
}
}
@@ -221,7 +238,8 @@
Mutex::ScopedLock l(brokerListLock);
for (vector<Broker*>::iterator iter = brokers.begin();
iter != brokers.end(); iter++)
- (*iter)->waitForStable();
+ if ((*iter)->isConnected())
+ (*iter)->waitForStable();
}
void SessionManager::startProtocol(Broker* broker)
Modified: qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h?rev=782766&r1=782765&r2=782766&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/console/SessionManager.h Mon Jun 8 20:23:19 2009
@@ -52,7 +52,7 @@
public:
typedef std::vector<std::string> NameVector;
typedef std::vector<ClassKey> KeyVector;
- ~SessionManager() {}
+ ~SessionManager();
struct Settings {
bool rcvObjects;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org