You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/10/25 20:00:35 UTC

svn commit: r1027210 - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/tests/ cpp/xml/ python/qpid/

Author: aconway
Date: Mon Oct 25 18:00:34 2010
New Revision: 1027210

URL: http://svn.apache.org/viewvc?rev=1027210&view=rev
Log:
New cluster: core framework and initial implementation of enqueue logic.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.cpp
      - copied, changed from r1026501, qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.h
      - copied, changed from r1026501, qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
    qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
    qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
    qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
    qpid/trunk/qpid/cpp/xml/cluster.xml
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon Oct 25 18:00:34 2010
@@ -35,7 +35,6 @@ endif
 if HAVE_LIBCPG
 
 dmodule_LTLIBRARIES += cluster.la
-
 cluster_la_SOURCES =				\
   $(CMAN_SOURCES)				\
   qpid/cluster/Cluster.cpp			\
@@ -99,6 +98,27 @@ cluster_la_LIBADD=  -lcpg $(libcman) lib
 cluster_la_CXXFLAGS = $(AM_CXXFLAGS) -fno-strict-aliasing
 cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
 
+# Experimental new cluster plugin
+dmodule_LTLIBRARIES += cluster2.la
+cluster2_la_LIBADD = -lcpg libqpidbroker.la
+cluster2_la_LDFLAGS = $(PLUGINLDFLAGS)
+cluster2_la_SOURCES =				\
+	qpid/cluster/BrokerHandler.cpp		\
+	qpid/cluster/BrokerHandler.h		\
+	qpid/cluster/Cluster2Plugin.cpp		\
+	qpid/cluster/Core.cpp			\
+	qpid/cluster/Core.h			\
+	qpid/cluster/Cpg.cpp			\
+	qpid/cluster/Cpg.h			\
+	qpid/cluster/EventHandler.cpp		\
+	qpid/cluster/EventHandler.h		\
+	qpid/cluster/MessageHandler.cpp		\
+	qpid/cluster/MessageHandler.h		\
+	qpid/cluster/MessageId.cpp		\
+	qpid/cluster/MessageId.h		\
+	qpid/cluster/PollerDispatch.cpp		\
+	qpid/cluster/PollerDispatch.h
+
 # The watchdog plugin and helper executable
 dmodule_LTLIBRARIES += watchdog.la
 watchdog_la_SOURCES = qpid/cluster/WatchDogPlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h Mon Oct 25 18:00:34 2010
@@ -54,8 +54,14 @@ class Cluster
 
     /** In Exchange::route, before the message is enqueued. */
     virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
-    /** A message is delivered to a queue. */
-    virtual void enqueue(QueuedMessage&) = 0;
+
+    /** A message is delivered to a queue.
+     * Called before actually pushing the message to the queue.
+     *@return If true the message should be pushed to the queue now;
+     * otherwise the cluster code will push the message when it is replicated.
+     */
+    virtual bool enqueue(Queue& queue, const boost::intrusive_ptr<Message>&) = 0;
+
     /** In Exchange::route, after all enqueues for the message. */
     virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
 
@@ -71,11 +77,12 @@ class Cluster
 
     /** A locally-acquired message is released by the consumer and re-queued. */
     virtual void release(const QueuedMessage&) = 0;
-    /** A message is dropped from the queue, e.g. expired or replaced on an LVQ.
-     * This function does only local book-keeping, it does not multicast.
-     * It is reasonable to call with a queue lock held.
+
+    /** A message is removed from the queue. It could have been
+     * accepted, rejected or dropped for other reasons e.g. expired or
+     * replaced on an LVQ.
      */
-    virtual void dequeue(const QueuedMessage&) = 0;
+    virtual void drop(const QueuedMessage&) = 0;
 
     // Consumers
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h Mon Oct 25 18:00:34 2010
@@ -38,14 +38,14 @@ class NullCluster : public Cluster
     // Messages
 
     virtual void routing(const boost::intrusive_ptr<Message>&) {}
-    virtual void enqueue(QueuedMessage&) {}
+    virtual bool enqueue(Queue&, const boost::intrusive_ptr<Message>&) { return true; }
     virtual void routed(const boost::intrusive_ptr<Message>&) {}
     virtual void acquire(const QueuedMessage&) {}
     virtual void accept(const QueuedMessage&) {}
     virtual void reject(const QueuedMessage&) {}
     virtual void rejected(const QueuedMessage&) {}
     virtual void release(const QueuedMessage&) {}
