You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2011/08/15 18:47:57 UTC

svn commit: r1157907 - in /qpid/trunk/qpid/cpp: include/qmf/AgentSession.h include/qmf/ConsoleSession.h src/qmf/AgentSession.cpp src/qmf/ConsoleSession.cpp src/qmf/ConsoleSessionImpl.h src/qpid/agent/ManagementAgentImpl.cpp

Author: tross
Date: Mon Aug 15 16:47:56 2011
New Revision: 1157907

URL: http://svn.apache.org/viewvc?rev=1157907&view=rev
Log:
QPID-3423 - Timing and Performance Improvements in QMF Libraries

Modified:
    qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
    qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
    qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
    qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp

Modified: qpid/trunk/qpid/cpp/include/qmf/AgentSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/AgentSession.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/AgentSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/AgentSession.h Mon Aug 15 16:47:56 2011
@@ -71,6 +71,11 @@ namespace qmf {
          *                                    If False: Listen only on the routable direct address
          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
+         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
+         *                                 periodic background processing passes. [default: 5]
+         *                                 Must not be greater than 'interval'.  Larger numbers will cause fewer wake-ups but will
+         *                                 increase the time it takes to shut down the process.  This setting will not affect the
+         *                                 agent's response time for queries or method invocation.
          */
         QMF_EXTERN AgentSession(qpid::messaging::Connection& conn, const std::string& options="");
 

Modified: qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h (original)
+++ qpid/trunk/qpid/cpp/include/qmf/ConsoleSession.h Mon Aug 15 16:47:56 2011
@@ -61,6 +61,10 @@ namespace qmf {
          *                                    If False: Listen only on the routable direct address
          *    strict-security:{True,False}  - If True:  Cooperate with the broker to enforce strict access control to the network
          *                                  - If False: Operate more flexibly with regard to use of messaging facilities [default]
+         *    max-thread-wait-time:N     - Time (in seconds) the session thread will wait for messages from the network between
+         *                                 periodic background processing passes.
+         *                                 Must not be greater than 60.  Larger numbers will cause fewer wake-ups but will
+         *                                 increase the time it takes to shut down the process. [default: 5]
          */
         QMF_EXTERN ConsoleSession(qpid::messaging::Connection& conn, const std::string& options="");
 

Modified: qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/AgentSession.cpp Mon Aug 15 16:47:56 2011
@@ -120,6 +120,7 @@ namespace qmf {
         bool publicEvents;
         bool listenOnDirect;
         bool strictSecurity;
+        uint32_t maxThreadWaitTime;
         uint64_t schemaUpdateTime;
         string directBase;
         string topicBase;
@@ -185,7 +186,7 @@ AgentSessionImpl::AgentSessionImpl(Conne
     bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
     externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
     maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
-    listenOnDirect(true), strictSecurity(false),
+    listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
     schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
 {
     //
@@ -246,7 +247,14 @@ AgentSessionImpl::AgentSessionImpl(Conne
         iter = optMap.find("strict-security");
         if (iter != optMap.end())
             strictSecurity = iter->second.asBool();
+
+        iter = optMap.find("max-thread-wait-time");
+        if (iter != optMap.end())
+            maxThreadWaitTime = iter->second.asUint32();
     }
+
+    if (maxThreadWaitTime > interval)
+        maxThreadWaitTime = interval;
 }
 
 
@@ -254,6 +262,11 @@ AgentSessionImpl::~AgentSessionImpl()
 {
     if (opened)
         close();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
 }
 
 
@@ -262,6 +275,12 @@ void AgentSessionImpl::open()
     if (opened)
         throw QmfException("The session is already open");
 
+    // If the thread exists, join and delete it before creating a new one.
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
+
     const string addrArgs(";{create:never,node:{type:topic}}");
     const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
     attributes["_direct_subject"] = routableAddr;
@@ -304,13 +323,8 @@ void AgentSessionImpl::close()
     if (!opened)
         return;
 
-    // Stop and join the receiver thread
+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
     threadCanceled = true;
-    thread->join();
-    delete thread;
-
-    // Close the AMQP session
-    session.close();
     opened = false;
 }
 
@@ -320,9 +334,13 @@ bool AgentSessionImpl::nextEvent(AgentEv
     uint64_t milliseconds = timeout.getMilliseconds();
     qpid::sys::Mutex::ScopedLock l(lock);
 
-    if (eventQueue.empty() && milliseconds > 0)
-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+    if (eventQueue.empty() && milliseconds > 0) {
+        int64_t nsecs(qpid::sys::TIME_INFINITE);
+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
+            nsecs = (int64_t) milliseconds * 1000000;
+        qpid::sys::Duration then(nsecs);
+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+    }
 
     if (!eventQueue.empty()) {
         event = eventQueue.front();
@@ -1050,7 +1068,7 @@ void AgentSessionImpl::run()
             periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
 
             Receiver rx;
-            bool valid = session.nextReceiver(rx, Duration::SECOND);
+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
             if (threadCanceled)
                 break;
             if (valid) {
@@ -1067,6 +1085,7 @@ void AgentSessionImpl::run()
         enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
     }
 
+    session.close();
     QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSession.cpp Mon Aug 15 16:47:56 2011
@@ -66,7 +66,7 @@ Subscription ConsoleSession::subscribe(c
 //========================================================================================
 
 ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
-    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false),
+    connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
     opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
     connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
 {
@@ -92,7 +92,14 @@ ConsoleSessionImpl::ConsoleSessionImpl(C
         iter = optMap.find("strict-security");
         if (iter != optMap.end())
             strictSecurity = iter->second.asBool();
+
+        iter = optMap.find("max-thread-wait-time");
+        if (iter != optMap.end())
+            maxThreadWaitTime = iter->second.asUint32();
     }
+
+    if (maxThreadWaitTime > 60)
+        maxThreadWaitTime = 60;
 }
 
 
@@ -100,6 +107,11 @@ ConsoleSessionImpl::~ConsoleSessionImpl(
 {
     if (opened)
         close();
+
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
 }
 
 
@@ -154,6 +166,12 @@ void ConsoleSessionImpl::open()
     if (opened)
         throw QmfException("The session is already open");
 
+    // If the thread exists, join and delete it before creating a new one.
+    if (thread) {
+        thread->join();
+        delete thread;
+    }
+
     // Establish messaging addresses
     directBase = "qmf." + domain + ".direct";
     topicBase = "qmf." + domain + ".topic";
@@ -182,14 +200,13 @@ void ConsoleSessionImpl::open()
 
     // Start the receiver thread
     threadCanceled = false;
+    opened = true;
     thread = new qpid::sys::Thread(*this);
 
     // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
     sendBrokerLocate();
     if (agentQuery)
         sendAgentLocate();
-
-    opened = true;
 }
 
 
@@ -198,13 +215,8 @@ void ConsoleSessionImpl::close()
     if (!opened)
         throw QmfException("The session is already closed");
 
-    // Stop and join the receiver thread
+    // Stop the receiver thread.  Don't join it until the destructor is called or open() is called.
     threadCanceled = true;
-    thread->join();
-    delete thread;
-
-    // Close the AMQP session
-    session.close();
     opened = false;
 }
 
@@ -214,9 +226,13 @@ bool ConsoleSessionImpl::nextEvent(Conso
     uint64_t milliseconds = timeout.getMilliseconds();
     qpid::sys::Mutex::ScopedLock l(lock);
 
-    if (eventQueue.empty() && milliseconds > 0)
-        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
-                                           qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
+    if (eventQueue.empty() && milliseconds > 0) {
+        int64_t nsecs(qpid::sys::TIME_INFINITE);
+        if ((uint64_t)(nsecs / 1000000) > milliseconds)
+            nsecs = (int64_t) milliseconds * 1000000;
+        qpid::sys::Duration then(nsecs);
+        cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
+    }
 
     if (!eventQueue.empty()) {
         event = eventQueue.front();
@@ -596,7 +612,7 @@ void ConsoleSessionImpl::run()
                                qpid::sys::TIME_SEC);
 
             Receiver rx;
-            bool valid = session.nextReceiver(rx, Duration::SECOND);
+            bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
             if (threadCanceled)
                 break;
             if (valid) {
@@ -613,6 +629,7 @@ void ConsoleSessionImpl::run()
         enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
     }
 
+    session.close();
     QPID_LOG(debug, "ConsoleSession thread exiting");
 }
 

Modified: qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qmf/ConsoleSessionImpl.h Mon Aug 15 16:47:56 2011
@@ -76,6 +76,7 @@ namespace qmf {
         uint32_t maxAgentAgeMinutes;
         bool listenOnDirect;
         bool strictSecurity;
+        uint32_t maxThreadWaitTime;
         Query agentQuery;
         bool opened;
         std::queue<ConsoleEvent> eventQueue;

Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=1157907&r1=1157906&r2=1157907&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Mon Aug 15 16:47:56 2011
@@ -1378,13 +1378,26 @@ bool ManagementAgentImpl::ConnectionThre
 
 void ManagementAgentImpl::PublishThread::run()
 {
-    uint16_t    totalSleep;
+    uint16_t totalSleep;
+    uint16_t sleepTime;
 
     while (!shutdown) {
         agent.periodicProcessing();
         totalSleep = 0;
-        while (totalSleep++ < agent.getInterval() && !shutdown) {
-            ::sleep(1);
+
+        //
+        // Calculate a sleep time that is no greater than 5 seconds and
+        // no less than 1 second.
+        //
+        sleepTime = agent.getInterval();
+        if (sleepTime > 5)
+            sleepTime = 5;
+        else if (sleepTime == 0)
+            sleepTime = 1;
+
+        while (totalSleep < agent.getInterval() && !shutdown) {
+            ::sleep(sleepTime);
+            totalSleep += sleepTime;
         }
     }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org