You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/10/05 23:45:46 UTC

svn commit: r1179456 - in /qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster: ./ exp/

Author: aconway
Date: Wed Oct  5 21:45:45 2011
New Revision: 1179456

URL: http://svn.apache.org/viewvc?rev=1179456&view=rev
Log:
QPID-2920: Configurable control over concurrency

- new config option controls number of CPG groups used.
- minor log message improvements
- minor code clean-up

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp Wed Oct  5 21:45:45 2011
@@ -126,7 +126,7 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys:
     callbacks.cpg_deliver_fn = &globalDeliver;
     callbacks.cpg_confchg_fn = &globalConfigChange;
 
-    QPID_LOG(notice, "Initializing CPG");
+    QPID_LOG(debug, "Initializing CPG");
     cpg_error_t err = cpg_initialize(&handle, &callbacks);
     int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
     while (err == CPG_ERR_TRY_AGAIN && --retries) {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp Wed Oct  5 21:45:45 2011
@@ -35,8 +35,10 @@ struct Cluster2Plugin : public Plugin {
         Opts(Settings& s) : Options("Cluster Options"), settings(s) {
             addOptions()
                 ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
-                ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.");
-                // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+                ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.")
+                ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number concurrent streams of processing for multicast/deliver.");
+                // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
+                // FIXME aconway 2011-10-05: rename to final option names.
         }
     };
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Wed Oct  5 21:45:45 2011
@@ -39,35 +39,36 @@ namespace cluster {
 
 Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
 {
-    // FIXME aconway 2011-09-26: this has to be consistent in a
+    // FIXME aconway 2011-09-26: s.concurrency has to be consistent in a
     // cluster, negotiate as part of join protocol.
-    size_t nGroups = broker.getOptions().workerThreads;
+    uint32_t nGroups = s.concurrency ? s.concurrency : 1;
     for (size_t i = 0; i < nGroups; ++i) {
-        // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
+        // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol?
         std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
         groups.push_back(new Group(*this));
         boost::intrusive_ptr<Group> group(groups.back());
-        // FIXME aconway 2011-10-03:  clean up, all Handler ctors take Group.
-        boost::intrusive_ptr<QueueHandler> queueHandler(
-            new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings));
-        group->getEventHandler().add(queueHandler);
-        group->getEventHandler().add(
-            boost::intrusive_ptr<HandlerBase>(
-                new WiringHandler(group->getEventHandler(), queueHandler, broker)));
-        group->getEventHandler().add(
-            boost::intrusive_ptr<HandlerBase>(new MessageHandler(*group, *this)));
+ 
+        EventHandler& eh(group->getEventHandler());
+        typedef boost::intrusive_ptr<HandlerBase>  HandlerBasePtr;
+        boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings));
+        eh.add(queueHandler);
+        eh.add(HandlerBasePtr(new WiringHandler(*group, queueHandler, broker)));
+        eh.add(HandlerBasePtr(new MessageHandler(*group, *this)));
 
         std::auto_ptr<BrokerContext> bh(new BrokerContext(*this));
         brokerHandler = bh.get();
         // BrokerContext belongs to Broker
         broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
         // FIXME aconway 2011-09-26: multi-group
-        group->getEventHandler().start();
-        group->getEventHandler().getCpg().join(groupName);
-        // TODO aconway 2010-11-18: logging standards
-        // FIXME aconway 2011-09-26: multi-group
-        QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf());
+        eh.start();
+        eh.getCpg().join(groupName);
+        // TODO aconway 2010-11-18: logging standards        // FIXME aconway 2011-09-26: multi-group
+        QPID_LOG(debug, "cluster: joined CPG group " << groupName << ", member-id=" << eh.getSelf());
     }