-    virtual void dequeue(const QueuedMessage&) {}
+    virtual void drop(const QueuedMessage&) {}
 
     // Consumers
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 25 18:00:34 2010
@@ -146,6 +146,10 @@ void Queue::deliver(boost::intrusive_ptr
     // Check for deferred delivery in a cluster.
     if (broker && broker->deferDelivery(name, msg))
         return;
+    // Same thing but for the new cluster interface.
+    if (broker && !broker->getCluster().enqueue(*this, msg))
+        return;
+
     if (msg->isImmediate() && getConsumerCount() == 0) {
         if (alternateExchange) {
             DeliverableMessage deliverable(msg);
@@ -165,7 +169,6 @@ void Queue::deliver(boost::intrusive_ptr
         }else {
             push(msg);
         }
-        mgntEnqStats(msg);
         QPID_LOG(debug, "Message " << msg << " enqueued on " << name);
     }
 }
@@ -199,7 +202,6 @@ void Queue::recover(boost::intrusive_ptr
 
 void Queue::process(boost::intrusive_ptr<Message>& msg){
     push(msg);
-    mgntEnqStats(msg);
     if (mgmtObject != 0){
         mgmtObject->inc_msgTxnEnqueues ();
         mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
@@ -642,6 +644,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
 
 void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
     assertClusterSafe();
+    if (!isRecovery) mgntEnqStats(msg);
     QueuedMessage qm;
     QueueListeners::NotificationSet copy;
     {
@@ -687,7 +690,6 @@ void Queue::push(boost::intrusive_ptr<Me
         }
     }
     copy.notify();
-    if (broker) broker->getCluster().enqueue(qm);
 }
 
 QueuedMessage Queue::getFront()
@@ -868,10 +870,9 @@ bool Queue::dequeue(TransactionContext* 
     {
         Mutex::ScopedLock locker(messageLock);
         if (!isEnqueued(msg)) return false;
-        if (!ctxt) { 
-            dequeued(msg);
-        }
+        if (!ctxt) dequeued(msg);
     }
+    if (!ctxt && broker) broker->getCluster().drop(msg); // Outside lock
     // This check prevents messages which have been forced persistent on one queue from dequeuing
     // from another on which no forcing has taken place and thus causing a store error.
     bool fp = msg.payload->isForcedPersistent();
@@ -888,6 +889,7 @@ bool Queue::dequeue(TransactionContext* 
 
 void Queue::dequeueCommitted(const QueuedMessage& msg)
 {
+    if (broker) broker->getCluster().drop(msg); // Outside lock
     Mutex::ScopedLock locker(messageLock);
     dequeued(msg);    
     if (mgmtObject != 0) {
@@ -913,9 +915,8 @@ void Queue::popAndDequeue()
  */
 void Queue::dequeued(const QueuedMessage& msg)
 {
-    // Note: Cluster::dequeued does only local book-keeping, no multicast
+    // Note: Cluster::drop does only local book-keeping, no multicast
     // So OK to call here with lock held.
-    if (broker) broker->getCluster().dequeue(msg);
     if (policy.get()) policy->dequeued(msg);
     mgntDeqStats(msg.payload);
     if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp Mon Oct 25 18:00:34 2010
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "BrokerHandler.h"
+#include "qpid/framing/ClusterMessageRoutingBody.h"
+#include "qpid/framing/ClusterMessageRoutedBody.h"
+#include "qpid/framing/ClusterMessageEnqueueBody.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+using namespace broker;
+
+namespace {
+// noReplicate means the current thread is handling a message
+// received from the cluster so it should not be replicated.
+QPID_TSS bool noReplicate = false;
+
+// Sequence number of the message currently being routed.
+// 0 if we are not currently routing a message.
+QPID_TSS SequenceNumber routeSeq = 0;
+}
+
+BrokerHandler::ScopedSuppressReplication::ScopedSuppressReplication() {
+    assert(!noReplicate);
+    noReplicate = true;
+}
+
+BrokerHandler::ScopedSuppressReplication::~ScopedSuppressReplication() {
+    assert(noReplicate);
+    noReplicate = false;
+}
+
+BrokerHandler::BrokerHandler(Core& c) : core(c) {}
+
+SequenceNumber BrokerHandler::nextSequenceNumber() {
+    SequenceNumber s = ++sequence;
+    if (!s) s = ++sequence;     // Avoid 0 on wrap-around.
+    return s;
+}
+
+void BrokerHandler::routing(const boost::intrusive_ptr<Message>&) { }
+
+bool BrokerHandler::enqueue(Queue& queue, const boost::intrusive_ptr<Message>& msg)
+{
+    if (noReplicate) return true;
+    if (!routeSeq) {             // This is the first enqueue, so send the message
+        routeSeq = nextSequenceNumber();
+        // FIXME aconway 2010-10-20: replicate message in fixed size buffers.
+        std::string data(msg->encodedSize(),char());
+        framing::Buffer buf(&data[0], data.size());
+        msg->encode(buf);
+        core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), routeSeq, data));
+        core.getRoutingMap().put(routeSeq, msg);
+    }
+    core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), routeSeq, queue.getName()));
+    // TODO aconway 2010-10-21: configurable option for strict (wait
+    // for CPG deliver to do local deliver) vs.  loose (local deliver
+    // immediately).
+    return false;
+}
+
+void BrokerHandler::routed(const boost::intrusive_ptr<Message>&) {
+    if (routeSeq) {             // we enqueued at least one message.
+        core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), routeSeq));
+        // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
+        routeSeq = 0;
+    }
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h Mon Oct 25 18:00:34 2010
@@ -0,0 +1,86 @@
+#ifndef QPID_CLUSTER_BROKERHANDLER_H
+#define QPID_CLUSTER_BROKERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/broker/Cluster.h"
+#include "qpid/sys/AtomicValue.h"
+
+namespace qpid {
+namespace cluster {
+class Core;
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+/**
+ * Implements broker::Cluster interface, handles events in broker code.
+ */
+class BrokerHandler : public broker::Cluster
+{
+  public:
+    /** Suppress replication while in scope.
+     * Used to prevent re-replication of messages received from the cluster.
+     */
+    struct ScopedSuppressReplication {
+        ScopedSuppressReplication();
+        ~ScopedSuppressReplication();
+    };
+
+    BrokerHandler(Core&);
+
+    // FIXME aconway 2010-10-20: implement all points.
+
+    // Messages
+
+    void routing(const boost::intrusive_ptr<broker::Message>&);
+    bool enqueue(broker::Queue&, const boost::intrusive_ptr<broker::Message>&);
+    void routed(const boost::intrusive_ptr<broker::Message>&);
+    void acquire(const broker::QueuedMessage&) {}
+    void accept(const broker::QueuedMessage&) {}
+    void reject(const broker::QueuedMessage&) {}
+    void rejected(const broker::QueuedMessage&) {}
+    void release(const broker::QueuedMessage&) {}
+    void drop(const broker::QueuedMessage&) {}
+
+    // Consumers
+
+    void consume(const broker::Queue&, size_t) {}
+    void cancel(const broker::Queue&, size_t) {}
+
+    // Wiring
+
+    void create(const broker::Queue&) {}
+    void destroy(const broker::Queue&) {}
+    void create(const broker::Exchange&) {}
+    void destroy(const broker::Exchange&) {}
+    void bind(const broker::Queue&, const broker::Exchange&,
+              const std::string&, const framing::FieldTable&) {}
+
+  private:
+    SequenceNumber nextSequenceNumber();
+
+    Core& core;
+    sys::AtomicValue<SequenceNumber> sequence;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_BROKERHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/BrokerHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp Mon Oct 25 18:00:34 2010
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/Options.h>
+#include <qpid/broker/Broker.h>
+#include "Core.h"
+
+namespace qpid {
+namespace cluster {
+using broker::Broker;
+
+// TODO aconway 2010-10-19: experimental new cluster code.
+
+/**
+ * Plugin for the cluster.
+ */
+struct Cluster2Plugin : public Plugin {
+    struct Opts : public Options {
+        Core::Settings& settings;
+        Opts(Core::Settings& s) : Options("Cluster Options"), settings(s) {
+            addOptions()
+                ("cluster2-name", optValue(settings.name, "NAME"), "Name of cluster to join");
+            // TODO aconway 2010-10-19: copy across other options from ClusterPlugin.h
+        }
+    };
+
+    Core::Settings settings;
+    Opts options;
+    Core* core;                 // Core deletes itself on shutdown.
+
+    Cluster2Plugin() : options(settings), core(0) {}
+
+    Options* getOptions() { return &options; }
+
+    void earlyInitialize(Plugin::Target& target) {
+        if (settings.name.empty()) return;
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (!broker) return;
+        core = new Core(settings, *broker);
+    }
+
+    void initialize(Plugin::Target& target) {
+        Broker* broker = dynamic_cast<Broker*>(&target);
+        if (broker && core) core->initialize();
+    }
+};
+
+static Cluster2Plugin instance; // Static initialization.
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster2Plugin.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp Mon Oct 25 18:00:34 2010
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/log/Statement.h"
+#include <sys/uio.h>            // For iovec
+
+namespace qpid {
+namespace cluster {
+
+Core::Core(const Settings& s, broker::Broker& b) :
+    broker(b),
+    eventHandler(new EventHandler(*this))
+{
+    std::auto_ptr<BrokerHandler> bh(new BrokerHandler(*this));
+    brokerHandler = bh.get();
+    // BrokerHandler belongs to Broker
+    broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
+    // FIXME aconway 2010-10-20: ownership of BrokerHandler, shutdown issues.
+    eventHandler->getCpg().join(s.name);
+}
+
+void Core::initialize() {}
+
+void Core::fatal() {
+    // FIXME aconway 2010-10-20: error handling
+    assert(0);
+    broker::SignalHandler::shutdown();
+}
+
+void Core::mcast(const framing::AMQBody& body) {
+    QPID_LOG(trace, "multicast: " << body);
+    // FIXME aconway 2010-10-20: use Multicaster, or bring in its features.
+    // here we multicast Frames rather than Events.
+    framing::AMQFrame f(body);
+    std::string data(f.encodedSize(), char());
+    framing::Buffer buf(&data[0], data.size());
+    f.encode(buf);
+    iovec iov = { buf.getPointer(), buf.getSize() };
+    while (!eventHandler->getCpg().mcast(&iov, 1))
+        ::usleep(1000);      // FIXME aconway 2010-10-20: flow control
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h Mon Oct 25 18:00:34 2010
@@ -0,0 +1,95 @@
+#ifndef QPID_CLUSTER_CORE_H
+#define QPID_CLUSTER_CORE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+#include <memory>
+
+#include "Cpg.h"
+#include "MessageId.h"
+#include "LockedMap.h"
+#include <qpid/broker/QueuedMessage.h>
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+namespace qpid {
+
+namespace framing{
+class AMQBody;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Cluster core state machine.
+ * Holds together the various objects that implement cluster behavior,
+ * and holds state that is shared by multiple components.
+ *
+ * Thread safe: called from broker connection threads and CPG dispatch threads.
+ */
+class Core
+{
+  public:
+    /** Configuration settings */
+    struct Settings {
+        std::string name;
+    };
+
+    typedef LockedMap<SequenceNumber, boost::intrusive_ptr<broker::Message> >
+    SequenceMessageMap;
+
+    /** Constructed during Plugin::earlyInitialize() */
+    Core(const Settings&, broker::Broker&);
+
+    /** Called during Plugin::initialize() */
+    void initialize();
+
+    /** Shut down broker due to fatal error. Caller should log a critical message */
+    void fatal();
+
+    /** Multicast an event */
+    void mcast(const framing::AMQBody&);
+
+    broker::Broker& getBroker() { return broker; }
+    EventHandler& getEventHandler() { return *eventHandler; }
+    BrokerHandler& getBrokerHandler() { return *brokerHandler; }
+
+    /** Map of messages that are currently being routed.
+     * Used to pass messages being routed from BrokerHandler to MessageHandler
+     */
+    SequenceMessageMap& getRoutingMap() { return routingMap; }
+  private:
+    broker::Broker& broker;
+    std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
+    BrokerHandler* brokerHandler; // Handles broker events.
+    SequenceMessageMap routingMap;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CORE_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Core.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp Mon Oct 25 18:00:34 2010
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "MessageHandler.h"
+#include "EventHandler.h"
+#include "Core.h"
+#include "types.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+EventHandler::EventHandler(Core& c) :
+    core(c),
+    cpg(*this),                 // FIXME aconway 2010-10-20: belongs on Core.
+    dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+    self(cpg.self()),
+    messageHandler(new MessageHandler(*this))
+{
+    dispatcher.start();         // FIXME aconway 2010-10-20: later in initialization?
+}
+
+EventHandler::~EventHandler() {}
+
+// Deliver CPG message.
+void EventHandler::deliver(
+    cpg_handle_t /*handle*/,
+    const cpg_name* /*group*/,
+    uint32_t nodeid,
+    uint32_t pid,
+    void* msg,
+    int msg_len)
+{
+    sender = MemberId(nodeid, pid);
+    framing::Buffer buf(static_cast<char*>(msg), msg_len);
+    framing::AMQFrame frame;
+    while (buf.available()) {
+        frame.decode(buf);
+        assert(frame.getBody());
+        QPID_LOG(trace, "cluster deliver: " << *frame.getBody());
+        try {
+            invoke(*frame.getBody());
+        }
+        catch (const std::exception& e) {
+            // Note: exceptions are assumed to be survivable;
+            // fatal errors should log a message and call Core::fatal.
+            QPID_LOG(error, e.what());
+        }
+    }
+}
+
+void EventHandler::invoke(const framing::AMQBody& body) {
+    if (framing::invoke(*messageHandler, body).wasHandled()) return;
+}
+
+// CPG config-change callback.
+void EventHandler::configChange (
+    cpg_handle_t /*handle*/,
+    const cpg_name */*group*/,
+    const cpg_address */*members*/, int /*nMembers*/,
+    const cpg_address */*left*/, int /*nLeft*/,
+    const cpg_address */*joined*/, int /*nJoined*/)
+{
+    // FIXME aconway 2010-10-20: TODO
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h Mon Oct 25 18:00:34 2010
@@ -0,0 +1,85 @@
+#ifndef QPID_CLUSTER_EVENTHANDLER_H
+#define QPID_CLUSTER_EVENTHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "types.h"
+#include "Cpg.h"
+#include "PollerDispatch.h"
+
+namespace qpid {
+
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
+class Core;
+class MessageHandler;
+
+/**
+ * Dispatch events received from CPG.
+ * Thread unsafe: only called in CPG deliver thread context.
+ *
+ * Implements the Cpg::Handler callbacks. deliver() records the sender of the
+ * event being processed, decodes the frames it carries and passes each body
+ * to the MessageHandler; getSender() exposes that sender to handlers.
+ */
+class EventHandler : public Cpg::Handler
+{
+  public:
+    EventHandler(Core&);
+    ~EventHandler();
+
+    void deliver( // CPG deliver callback.
+        cpg_handle_t /*handle*/,
+        const struct cpg_name *group,
+        uint32_t /*nodeid*/,
+        uint32_t /*pid*/,
+        void* /*msg*/,
+        int /*msg_len*/);
+
+    void configChange( // CPG config change callback.
+        cpg_handle_t /*handle*/,
+        const struct cpg_name */*group*/,
+        const struct cpg_address */*members*/, int /*nMembers*/,
+        const struct cpg_address */*left*/, int /*nLeft*/,
+        const struct cpg_address */*joined*/, int /*nJoined*/
+    );
+
+
+    MemberId getSender() { return sender; } // only meaningful while an event is being delivered.
+    MemberId getSelf() { return self; }
+    Core& getCore() { return core; }
+    Cpg& getCpg() { return cpg; }
+
+  private:
+    void invoke(const framing::AMQBody& body); // pass a decoded body to messageHandler.
+
+    Core& core;
+    Cpg cpg;
+    PollerDispatch dispatcher;    // declared after cpg, so constructed after it.
+    MemberId sender;              // sender of current event.
+    MemberId self;                // NOTE(review): presumably this member's own id - confirm in the constructor.
+    std::auto_ptr<MessageHandler> messageHandler; // receives frame bodies from invoke().
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EVENTHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h Mon Oct 25 18:00:34 2010
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_LOCKEDMAP_H
+#define QPID_CLUSTER_LOCKEDMAP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+#include <map>
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A reader-writer locked thread safe map.
+ *
+ * Every operation holds the lock for its whole duration: get() takes a read
+ * lock, the mutating operations take a write lock. Values are returned by
+ * copy, so no reference into the map escapes the lock.
+ */
+template <class Key, class Value>
+class LockedMap
+{
+  public:
+    /** Get value associated with key, returns Value() if none. */
+    Value get(const Key& key) const {
+        sys::RWlock::ScopedRlock r(lock);
+        typename Map::const_iterator i = map.find(key);
+        if (i == map.end()) return Value();
+        else return i->second;
+    }
+
+    /** Associate value with key, overwriting any previous value for key. */
+    void put(const Key& key, const Value& value) {
+        sys::RWlock::ScopedWlock w(lock);
+        map[key] = value;
+    }
+
+    /** Associate value with key if there is not already a value associated with key.
+     * Returns true if the value was added.
+     */
+    bool add(const Key& key, const Value& value) {
+        sys::RWlock::ScopedWlock w(lock);
+        // std::map::insert takes a single (key, value) pair; the bool in the
+        // returned pair says whether a new element was actually inserted.
+        return map.insert(typename Map::value_type(key, value)).second;
+    }
+
+    /** Erase the value associated with key if any. Return true if a value was erased. */
+    bool erase(const Key& key) {
+        sys::RWlock::ScopedWlock w(lock);
+        // erase(key) returns the number of elements removed (0 or 1 for a map).
+        return map.erase(key) != 0;
+    }
+
+  private:
+    typedef std::map<Key, Value> Map;
+    Map map;
+    mutable sys::RWlock lock;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_LOCKEDMAP_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/LockedMap.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp Mon Oct 25 18:00:34 2010
@@ -0,0 +1,82 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "MessageHandler.h"
+#include "BrokerHandler.h"
+#include "EventHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Thread.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+
+MessageHandler::MessageHandler(EventHandler& e) : // all collaborators are reached through e and its Core.
+    broker(e.getCore().getBroker()),              // used by enqueue() to look up queues by name.
+    eventHandler(e),                              // supplies sender()/self() and access to the Core.
+    brokerHandler(e.getCore().getBrokerHandler())
+{}
+
+MessageHandler::~MessageHandler() {}
+
+// Member that sent the event currently being handled.
+MemberId MessageHandler::sender() {
+    return eventHandler.getSender();
+}
+
+// Identity the event handler reports as "self".
+MemberId MessageHandler::self() {
+    return eventHandler.getSelf();
+}
+
+// Handle a "routing" event: start tracking the message identified by
+// (current sender, sequence) in routingMap.
+void MessageHandler::routing(uint64_t sequence, const std::string& message) {
+    MemberId from = sender();
+    boost::intrusive_ptr<Message> tracked;
+    // For an event we sent ourselves, prefer the instance held in the core's
+    // routing map, if there is one.
+    if (from == self())
+        tracked = eventHandler.getCore().getRoutingMap().get(sequence);
+    if (!tracked) {
+        // Otherwise rebuild the message from its encoded form.
+        tracked = new Message;
+        framing::Buffer encoded(const_cast<char*>(&message[0]), message.size());
+        tracked->decodeHeader(encoded);
+        tracked->decodeContent(encoded);
+    }
+    routingMap[MessageId(from, sequence)] = tracked;
+}
+
+// Handle an "enqueue" event: deliver the message previously registered by
+// routing() under (current sender, sequence) to the named queue.
+// Throws Exception if either the queue or the message is unknown.
+void MessageHandler::enqueue(uint64_t sequence, const std::string& q) {
+    MessageId id(sender(), sequence);
+    boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
+    if (!queue) throw Exception(QPID_MSG("Cluster message for unknown queue " << q));
+    // Look up with find(): operator[] would insert a null entry into
+    // routingMap as a side effect of every failed lookup.
+    RoutingMap::const_iterator i = routingMap.find(id);
+    if (i == routingMap.end() || !i->second)
+        throw Exception(QPID_MSG("Unknown cluster message for queue " << q));
+    boost::intrusive_ptr<Message> msg = i->second;
+    BrokerHandler::ScopedSuppressReplication ssr;
+    // TODO aconway 2010-10-21: configurable option for strict (wait
+    // for CPG deliver to do local deliver) vs.  loose (local deliver
+    // immediately).
+    queue->deliver(msg);
+}
+
+// Handle a "routed" event: routing of the message identified by
+// (current sender, sequence) is complete, so stop tracking it.
+void MessageHandler::routed(uint64_t sequence) {
+    MessageId id(sender(), sequence);
+    routingMap.erase(id);
+    // The core's routing map is keyed by sequence number alone and is only
+    // consulted by routing() for our own messages. Erase from it only for
+    // our own events, so another member's sequence number cannot remove one
+    // of our entries.
+    if (sender() == self())
+        eventHandler.getCore().getRoutingMap().erase(sequence);
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h Mon Oct 25 18:00:34 2010
@@ -0,0 +1,70 @@
+#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
+#define QPID_CLUSTER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "qpid/framing/AMQP_AllOperations.h"
+#include "MessageId.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Handler for message disposition events.
+ *
+ * Receives the cluster "routing", "enqueue" and "routed" events. Messages
+ * registered by routing() are kept in routingMap, keyed by (sender, sequence),
+ * until the matching routed() event erases them.
+ */
+class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler
+{
+  public:
+    MessageHandler(EventHandler&);
+    ~MessageHandler();
+
+    void routing(uint64_t sequence, const std::string& message); // message is the encoded header + content.
+    void enqueue(uint64_t sequence, const std::string& queue);   // queue is the queue name.
+    void routed(uint64_t sequence);
+
+  private:
+    typedef std::map<MessageId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+
+    MemberId sender();            // sender of the event being handled.
+    MemberId self();
+
+    broker::Broker& broker;
+    EventHandler& eventHandler;
+    BrokerHandler& brokerHandler; // NOTE(review): initialised but not used in MessageHandler.cpp - confirm it is needed.
+    RoutingMap routingMap;        // messages between their routing() and routed() events.
+
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MESSAGEHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.cpp (from r1026501, qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h&r1=1026501&r2=1027210&rev=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.cpp Mon Oct 25 18:00:34 2010
@@ -1,6 +1,3 @@
-#ifndef QPID_CLUSTER_POLLERDISPATCH_H
-#define QPID_CLUSTER_POLLERDISPATCH_H
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -10,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,40 +18,18 @@
  * under the License.
  *
  */
-
-#include "qpid/cluster/Cpg.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/DispatchHandle.h"
-#include <boost/function.hpp>
+#include "MessageId.h"
+#include <ostream>
 
 namespace qpid {
 namespace cluster {
 
-/**
- * Dispatch CPG events via the poller.
- */
-class PollerDispatch  {
-  public:
-    PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller,
-                   boost::function<void()> onError) ;
-
-    ~PollerDispatch();
+// Order by member first, then by sequence number within a member.
+bool operator<(const MessageId& a, const MessageId& b) {
+    if (a.member < b.member) return true;
+    return (a.member == b.member) && a.sequence < b.sequence;
+}
+
+// Print as "member:sequence".
+std::ostream& operator<<(std::ostream& o, const MessageId& m) {
+    std::ostream& out = o << m.member << ":";
+    return out << m.sequence;
+}
 
-    void start();
-
-  private:
-    // Poller callbacks
-    void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
-    void disconnect(sys::DispatchHandle&); // CPG was disconnected
-
-    Cpg& cpg;
-    boost::shared_ptr<sys::Poller> poller;
-    boost::function<void()> onError;
-    sys::DispatchHandleRef dispatchHandle;
-    bool started;
-
-
-};
 }} // namespace qpid::cluster
-
-#endif  /*!QPID_CLUSTER_POLLERDISPATCH_H*/

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.h (from r1026501, qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.h?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.h&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h&r1=1026501&r2=1027210&rev=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/MessageId.h Mon Oct 25 18:00:34 2010
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_POLLERDISPATCH_H
-#define QPID_CLUSTER_POLLERDISPATCH_H
+#ifndef QPID_CLUSTER_MESSAGEID_H
+#define QPID_CLUSTER_MESSAGEID_H
 
 /*
  *
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,39 +22,31 @@
  *
  */
 
-#include "qpid/cluster/Cpg.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/sys/DispatchHandle.h"
-#include <boost/function.hpp>
+#include "types.h"
+#include <iosfwd>
 
 namespace qpid {
 namespace cluster {
 
+// TODO aconway 2010-10-20: experimental new cluster code.
+
+/** Sequence number used in message identifiers */
+typedef uint64_t SequenceNumber;
+
 /**
- * Dispatch CPG events via the poller.
+ * Message identifier
  */
-class PollerDispatch  {
-  public:
-    PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller,
-                   boost::function<void()> onError) ;
-
-    ~PollerDispatch();
-
-    void start();
-
-  private:
-    // Poller callbacks
-    void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
-    void disconnect(sys::DispatchHandle&); // CPG was disconnected
-
-    Cpg& cpg;
-    boost::shared_ptr<sys::Poller> poller;
-    boost::function<void()> onError;
-    sys::DispatchHandleRef dispatchHandle;
-    bool started;
+struct MessageId {
+    MemberId member;            /// Member that created the message
+    SequenceNumber sequence;    /// Sequence number assigned by member.
+    MessageId(MemberId m=MemberId(), SequenceNumber s=0) : member(m), sequence(s) {}
+};
+
+bool operator<(const MessageId&, const MessageId&);
+
+std::ostream& operator<<(std::ostream&, const MessageId&);
 
 
-};
 }} // namespace qpid::cluster
 
-#endif  /*!QPID_CLUSTER_POLLERDISPATCH_H*/
+#endif  /*!QPID_CLUSTER_MESSAGEID_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.cpp Mon Oct 25 18:00:34 2010
@@ -37,9 +37,11 @@ PollerDispatch::PollerDispatch(Cpg& c, b
       started(false)
 {}
     
-PollerDispatch::~PollerDispatch() {
-    if (started)
-        dispatchHandle.stopWatch();
+PollerDispatch::~PollerDispatch() { stop(); }
+
+void PollerDispatch::stop() {
+    if (started) dispatchHandle.stopWatch();
+    started = false;
 }
 
 void PollerDispatch::start() {
@@ -54,6 +56,7 @@ void PollerDispatch::dispatch(sys::Dispa
         h.rewatch();
     } catch (const std::exception& e) {
         QPID_LOG(critical, "Error in cluster dispatch: " << e.what());
+        stop();
         onError();
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollerDispatch.h Mon Oct 25 18:00:34 2010
@@ -41,6 +41,7 @@ class PollerDispatch  {
     ~PollerDispatch();
 
     void start();
+    void stop();
 
   private:
     // Poller callbacks

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt Mon Oct 25 18:00:34 2010
@@ -328,8 +328,6 @@ and been in use (one of the key missing 
 
 ** Misc outstanding issues & notes
 
-Message IDs: need an efficient cluster-wide message ID.
-
 Replicating wiring
 - Need async completion of wiring commands? 
 - qpid.sequence_counter: need extra work to support in new design, do we care?

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt Mon Oct 25 18:00:34 2010
@@ -38,21 +38,9 @@ a note for later optimization/improvemen
 - acquire then kill broker: verify can be dequeued other members.
 - acquire then reject: verify goes on alt-exchange once only.
 
-*** TODO broker::Cluster interface and call points.
+*** DONE broker::Cluster interface and call points.
 
-Initial draft is commited. 
-
-Issues to review:
-
-queue API: internal classes like RingQueuePolicy use Queue::acuqire/dequeue
-when messages are pushed. How to reconcile with queue ownership?
-
-rejecting messages: if there's an alternate exchange where do we do the
-re-routing? On origin broker or on all brokers?
-
-Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
-Intercepting client actions on the queue vs. internal actions
-(e.g. ring policy)
+Initial interface committed.
 
 *** Main classes
 
@@ -63,7 +51,7 @@ BrokerHandler:
 
 LocalMessageMap:
 - Holds local messages while they are being enqueued.
-- thread safe: called by both BrokerHandler and DeliverHandler
+- thread safe: called by both BrokerHandler and MessageHandler
 
 MessageHandler:
 - handles delivered mcast messages related to messages.
@@ -77,7 +65,7 @@ QueueOwnerHandler:
 - maintains view of cluster state regarding queue ownership.
 
 cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
-- thread safe: manage state used by both DeliverHandler and BrokerHandler
+- thread safe: manage state used by both MessageHandler and BrokerHandler
 
 The following code sketch illustrates only the "happy path" error handling
 is omitted.
@@ -89,13 +77,15 @@ Types:
 - NodeId 64 bit CPG node-id, identifies member of the cluster.
 - struct MessageId { NodeId node; SequenceNumber seq; }
 
+NOTE: Message IDs identify a QueuedMessage, i.e. a position on a queue.
+
 Members:
 - atomic<SequenceNumber> sequence // sequence number for message IDs.
 - thread_local bool noReplicate // suppress replication.
 - thread_local bool isRouting // suppress operations while routing
 - QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
 
-NOTE: localMessage is also modified by DeliverHandler.
+NOTE: localMessage is also modified by MessageHandler.
 
 broker::Cluster intercept functions:
 
@@ -150,7 +140,7 @@ dequeue(QueuedMessage) 
   # FIXME revisit - move it out of the queue lock.
   cleanup(msg)
 
-*** DeliverHandler and mcast messages
+*** MessageHandler and mcast messages
 Types:
 - struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
 - struct QueueKey { MessageId id; QueueName q; }
@@ -326,8 +316,9 @@ cancel(q,consumer,consumerCount) - Queue
 - keep design modular, keep threading rules clear.
 
 ** TODO [#B] Large message replication.
-Need to be able to multicast large messages in fragments
-
+Multicast should encode messages in fixed size buffers (64k)?
+Can't assume we can send message in one chunk.
+For 0-10 can use channel numbers & send whole frames packed into larger buffer.
 ** TODO [#B] Batch CPG multicast messages
 The new cluster design involves a lot of small multicast messages,
 they need to be batched into larger CPG messages for efficiency.
@@ -437,3 +428,9 @@ Look for ways to capitalize on the simil
 
 In particular QueuedEvents (async replication) strongly resembles
 cluster replication, but over TCP rather than multicast.
+** TODO [#C] Concurrency for enqueue events.
+All enqueue events are being processed in the CPG deliver thread context which
+serializes all the work. We only need ordering on a per queue basis, can we
+enqueue in parallel on different queues and will that improve performance?
+** TODO [#C] Handling immediate messages in a cluster
+Include remote consumers in decision to deliver an immediate message?

Modified: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp Mon Oct 25 18:00:34 2010
@@ -42,6 +42,7 @@ using namespace boost;
 using namespace boost::assign;
 using namespace qpid::messaging;
 using boost::format;
+using boost::intrusive_ptr;
 
 namespace qpid {
 namespace tests {
@@ -59,6 +60,9 @@ class DummyCluster : public broker::Clus
         history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
                     % qm.position % qm.payload->getFrames().getContent()).str();
     }
+    void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
+        history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
+    }
     void recordStr(const string& op, const string& name) {
         history += (format("%s(%s)") % op % name).str();
     }
@@ -70,7 +74,10 @@ class DummyCluster : public broker::Clus
         history += (format("routing(%s)") % m->getFrames().getContent()).str();
     }
 
-    virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); }
+    virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
+        recordMsg("enqueue", q, msg);
+        return true;
+    }
 
     virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
         history += (format("routed(%s)") % m->getFrames().getContent()).str();
@@ -91,9 +98,8 @@ class DummyCluster : public broker::Clus
     virtual void release(const broker::QueuedMessage& qm) {
         if (!isRouting) recordQm("release", qm);
     }
-    virtual void dequeue(const broker::QueuedMessage& qm) {
-        // Never ignore dequeue, used to avoid resource leaks.
-        recordQm("dequeue", qm);
+    virtual void drop(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("dequeue", qm);
     }
 
     // Consumers
@@ -156,7 +162,7 @@ QPID_AUTO_TEST_CASE(testSimplePubSub) {
     sender.send(Message("a"));
     f.s.sync();
     BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
     // Don't check size here as it is uncertain whether acquire has happened yet.
 
@@ -221,7 +227,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     f.s.reject(m);
     BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)"); 
     BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
     BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)"); 
@@ -239,7 +245,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     bool received = receiver.fetch(m, Duration::IMMEDIATE);
     BOOST_CHECK(!received);                     // Timed out
     BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
     BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
     BOOST_CHECK_EQUAL(h.size(), i);
@@ -252,7 +258,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     f.s.sync();
     BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
     BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
     BOOST_CHECK_EQUAL(h.size(), i);
 
@@ -261,11 +267,7 @@ QPID_AUTO_TEST_CASE(testReleaseReject) {
     sender.send(m);
     f.s.sync();
     BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
-    // FIXME: bug in Queue.cpp gives the incorrect position when
-    // dequeueing a replaced LVQ message.
-    // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
     BOOST_CHECK_EQUAL(h.size(), i);
 
@@ -345,20 +347,19 @@ QPID_AUTO_TEST_CASE(testRingQueue) {
     BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
 
     BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
 
     BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
 
     BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
 
     BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
-    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
     BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
 
     Receiver receiver = f.s.createReceiver("ring");
@@ -399,15 +400,16 @@ QPID_AUTO_TEST_CASE(testTransactions) {
     BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
     BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
     ts.commit();
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
-    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)");
-    BOOST_CHECK_EQUAL(h.size(), i);
-
     // FIXME aconway 2010-10-18: As things stand the cluster is not
     // compatible with transactions
-    // - enqueues occur after routing is complete.
+    // - enqueues occur after routing is complete
+    // - no call to Cluster::enqueue, should be in Queue::process?
     // - no transaction context associated with messages in the Cluster interface.
     // - no call to Cluster::accept in Queue::dequeueCommitted
+    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
 
     Receiver receiver = ts.createReceiver("q");
     BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");

Added: qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py?rev=1027210&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py (added)
+++ qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py Mon Oct 25 18:00:34 2010
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess
+from qpid import datatypes, messaging
+from qpid.brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from qpid.messaging.exceptions import Empty
+from threading import Thread, Lock
+from logging import getLogger
+from itertools import chain
+
+log = getLogger("qpid.cluster_tests")
+
+class Cluster2Tests(BrokerTest):
+    """Tests for new cluster code."""
+
+    def test_message_enqueue(self):
+        """Test basic replication of enqueued messages."""
+
+        cluster = self.cluster(2, cluster2=True, args=["--log-enable=trace+:cluster"])
+
+        sn0 = cluster[0].connect().session()
+        r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+        r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+        s0 = sn0.sender("amq.fanout");
+
+        sn1 = cluster[1].connect().session()
+        r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+        r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+
+        # Send messages on member 0
+        content = ["a","b","c"]
+        for m in content: s0.send(Message(m))
+
+        # Browse on both members.
+        def check(content, receiver):
+            for c in content: self.assertEqual(c, receiver.fetch(1).content)
+            self.assertRaises(Empty, receiver.fetch, 0)
+
+        check(content, r0p)
+        check(content, r0q)
+        check(content, r1p)
+        check(content, r1q)
+
+        sn1.connection.close()
+        sn0.connection.close()

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Mon Oct 25 18:00:34 2010
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
 CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
 CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
 
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
 rm -rf $OUTDIR

Modified: qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_env.sh.in?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_env.sh.in Mon Oct 25 18:00:34 2010
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
 exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 exportmodule ACL_LIB acl.so
 exportmodule CLUSTER_LIB cluster.so
+exportmodule CLUSTER2_LIB cluster2.so
 exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
 exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 exportmodule SSLCONNECTOR_LIB sslconnector.so

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Mon Oct 25 18:00:34 2010
@@ -279,4 +279,26 @@
       <field name="data" type="vbin32"/>
     </control>
   </class>
+
+
+  <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. -->
+
+  <!-- Message delivery and disposition -->
+  <class name="cluster-message" code="0x82">
+    <!-- FIXME aconway 2010-10-19: create message in fragments -->
+    <control name="routing" code="0x1">
+      <field name="sequence" type="uint64"/>
+      <field name="message" type="str32"/>
+    </control>
+
+    <control name="enqueue" code="0x2">
+      <field name="sequence" type="uint64"/>
+      <field name="queue" type="str8"/>
+    </control>
+
+    <control name="routed" code="0x3">
+      <field name="sequence" type="uint64"/>
+    </control>
+
+  </class>
 </amqp>

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=1027210&r1=1027209&r2=1027210&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Mon Oct 25 18:00:34 2010
@@ -409,17 +409,25 @@ class Cluster:
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True,
+                 cluster2=False):
+        if cluster2:
+            cluster_name = "--cluster2-name"
+            cluster_lib = BrokerTest.cluster2_lib
+        else:
+            cluster_name = "--cluster-name"
+            cluster_lib = BrokerTest.cluster_lib
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
         Cluster._cluster_count += 1
         # Use unique cluster name
         self.args = copy(args)
-        self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+        self.args += [ cluster_name,
+                       "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
-        assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
-        self.args += [ "--load-module", BrokerTest.cluster_lib ]
+        assert cluster_lib, "Cannot locate cluster plug-in"
+        self.args += [ "--load-module", cluster_lib ]
         self.start_n(count, expect=expect, wait=wait)
 
     def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
@@ -445,6 +453,7 @@ class BrokerTest(TestCase):
     # Environment settings.
     qpidd_exec = checkenv("QPIDD_EXEC")
     cluster_lib = os.getenv("CLUSTER_LIB")
+    cluster2_lib = os.getenv("CLUSTER2_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
     qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -491,9 +500,9 @@ class BrokerTest(TestCase):
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2)
         return cluster
 
 class RethrownException(Exception):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org