You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/27 15:11:30 UTC

svn commit: r1176373 - in /qpid/branches/qpid-2920-active/qpid/cpp: src/qpid/broker/ src/qpid/cluster/exp/ src/tests/ xml/

Author: aconway
Date: Tue Sep 27 13:11:30 2011
New Revision: 1176373

URL: http://svn.apache.org/viewvc?rev=1176373&view=rev
Log:
QPID-2920: Use multiple independent CPG groups.

Use a fixed set of CPG groups, hash queue names to choose the group
for a given operation. Operations on a given queue will always use the
same CPG group but operations on different queues can execute
concurrently on different groups.

Removed fanuout optimization, it doesn't work with multiple CPG
groups.  Can't guratnee that "routing" will proceed enqueues on
different CPG groups.

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 27 13:11:30 2011
@@ -237,8 +237,6 @@ void Queue::requeue(const QueuedMessage&
  * calling function sets qmsg with the lock held, but the call to
  * Cluster::acquire() will happen after the lock is released in
  * ~ClusterAcquireScope().
- *
- * Also marks a Stoppable as busy for the duration of the scope.
  **/
 struct ClusterAcquireScope {
     QueuedMessage qmsg;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Tue Sep 27 13:11:30 2011
@@ -25,8 +25,6 @@
 #include "QueueHandler.h"
 #include "Multicaster.h"
 #include "hash.h"
-#include "qpid/framing/ClusterMessageRoutingBody.h"
-#include "qpid/framing/ClusterMessageRoutedBody.h"
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
 #include "qpid/framing/ClusterMessageAcquireBody.h"
 #include "qpid/framing/ClusterMessageDequeueBody.h"
@@ -57,10 +55,6 @@ const ProtocolVersion pv;     // shortha
 // noReplicate means the current thread is handling a message
 // received from the cluster so it should not be replicated.
 QPID_TSS bool tssNoReplicate = false;
-
-// Routing ID of the message being routed in the current thread.
-// 0 if we are not currently routing a message.
-QPID_TSS RoutingId tssRoutingId = 0;
 }
 
 // FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
@@ -90,38 +84,24 @@ BrokerContext::ScopedSuppressReplication
 BrokerContext::BrokerContext(Core& c, boost::intrusive_ptr<QueueHandler> q)
     : core(c), queueHandler(q) {}
 
-RoutingId BrokerContext::nextRoutingId() {
-    RoutingId id = ++routingId;
-    if (id == 0) id = ++routingId; // Avoid 0 on wrap-around.
-    return id;
-}
-
-void BrokerContext::routing(const boost::intrusive_ptr<Message>&) { }
+BrokerContext::~BrokerContext() {}
 
 bool BrokerContext::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
 {
     if (tssNoReplicate) return true;
-    if (!tssRoutingId) {             // This is the first enqueue, so send the message
-        tssRoutingId = nextRoutingId();
-        // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
-        std::string data(msg->encodedSize(),char());
-        framing::Buffer buf(&data[0], data.size());
-        msg->encode(buf);
-        mcaster(queue).mcast(ClusterMessageRoutingBody(pv, tssRoutingId, data));
-        core.getRoutingMap().put(tssRoutingId, msg);
-    }
-    mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, tssRoutingId, queue.getName()));
+    // FIXME aconway 2010-10-20: replicate message in fragments
+    // (frames), using fixed size bufffers.
+    std::string data(msg->encodedSize(),char());
+    framing::Buffer buf(&data[0], data.size());
+    msg->encode(buf);
+    mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, queue.getName(), data));
     return false; // Strict order, wait for CPG self-delivery to enqueue.
 }
 
-void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
-    if (tssRoutingId) {             // we enqueued at least one message.
-        core.getGroup(tssRoutingId).getMulticaster().mcast(
-            ClusterMessageRoutedBody(pv, tssRoutingId));
-        // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
-        tssRoutingId = 0;
-    }
-}
+// routing and routed are no-ops. They are needed to implement fanout
+// optimization, which is currently not implemnted
+void BrokerContext::routing(const boost::intrusive_ptr<broker::Message>&) {}
+void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {}
 
 void BrokerContext::acquire(const broker::QueuedMessage& qm) {
     if (tssNoReplicate) return;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Tue Sep 27 13:11:30 2011
@@ -48,8 +48,7 @@ class BrokerContext : public broker::Clu
     };
 
     BrokerContext(Core&, boost::intrusive_ptr<QueueHandler>);
-
-    // FIXME aconway 2010-10-20: implement all points.
+    ~BrokerContext();
 
     // Messages
 