+    QPID_LOG(notice, "cluster: joined cluster " << s.name
+             << ", member-id="<< groups[0]->getEventHandler().getSelf());
+    QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us "
+             << " concurrency=" << s.concurrency);
 }
 
 void Core::initialize() {}

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Wed Oct  5 21:45:45 2011
@@ -34,6 +34,7 @@ namespace qpid {
 
 namespace framing {
 class AMQBody;
+class AMQFrame;
 }
 
 namespace cluster {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp Wed Oct  5 21:45:45 2011
@@ -41,7 +41,7 @@ bool MessageHolder::check(const framing:
         assert(i != messages.end());
         msgOut = i->second.first;
         queueOut = i->second.second;
-        messages.erase(frame.getChannel()); // re-use the frame.
+        messages.erase(frame.getChannel()); // re-use the channel.
         return true;
     }
     return false;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp Wed Oct  5 21:45:45 2011
@@ -19,22 +19,25 @@
  *
  */
 
-#include "QueueHandler.h"
 #include "EventHandler.h"
-#include "QueueReplica.h"
+#include "Group.h"
 #include "QueueContext.h"
+#include "QueueHandler.h"
+#include "QueueReplica.h"
+#include "qpid/Exception.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/framing/AllInvoker.h"
-#include "qpid/Exception.h"
 #include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace cluster {
 
-// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
-QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
-    : HandlerBase(eh), multicaster(m),  consumeLock(s.getConsumeLock()) {}
+QueueHandler::QueueHandler(Group& g, const Settings& s)
+    : HandlerBase(g.getEventHandler()),
+      multicaster(g.getMulticaster()),
+      consumeLock(s.getConsumeLock())
+{}
 
 bool QueueHandler::handle(const framing::AMQFrame& frame) {
     return framing::invoke(*this, *frame.getBody()).wasHandled();

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h Wed Oct  5 21:45:45 2011
@@ -42,6 +42,7 @@ namespace cluster {
 class EventHandler;
 class QueueReplica;
 class Multicaster;
+class Group;
 
 /**
  * Handler for queue subscription events.
@@ -54,7 +55,7 @@ class QueueHandler : public framing::AMQ
                      public HandlerBase
 {
   public:
-    QueueHandler(EventHandler&, Multicaster&, const Settings&);
+    QueueHandler(Group&, const Settings&);
 
     bool handle(const framing::AMQFrame& body);
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp Wed Oct  5 21:45:45 2011
@@ -20,12 +20,14 @@
  */
 
 #include "Settings.h"
+#include "qpid/sys/SystemInfo.h"
 
 namespace qpid {
 namespace cluster {
 
 Settings::Settings() :    // Default settings
-    consumeLockMicros(10000)
+    consumeLockMicros(10000),
+    concurrency(sys::SystemInfo::concurrency() + 1)
 {}
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h Wed Oct  5 21:45:45 2011
@@ -35,6 +35,8 @@ struct Settings {
     Settings();
     std::string name;
     uint32_t consumeLockMicros;
+    uint32_t concurrency;
+
     sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
 };
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Wed Oct  5 21:45:45 2011
@@ -20,6 +20,7 @@
  */
 
 #include "Core.h"
+#include "Group.h"
 #include "WiringHandler.h"
 #include "EventHandler.h"
 #include "QueueHandler.h"
@@ -40,10 +41,10 @@ namespace cluster {
 using namespace broker;
 using framing::FieldTable;
 
-WiringHandler::WiringHandler(EventHandler& e,
+WiringHandler::WiringHandler(Group& g,
                              const boost::intrusive_ptr<QueueHandler>& qh,
                              broker::Broker& b) :
-    HandlerBase(e),
+    HandlerBase(g.getEventHandler()),
     broker(b),
     recovery(broker.getQueues(), broker.getExchanges(),
              broker.getLinks(), broker.getDtxManager()),

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Wed Oct  5 21:45:45 2011
@@ -41,7 +41,7 @@ class Broker;
 }
 
 namespace cluster {
-class EventHandler;
+class Group;
 class QueueHandler;
 
 /**
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AM
                       public HandlerBase
 {
   public:
-    WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
+    WiringHandler(Group&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
 
     bool handle(const framing::AMQFrame&);
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org