You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/10/05 23:45:46 UTC
svn commit: r1179456 - in
/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster: ./ exp/
Author: aconway
Date: Wed Oct 5 21:45:45 2011
New Revision: 1179456
URL: http://svn.apache.org/viewvc?rev=1179456&view=rev
Log:
QPID-2920: Configurable control over concurrency
- new config option controls number of CPG groups used.
- minor log message improvements
- minor code clean-up
Modified:
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/Cpg.cpp Wed Oct 5 21:45:45 2011
@@ -126,7 +126,7 @@ Cpg::Cpg(Handler& h) : IOHandle(new sys:
callbacks.cpg_deliver_fn = &globalDeliver;
callbacks.cpg_confchg_fn = &globalConfigChange;
- QPID_LOG(notice, "Initializing CPG");
+ QPID_LOG(debug, "Initializing CPG");
cpg_error_t err = cpg_initialize(&handle, &callbacks);
int retries = 6; // FIXME aconway 2009-08-06: make this configurable.
while (err == CPG_ERR_TRY_AGAIN && --retries) {
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Cluster2Plugin.cpp Wed Oct 5 21:45:45 2011
@@ -35,8 +35,10 @@ struct Cluster2Plugin : public Plugin {
Opts(Settings& s) : Options("Cluster Options"), settings(s) {
addOptions()
("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join")
- ("consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.");
- // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+ ("cluster2-consume-lock", optValue(settings.consumeLockMicros, "uS"), "Maximum time a broker can hold the consume lock on a shared queue, in microseconds.")
+ ("cluster2-concurrency", optValue(settings.concurrency, "N"), "Number of concurrent streams of processing for multicast/deliver.");
+ // FIXME aconway 2011-10-05: add all relevant options from ClusterPlugin.h.
+ // FIXME aconway 2011-10-05: rename to final option names.
}
};
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Wed Oct 5 21:45:45 2011
@@ -39,35 +39,36 @@ namespace cluster {
Core::Core(const Settings& s, broker::Broker& b) : broker(b), settings(s)
{
- // FIXME aconway 2011-09-26: this has to be consistent in a
+ // FIXME aconway 2011-09-26: s.concurrency has to be consistent in a
// cluster, negotiate as part of join protocol.
- size_t nGroups = broker.getOptions().workerThreads;
+ uint32_t nGroups = s.concurrency ? s.concurrency : 1;
for (size_t i = 0; i < nGroups; ++i) {
- // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
+ // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol?
std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
groups.push_back(new Group(*this));
boost::intrusive_ptr<Group> group(groups.back());
- // FIXME aconway 2011-10-03: clean up, all Handler ctors take Group.
- boost::intrusive_ptr<QueueHandler> queueHandler(
- new QueueHandler(group->getEventHandler(), group->getMulticaster(), settings));
- group->getEventHandler().add(queueHandler);
- group->getEventHandler().add(
- boost::intrusive_ptr<HandlerBase>(
- new WiringHandler(group->getEventHandler(), queueHandler, broker)));
- group->getEventHandler().add(
- boost::intrusive_ptr<HandlerBase>(new MessageHandler(*group, *this)));
+
+ EventHandler& eh(group->getEventHandler());
+ typedef boost::intrusive_ptr<HandlerBase> HandlerBasePtr;
+ boost::intrusive_ptr<QueueHandler> queueHandler(new QueueHandler(*group, settings));
+ eh.add(queueHandler);
+ eh.add(HandlerBasePtr(new WiringHandler(*group, queueHandler, broker)));
+ eh.add(HandlerBasePtr(new MessageHandler(*group, *this)));
std::auto_ptr<BrokerContext> bh(new BrokerContext(*this));
brokerHandler = bh.get();
// BrokerContext belongs to Broker
broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
// FIXME aconway 2011-09-26: multi-group
- group->getEventHandler().start();
- group->getEventHandler().getCpg().join(groupName);
- // TODO aconway 2010-11-18: logging standards
- // FIXME aconway 2011-09-26: multi-group
- QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< group->getEventHandler().getSelf());
+ eh.start();
+ eh.getCpg().join(groupName);
+ // TODO aconway 2010-11-18: logging standards // FIXME aconway 2011-09-26: multi-group
+ QPID_LOG(debug, "cluster: joined CPG group " << groupName << ", member-id=" << eh.getSelf());
}
+ QPID_LOG(notice, "cluster: joined cluster " << s.name
+ << ", member-id="<< groups[0]->getEventHandler().getSelf());
+ QPID_LOG(debug, "cluster: consume-lock=" << s.consumeLockMicros << "us "
+ << " concurrency=" << s.concurrency);
}
void Core::initialize() {}
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Wed Oct 5 21:45:45 2011
@@ -34,6 +34,7 @@ namespace qpid {
namespace framing {
class AMQBody;
+class AMQFrame;
}
namespace cluster {
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHolder.cpp Wed Oct 5 21:45:45 2011
@@ -41,7 +41,7 @@ bool MessageHolder::check(const framing:
assert(i != messages.end());
msgOut = i->second.first;
queueOut = i->second.second;
- messages.erase(frame.getChannel()); // re-use the frame.
+ messages.erase(frame.getChannel()); // re-use the channel.
return true;
}
return false;
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.cpp Wed Oct 5 21:45:45 2011
@@ -19,22 +19,25 @@
*
*/
-#include "QueueHandler.h"
#include "EventHandler.h"
-#include "QueueReplica.h"
+#include "Group.h"
#include "QueueContext.h"
+#include "QueueHandler.h"
+#include "QueueReplica.h"
+#include "qpid/Exception.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace cluster {
-// FIXME aconway 2011-05-11: make Multicaster+EventHandler available as Group, clean this up?
-QueueHandler::QueueHandler(EventHandler& eh, Multicaster& m, const Settings& s)
- : HandlerBase(eh), multicaster(m), consumeLock(s.getConsumeLock()) {}
+QueueHandler::QueueHandler(Group& g, const Settings& s)
+ : HandlerBase(g.getEventHandler()),
+ multicaster(g.getMulticaster()),
+ consumeLock(s.getConsumeLock())
+{}
bool QueueHandler::handle(const framing::AMQFrame& frame) {
return framing::invoke(*this, *frame.getBody()).wasHandled();
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueHandler.h Wed Oct 5 21:45:45 2011
@@ -42,6 +42,7 @@ namespace cluster {
class EventHandler;
class QueueReplica;
class Multicaster;
+class Group;
/**
* Handler for queue subscription events.
@@ -54,7 +55,7 @@ class QueueHandler : public framing::AMQ
public HandlerBase
{
public:
- QueueHandler(EventHandler&, Multicaster&, const Settings&);
+ QueueHandler(Group&, const Settings&);
bool handle(const framing::AMQFrame& body);
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.cpp Wed Oct 5 21:45:45 2011
@@ -20,12 +20,14 @@
*/
#include "Settings.h"
+#include "qpid/sys/SystemInfo.h"
namespace qpid {
namespace cluster {
Settings::Settings() : // Default settings
- consumeLockMicros(10000)
+ consumeLockMicros(10000),
+ concurrency(sys::SystemInfo::concurrency() + 1)
{}
}} // namespace qpid::cluster
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Settings.h Wed Oct 5 21:45:45 2011
@@ -35,6 +35,8 @@ struct Settings {
Settings();
std::string name;
uint32_t consumeLockMicros;
+ uint32_t concurrency;
+
sys::Duration getConsumeLock() const { return consumeLockMicros * sys::TIME_USEC; }
};
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Wed Oct 5 21:45:45 2011
@@ -20,6 +20,7 @@
*/
#include "Core.h"
+#include "Group.h"
#include "WiringHandler.h"
#include "EventHandler.h"
#include "QueueHandler.h"
@@ -40,10 +41,10 @@ namespace cluster {
using namespace broker;
using framing::FieldTable;
-WiringHandler::WiringHandler(EventHandler& e,
+WiringHandler::WiringHandler(Group& g,
const boost::intrusive_ptr<QueueHandler>& qh,
broker::Broker& b) :
- HandlerBase(e),
+ HandlerBase(g.getEventHandler()),
broker(b),
recovery(broker.getQueues(), broker.getExchanges(),
broker.getLinks(), broker.getDtxManager()),
Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1179456&r1=1179455&r2=1179456&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Wed Oct 5 21:45:45 2011
@@ -41,7 +41,7 @@ class Broker;
}
namespace cluster {
-class EventHandler;
+class Group;
class QueueHandler;
/**
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AM
public HandlerBase
{
public:
- WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
+ WiringHandler(Group&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
bool handle(const framing::AMQFrame&);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org