@@ -81,7 +80,6 @@ class BrokerContext : public broker::Clu
 
 
   private:
-    uint32_t nextRoutingId();
     // Get multicaster associated with a queue
     Multicaster& mcaster(const broker::QueuedMessage& qm);
     Multicaster& mcaster(const broker::Queue& q);
@@ -89,7 +87,6 @@ class BrokerContext : public broker::Clu
 
     Core& core;
     boost::intrusive_ptr<QueueHandler> queueHandler;
-    sys::AtomicValue<uint32_t> routingId;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Sep 27 13:11:30 2011
@@ -32,6 +32,7 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/log/Statement.h"
 #include <sys/uio.h>            // For iovec
+#include <boost/lexical_cast.hpp>
 
 namespace qpid {
 namespace cluster {
@@ -40,33 +41,38 @@ Core::Core(const Settings& s, broker::Br
     broker(b),
     settings(s)
 {
-    // FIXME aconway 2011-09-23: multi-group
-    groups.push_back(new Group(*this));
-    boost::intrusive_ptr<QueueHandler> queueHandler(
-        new QueueHandler(groups[0]->getEventHandler(), groups[0]->getMulticaster(), settings));
-    groups[0]->getEventHandler().add(queueHandler);
-    groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
-                              new WiringHandler(groups[0]->getEventHandler(), queueHandler, broker)));
-    groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
-                              new MessageHandler(groups[0]->getEventHandler(), *this)));
-
-    std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
-    brokerHandler = bh.get();
-    // BrokerContext belongs to Broker
-    broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
-    // FIXME aconway 2011-09-26: multi-group
-    groups[0]->getEventHandler().start();
-    groups[0]->getEventHandler().getCpg().join(s.name);
-    // TODO aconway 2010-11-18: logging standards
-    // FIXME aconway 2011-09-26: multi-group
-    QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf());
+    // FIXME aconway 2011-09-26: this has to be consistent in a
+    // cluster, negotiate as part of join protocol.
+    size_t nGroups = broker.getOptions().workerThreads;
+    for (size_t i = 0; i < nGroups; ++i) {
+        // FIXME aconway 2011-09-26: review naming. Create group for non-message traffic, e.g. initial join protocol.
+        std::string groupName = s.name + "-" + boost::lexical_cast<std::string>(i);
+        QPID_LOG(critical, "FIXME create group " << i << " of " << "nGroups. " << groupName);
+        groups.push_back(new Group(*this));
+        boost::intrusive_ptr<QueueHandler> queueHandler(
+            new QueueHandler(groups[i]->getEventHandler(), groups[i]->getMulticaster(), settings));
+        groups[i]->getEventHandler().add(queueHandler);
+        groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+                                             new WiringHandler(groups[i]->getEventHandler(), queueHandler, broker)));
+        groups[i]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+                                             new MessageHandler(groups[i]->getEventHandler(), *this)));
+
+        std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
+        brokerHandler = bh.get();
+        // BrokerContext belongs to Broker
+        broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+        // FIXME aconway 2011-09-26: multi-group
+        groups[i]->getEventHandler().start();
+        groups[i]->getEventHandler().getCpg().join(groupName);
+        // TODO aconway 2010-11-18: logging standards
+        // FIXME aconway 2011-09-26: multi-group
+        QPID_LOG(notice, "cluster: joined " << groupName << ", member-id="<< groups[i]->getEventHandler().getSelf());
+    }
 }
 
 void Core::initialize() {}
 
 void Core::fatal() {
-    // FIXME aconway 2010-10-20: error handling
-    assert(0);
     broker::SignalHandler::shutdown();
 }
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Sep 27 13:11:30 2011
@@ -60,7 +60,6 @@ class BrokerContext;
 class Core
 {
   public:
-    typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
     typedef std::vector<boost::intrusive_ptr<Group> > Groups;
 
     /** Constructed during Plugin::earlyInitialize() */
@@ -75,11 +74,6 @@ class Core
     broker::Broker& getBroker() { return broker; }
     BrokerContext& getBrokerContext() { return *brokerHandler; }
 
-    /** Map of messages that are currently being routed.
-     * Used to pass messages being routed from BrokerContext to MessageHandler
-     */
-    RoutingMap& getRoutingMap() { return routingMap; }
-
     const Settings& getSettings() const { return settings; }
 
     /** Get group by hash value. */
@@ -88,7 +82,6 @@ class Core
   private:
     broker::Broker& broker;
     BrokerContext* brokerHandler; // Handles broker events.
-    RoutingMap routingMap;
     Settings settings;
     Groups groups;
 };

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Sep 27 13:11:30 2011
@@ -49,16 +49,6 @@ bool MessageHandler::invoke(const framin
     return framing::invoke(*this, body).wasHandled();
 }
 
-void MessageHandler::routing(RoutingId routingId, const std::string& message) {
-    if (sender() == self()) return; // Already in core.getRoutingMap()
-    boost::intrusive_ptr<Message> msg = new Message;
-    // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
-    framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
-    msg->decodeHeader(buf);
-    msg->decodeContent(buf);
-    memberMap[sender()].routingMap[routingId] = msg;
-}
-
 boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
     const std::string& q, const char* msg)
 {
@@ -67,26 +57,18 @@ boost::shared_ptr<broker::Queue> Message
     return queue;
 }
 
-void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
+void MessageHandler::enqueue(const std::string& q, const std::string& message) {
+
     boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
-    boost::intrusive_ptr<Message> msg;
-    if (sender() == self())
-        msg = core.getRoutingMap().get(routingId);
-    else
-        msg = memberMap[sender()].routingMap[routingId];
-    if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
-                                       << " failed: unknown message"));
+    // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers.
+    boost::intrusive_ptr<broker::Message> msg = new broker::Message();
+    framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
+    msg->decodeHeader(buf);
+    msg->decodeContent(buf);
     BrokerContext::ScopedSuppressReplication ssr;
     queue->deliver(msg);
 }
 
-void MessageHandler::routed(RoutingId routingId) {
-    if (sender() == self())
-        core.getRoutingMap().erase(routingId);
-    else
-        memberMap[sender()].routingMap.erase(routingId);
-}
-
 // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
 // and scan queue once.
 void MessageHandler::acquire(const std::string& q, uint32_t position) {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Sep 27 13:11:30 2011
@@ -56,24 +56,15 @@ class MessageHandler : public framing::A
 
     bool invoke(const framing::AMQBody& body);
 
-    void routing(uint32_t routingId, const std::string& message);
-    void enqueue(uint32_t routingId, const std::string& queue);
-    void routed(uint32_t routingId);
+    void enqueue(const std::string& queue, const std::string& message);
     void acquire(const std::string& queue, uint32_t position);
     void dequeue(const std::string& queue, uint32_t position);
     void requeue(const std::string& queue, uint32_t position, bool redelivered);
 
   private:
-    struct Member {
-        typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
-        RoutingMap routingMap;
-    };
-    typedef std::map<MemberId, Member> MemberMap;
-
     boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg);
 
     broker::Broker& broker;
-    MemberMap memberMap;
     Core& core;
 };
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-cluster-benchmark Tue Sep 27 13:11:30 2011
@@ -47,6 +47,5 @@ done
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
-
 run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp Tue Sep 27 13:11:30 2011
@@ -217,10 +217,7 @@ int main(int argc, char ** argv)
                         }
                         if (opts.printContent)
                             std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
-                        if (opts.messages && count >= opts.messages) {
-                            cerr << "qpid-receive(" << getpid() << ") DONE" << endl;
-                            done = true;
-                        }
+                        if (opts.messages && count >= opts.messages) done = true;
                     }
                 } else if (opts.checkRedelivered && !msg.getRedelivered()) {
                     throw qpid::Exception("duplicate sequence number received, message not marked as redelivered!");

Modified: qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml?rev=1176373&r1=1176372&r2=1176373&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/xml/cluster.xml Tue Sep 27 13:11:30 2011
@@ -331,23 +331,11 @@
 
   <!-- Message delivery and disposition -->
   <class name="cluster-message" code="0x82">
-    <!-- FIXME aconway 2010-10-19: create message in fragments -->
-    <control name="routing" code="0x1">
-      <field name="routing-id" type="uint32"/>
-      <field name="message" type="str32"/>
-    </control>
-
-    <!--  FIXME aconway 2011-04-27: reference queues by index, not name -->
     <control name="enqueue" code="0x2">
-      <field name="routing-id" type="uint32"/>
       <field name="queue" type="queue.name"/>
+      <field name="message" type="str32"/>
     </control>
 
-    <control name="routed" code="0x3">
-      <field name="routing-id" type="uint32"/>
-    </control>
-
-    <!-- FIXME aconway 2011-04-27: review queue positions vs. global message IDs -->
     <control name="acquire" code="0x4">
       <field name="queue" type="queue.name"/>
       <field name="position" type="uint32"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org