You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/22 19:23:07 UTC

svn commit: r1073448 [2/2] - in /qpid/branches/qpid-2920/qpid/cpp: design_docs/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/cluster/exp/ src/tests/ xml/

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "MessageHandler.h"
+#include "BrokerHandler.h"
+#include "EventHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+
+MessageHandler::MessageHandler(EventHandler& e) :
+    HandlerBase(e),
+    broker(e.getCore().getBroker())
+{}
+
+bool MessageHandler::invoke(const framing::AMQBody& body) {
+    return framing::invoke(*this, body).wasHandled();
+}
+
+void MessageHandler::routing(RoutingId routingId, const std::string& message) {
+    if (sender() == self()) return; // Already in getCore().getRoutingMap()
+    boost::intrusive_ptr<Message> msg = new Message;
+    // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
+    framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
+    msg->decodeHeader(buf);
+    msg->decodeContent(buf);
+    memberMap[sender()].routingMap[routingId] = msg;
+}
+
+boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
+    const std::string& q, const char* msg)
+{
+    boost::shared_ptr<Queue> queue = broker.getQueues().find(q);
+    if (!queue) throw Exception(QPID_MSG(msg << ": unknown queue " << q));
+    return queue;
+}
+
+void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
+    boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
+    boost::intrusive_ptr<Message> msg;
+    if (sender() == self())
+        msg = eventHandler.getCore().getRoutingMap().get(routingId);
+    else
+        msg = memberMap[sender()].routingMap[routingId];
+    if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
+                                       << " failed:  unknown message"));
+    BrokerHandler::ScopedSuppressReplication ssr;
+    queue->deliver(msg);
+}
+
+void MessageHandler::routed(RoutingId routingId) {
+    if (sender() == self())
+        eventHandler.getCore().getRoutingMap().erase(routingId);
+    else
+        memberMap[sender()].routingMap.erase(routingId);
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
+    if (sender() == self()) {
+        // FIXME aconway 2010-10-28: we should complete the ack that initiated
+        // the dequeue at this point, see BrokerHandler::dequeue
+        return;
+    }
+    boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");
+    BrokerHandler::ScopedSuppressReplication ssr;
+    QueuedMessage qm;
+    // FIXME aconway 2010-10-28: when we replicate acquires, the acquired
+    // messages will be stored by MessageHandler::acquire.
+    if (queue->acquireMessageAt(position, qm)) {
+        assert(qm.position.getValue() == position);
+        assert(qm.payload);
+        queue->dequeue(0, qm);
+    }
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
+#define QPID_CLUSTER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Broker;
+class Queue;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Handler for message disposition events.
+ */
+class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler,
+                       public HandlerBase
+{
+  public:
+    MessageHandler(EventHandler&);
+
+    bool invoke(const framing::AMQBody& body);
+
+    void routing(uint32_t routingId, const std::string& message);
+    void enqueue(uint32_t routingId, const std::string& queue);
+    void routed(uint32_t routingId);
+    void dequeue(const std::string& queue, uint32_t position);
+  private:
+    struct Member {
+        typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap;
+        RoutingMap routingMap;
+    };
+    typedef std::map<MemberId, Member> MemberMap;
+
+    boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg);
+
+    broker::Broker& broker;
+    MemberMap memberMap;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MESSAGEHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt Tue Feb 22 18:23:06 2011
@@ -0,0 +1,2 @@
+
+Experimental code to test ideas about a new cluster design.

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "WiringHandler.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+using framing::FieldTable;
+
+WiringHandler::WiringHandler(EventHandler& e) :
+    HandlerBase(e),
+    broker(e.getCore().getBroker()),
+    recovery(broker.getQueues(), broker.getExchanges(),
+             broker.getLinks(), broker.getDtxManager())
+{}
+
+bool WiringHandler::invoke(const framing::AMQBody& body) {
+    return framing::invoke(*this, body).wasHandled();
+}
+
+void WiringHandler::createQueue(const std::string& data) {
+    if (sender() == self()) return;
+    BrokerHandler::ScopedSuppressReplication ssr;
+    framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+    // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+    RecoverableQueue::shared_ptr queue = recovery.recoverQueue(buf);
+    QPID_LOG(debug, "cluster: create queue " << queue->getName());
+}
+
+void WiringHandler::destroyQueue(const std::string& name) {
+    if (sender() == self()) return;
+    QPID_LOG(debug, "cluster: destroy queue " << name);
+    BrokerHandler::ScopedSuppressReplication ssr;
+    broker.deleteQueue(name, std::string(), std::string());
+}
+
+void WiringHandler::createExchange(const std::string& data) {
+    if (sender() == self()) return;
+    BrokerHandler::ScopedSuppressReplication ssr;
+    framing::Buffer buf(const_cast<char*>(&data[0]), data.size());
+    // TODO aconway 2011-02-21: asymetric - RecoveryManager vs Broker::create*()
+    RecoverableExchange::shared_ptr exchange = recovery.recoverExchange(buf);
+    QPID_LOG(debug, "cluster: create exchange " << exchange->getName());
+}
+
+void WiringHandler::destroyExchange(const std::string& name) {
+    if (sender() == self()) return;
+    QPID_LOG(debug, "cluster: destroy exchange " << name);
+    BrokerHandler::ScopedSuppressReplication ssr;
+    broker.getExchanges().destroy(name);
+}
+
+void WiringHandler::bind(
+    const std::string& queueName, const std::string& exchangeName,
+    const std::string& routingKey, const FieldTable& arguments)
+{
+    if (sender() == self()) return;
+    QPID_LOG(debug, "cluster: bind queue=" << queueName
+             << " exchange=" << exchangeName
+             << " key=" << routingKey
+             << " arguments=" << arguments);
+    BrokerHandler::ScopedSuppressReplication ssr;
+    broker.bind(queueName, exchangeName, routingKey, arguments, std::string(), std::string());
+}
+
+void WiringHandler::unbind(
+    const std::string& queueName, const std::string& exchangeName,
+    const std::string& routingKey, const FieldTable& arguments)
+{
+    if (sender() == self()) return;
+    QPID_LOG(debug, "cluster: unbind queue=" << queueName
+             << " exchange=" << exchangeName
+             << " key=" << routingKey
+             << " arguments=" << arguments);
+    BrokerHandler::ScopedSuppressReplication ssr;
+    broker.unbind(queueName, exchangeName, routingKey, std::string(), std::string());
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,75 @@
+#ifndef QPID_CLUSTER_WIRINGHANDLER_H
+#define QPID_CLUSTER_WIRINGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+
+
+/**
+ * Handler for wiring disposition events.
+ */
+class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
+                      public HandlerBase
+{
+  public:
+    WiringHandler(EventHandler&);
+
+    bool invoke(const framing::AMQBody& body);
+
+    void createQueue(const std::string& data);
+    void destroyQueue(const std::string& name);
+    void createExchange(const std::string& data);
+    void destroyExchange(const std::string& name);
+    void bind(const std::string& queue, const std::string& exchange,
+              const std::string& routingKey, const framing::FieldTable& arguments);
+    void unbind(const std::string& queue, const std::string& exchange,
+                const std::string& routingKey, const framing::FieldTable& arguments);
+
+
+  private:
+    broker::Broker& broker;
+    broker::RecoveryManagerImpl recovery;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_WIRINGHANDLER_H*/

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h Tue Feb 22 18:23:06 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,7 +37,7 @@ extern "C" {
 #  include <corosync/cpg.h>
 #else
 #  error "No cpg.h header file available"
-#endif    
+#endif
 }
 
 namespace qpid {
@@ -78,6 +78,9 @@ std::ostream& operator<<(std::ostream&, 
 
 std::ostream& operator<<(std::ostream&, EventType);
 
+/** Number to identify a message being routed. */
+typedef uint32_t RoutingId;
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_TYPES_H*/

Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,419 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+///@file
+// Tests using a dummy broker::Cluster implementation to verify the expected
+// Cluster functions are called for various actions on the broker.
+//
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Duration.h"
+#include "BrokerFixture.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace boost::assign;
+using namespace qpid::messaging;
+using boost::format;
+using boost::intrusive_ptr;
+
+namespace qpid {
+namespace tests {
+
+class DummyCluster : public broker::Cluster
+{
+  private:
+    /** Flag used to ignore events other than enqueues while routing,
+     * e.g. acquires and accepts generated in a ring queue to replace an element..
+     * In real impl would be a thread-local variable.
+     */
+    bool isRouting;
+
+    void recordQm(const string& op, const broker::QueuedMessage& qm) {
+        history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
+                    % qm.position % qm.payload->getFrames().getContent()).str();
+    }
+    void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
+        history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
+    }
+    void recordStr(const string& op, const string& name) {
+        history += (format("%s(%s)") % op % name).str();
+    }
+  public:
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
+        isRouting = true;
+        history += (format("routing(%s)") % m->getFrames().getContent()).str();
+    }
+
+    virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
+        recordMsg("enqueue", q, msg);
+        return true;
+    }
+
+    virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
+        history += (format("routed(%s)") % m->getFrames().getContent()).str();
+        isRouting = false;
+    }
+    virtual void acquire(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("acquire", qm);
+    }
+    virtual void release(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("release", qm);
+    }
+    virtual void dequeue(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("dequeue", qm);
+    }
+
+    // Consumers
+
+    virtual void consume(const broker::Queue& q, size_t n) {
+        history += (format("consume(%s, %d)") % q.getName() % n).str();
+    }
+    virtual void cancel(const broker::Queue& q, size_t n) {
+        history += (format("cancel(%s, %d)") % q.getName() % n).str();
+    }
+
+    // Wiring
+
+    virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
+    virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
+    virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
+    virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
+    virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+        history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
+    }
+    virtual void unbind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+        history += (format("unbind(%s, %s, %s)")% q.getName()%ex.getName()%key).str();
+    }
+    vector<string> history;
+};
+
+QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
+
+// Broker fixture with DummyCluster set up and some new API client bits.
+struct DummyClusterFixture: public BrokerFixture {
+    Connection c;
+    Session s;
+    DummyCluster*dc;
+    DummyClusterFixture() {
+        broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
+        dc = &static_cast<DummyCluster&>(broker->getCluster());
+        c = Connection("localhost:"+lexical_cast<string>(getPort()));
+        c.open();
+        s = c.createSession();
+    }
+    ~DummyClusterFixture() {
+        c.close();
+    }
+};
+
+QPID_AUTO_TEST_CASE(testSimplePubSub) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    // Queue creation
+    Sender sender = f.s.createSender("q;{create:always,delete:always}");
+    size_t i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Consumer
+    Receiver receiver = f.s.createReceiver("q");
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Send message
+    sender.send(Message("a"));
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    // Don't check size here as it is uncertain whether acquire has happened yet.
+
+    // Acquire message
+    Message m = receiver.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Acknowledge message
+    f.s.acknowledge(true);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Close a consumer
+    receiver.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Destroy the queue
+    f.c.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testReleaseReject) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
+    sender.send(Message("a"));
+    Receiver receiver = f.s.createReceiver("q");
+    Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
+    Message m = receiver.fetch(Duration::SECOND);
+    h.clear();
+
+    // Explicit release
+    f.s.release(m);
+    f.s.sync();
+    size_t i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Implicit release on closing connection.
+    Connection c("localhost:"+lexical_cast<string>(f.getPort()));
+    c.open();
+    Session s = c.createSession();
+    Receiver r = s.createReceiver("q");
+    m = r.fetch(Duration::SECOND);
+    h.clear();
+    i = 0;
+    c.close();
+    BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Reject message, goes to alternate exchange.
+    m = receiver.fetch(Duration::SECOND);
+    h.clear();
+    i = 0;
+    f.s.reject(m);
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+    m = altReceiver.fetch(Duration::SECOND);
+    BOOST_CHECK_EQUAL(m.getContent(), "a");
+
+    // Timed out message
+    h.clear();
+    i = 0;
+    m = Message("t");
+    m.setTtl(Duration(1));                // Timeout 1ms
+    sender.send(m);
+    usleep(2000);               // Sleep 2ms
+    bool received = receiver.fetch(m, Duration::IMMEDIATE);
+    BOOST_CHECK(!received);                     // Timed out
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Message replaced on LVQ
+    sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
+    m = Message("a");
+    m.getProperties()["qpid.LVQ_key"] = "foo";
+    sender.send(m);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    m = Message("b");
+    m.getProperties()["qpid.LVQ_key"] = "foo";
+    sender.send(m);
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    receiver = f.s.createReceiver("lvq");
+    BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
+    f.s.acknowledge(true);
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testFanout) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
+    Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
+    Sender sender = f.s.createSender("amq.fanout");
+    r1.setCapacity(0);          // Don't receive immediately.
+    r2.setCapacity(0);
+    h.clear();
+    size_t i = 0;
+
+    // Send message
+    sender.send(Message("a"));
+    f.s.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
+    BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
+    BOOST_CHECK(h.at(i-1) != h.at(i-2));
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    // Receive messages
+    Message m1 = r1.fetch(Duration::SECOND);
+    f.s.acknowledge(m1, true);
+    Message m2 = r2.fetch(Duration::SECOND);
+    f.s.acknowledge(m2, true);
+
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testRingQueue) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+
+    // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
+    // so we can't do this:
+    // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
+    // Must use old API to declare ring queue:
+    qpid::client::Connection c;
+    f.open(c);
+    qpid::client::Session s = c.newSession();
+    qpid::framing::FieldTable args;
+    args.setInt("qpid.max_size", 3);
+    args.setString("qpid.policy_type","ring");
+    s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
+    c.close();
+    Sender sender = f.s.createSender("ring");
+
+    size_t i = 0;
+    // Send message
+    sender.send(Message("a"));
+    sender.send(Message("b"));
+    sender.send(Message("c"));
+    sender.send(Message("d"));
+    f.s.sync();
+
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
+
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
+
+    Receiver receiver = f.s.createReceiver("ring");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
+    f.s.acknowledge(true);
+
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
+
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testTransactions) {
+    DummyClusterFixture f;
+    vector<string>& h = f.dc->history;
+    Session ts = f.c.createTransactionalSession();
+    Sender sender = ts.createSender("q;{create:always,delete:always}");
+    size_t i = 0;
+    BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+    sender.send(Message("a"));
+    sender.send(Message("b"));
+    ts.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+    BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
+    ts.commit();
+    // FIXME aconway 2010-10-18: As things stand the cluster is not
+    // compatible with transactions
+    // - enqueues occur after routing is complete
+    // - no call to Cluster::enqueue, should be in Queue::process?
+    // - no transaction context associated with messages in the Cluster interface.
+    // - no call to Cluster::accept in Queue::dequeueCommitted
+    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+    // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+
+
+    Receiver receiver = ts.createReceiver("q");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
+    BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+    ts.acknowledge();
+    ts.sync();
+    BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+    ts.commit();
+    ts.sync();
+    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+    // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
+    BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am Tue Feb 22 18:23:06 2011
@@ -25,7 +25,7 @@ QMF_GEN=$(top_srcdir)/managementgen/qmf-
 abs_builddir=@abs_builddir@
 abs_srcdir=@abs_srcdir@
 
-extra_libs = 
+extra_libs =
 lib_client = $(abs_builddir)/../libqpidclient.la
 lib_messaging = $(abs_builddir)/../libqpidmessaging.la
 lib_common = $(abs_builddir)/../libqpidcommon.la
@@ -36,7 +36,7 @@ lib_qmf2 = $(abs_builddir)/../libqmf2.la
 
 #
 # Initialize variables that are incremented with +=
-# 
+#
 check_PROGRAMS=
 check_LTLIBRARIES=
 TESTS=
@@ -61,9 +61,9 @@ tmodule_LTLIBRARIES=
 # Unit test program
 #
 # Unit tests are built as a single program to reduce valgrind overhead
-# when running the tests. If you want to build a subset of the tests do 
+# when running the tests. If you want to build a subset of the tests do
 #   rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o"
-# 
+#
 
 TESTS+=unit_test
 check_PROGRAMS+=unit_test
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
 	Variant.cpp \
 	Address.cpp \
 	ClientMessage.cpp \
-	Qmf2.cpp
+	Qmf2.cpp \
+	BrokerClusterCalls.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -188,32 +189,32 @@ qpid_send_LDADD = $(lib_messaging)
 qpidtest_PROGRAMS+=qpid-perftest
 qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
 qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
-qpid_perftest_LDADD=$(lib_client) 
+qpid_perftest_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-txtest
 qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_txtest_SOURCES=qpid-txtest.cpp  TestOptions.h ConnectionOptions.h
-qpid_txtest_LDADD=$(lib_client) 
+qpid_txtest_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-latency-test
 qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
-qpid_latency_test_LDADD=$(lib_client) 
+qpid_latency_test_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-client-test
 qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
-qpid_client_test_LDADD=$(lib_client) 
+qpid_client_test_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-topic-listener
 qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_listener_LDADD=$(lib_client) 
+qpid_topic_listener_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-topic-publisher
 qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
 qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_publisher_LDADD=$(lib_client) 
+qpid_topic_publisher_LDADD=$(lib_client)
 
 qpidtest_PROGRAMS+=qpid-ping
 qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
@@ -232,17 +233,17 @@ echotest_LDADD=$(lib_client)
 check_PROGRAMS+=publish
 publish_INCLUDES=$(PUBLIC_INCLUDES)
 publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h
-publish_LDADD=$(lib_client) 
+publish_LDADD=$(lib_client)
 
 check_PROGRAMS+=consume
 consume_INCLUDES=$(PUBLIC_INCLUDES)
 consume_SOURCES=consume.cpp  TestOptions.h ConnectionOptions.h
-consume_LDADD=$(lib_client) 
+consume_LDADD=$(lib_client)
 
 check_PROGRAMS+=header_test
 header_test_INCLUDES=$(PUBLIC_INCLUDES)
 header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
-header_test_LDADD=$(lib_client) 
+header_test_LDADD=$(lib_client)
 
 check_PROGRAMS+=failover_soak
 failover_soak_INCLUDES=$(PUBLIC_INCLUDES)
@@ -251,28 +252,28 @@ failover_soak_LDADD=$(lib_client) $(lib_
 
 check_PROGRAMS+=declare_queues
 declare_queues_INCLUDES=$(PUBLIC_INCLUDES)
-declare_queues_SOURCES=declare_queues.cpp  
-declare_queues_LDADD=$(lib_client) 
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(lib_client)
 
 check_PROGRAMS+=replaying_sender
 replaying_sender_INCLUDES=$(PUBLIC_INCLUDES)
-replaying_sender_SOURCES=replaying_sender.cpp  
-replaying_sender_LDADD=$(lib_client) 
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(lib_client)
 
 check_PROGRAMS+=resuming_receiver
 resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES)
-resuming_receiver_SOURCES=resuming_receiver.cpp  
-resuming_receiver_LDADD=$(lib_client) 
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(lib_client)
 
 check_PROGRAMS+=txshift
 txshift_INCLUDES=$(PUBLIC_INCLUDES)
 txshift_SOURCES=txshift.cpp  TestOptions.h ConnectionOptions.h
-txshift_LDADD=$(lib_client) 
+txshift_LDADD=$(lib_client)
 
 check_PROGRAMS+=txjob
 txjob_INCLUDES=$(PUBLIC_INCLUDES)
 txjob_SOURCES=txjob.cpp  TestOptions.h ConnectionOptions.h
-txjob_LDADD=$(lib_client) 
+txjob_LDADD=$(lib_client)
 
 check_PROGRAMS+=PollerTest
 PollerTest_SOURCES=PollerTest.cpp
@@ -307,7 +308,7 @@ TESTS_ENVIRONMENT = \
     VALGRIND=$(VALGRIND) \
     LIBTOOL="$(LIBTOOL)" \
     QPID_DATA_DIR= \
-    $(srcdir)/run_test 
+    $(srcdir)/run_test
 
 system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
 TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
@@ -352,7 +353,8 @@ EXTRA_DIST +=								\
   start_broker.ps1							\
   stop_broker.ps1							\
   topictest.ps1                                                         \
-  run_queue_flow_limit_tests
+  run_queue_flow_limit_tests						\
+  run_cluster_authentication_test
 
 check_LTLIBRARIES += libdlclose_noop.la
 libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -375,6 +377,7 @@ EXTRA_DIST+=						\
 	run_failover_soak				\
 	reliable_replication_test			\
 	federated_cluster_test_with_node_failure        \
+        run_cluster_authentication_soak			\
 	sasl_test_setup.sh
 
 check-long:

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py Tue Feb 22 18:23:06 2011
@@ -437,17 +437,25 @@ class Cluster:
 
     _cluster_count = 0
 
-    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True,
+                 cluster2=False):
+        if cluster2:
+            cluster_name = "--cluster2-name"
+            cluster_lib = BrokerTest.cluster2_lib
+        else:
+            cluster_name = "--cluster-name"
+            cluster_lib = BrokerTest.cluster_lib
         self.test = test
         self._brokers=[]
         self.name = "cluster%d" % Cluster._cluster_count
         Cluster._cluster_count += 1
         # Use unique cluster name
         self.args = copy(args)
-        self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+        self.args += [ cluster_name,
+                       "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
         self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
-        assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
-        self.args += [ "--load-module", BrokerTest.cluster_lib ]
+        assert cluster_lib, "Cannot locate cluster plug-in"
+        self.args += [ "--load-module", cluster_lib ]
         self.start_n(count, expect=expect, wait=wait)
 
     def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
@@ -473,6 +481,7 @@ class BrokerTest(TestCase):
     # Environment settings.
     qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
     cluster_lib = os.getenv("CLUSTER_LIB")
+    cluster2_lib = os.getenv("CLUSTER2_LIB")
     xml_lib = os.getenv("XML_LIB")
     qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
     qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -523,9 +532,9 @@ class BrokerTest(TestCase):
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
-    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False):
         """Create and return a cluster ready for use"""
-        cluster = Cluster(self, count, args, expect=expect, wait=wait)
+        cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2)
         return cluster
 
     def assert_browse(self, session, queue, expect_contents, timeout=0):

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk Tue Feb 22 18:23:06 2011
@@ -92,7 +92,7 @@ cluster_test_SOURCES =				\
 	PartialFailure.cpp			\
 	ClusterFailover.cpp
 
-cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework
 
 qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
 qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)

Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py Tue Feb 22 18:23:06 2011
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess
+from qpid import datatypes, messaging
+from brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from qpid.messaging.exceptions import *
+from threading import Thread, Lock
+from logging import getLogger
+from itertools import chain
+
+log = getLogger("qpid.cluster_tests")
+
+class Cluster2Tests(BrokerTest):
+    """Tests for new cluster code."""
+
+    def verify_content(self, content, receiver):
+        for c in content: self.assertEqual(c, receiver.fetch(1).content)
+        self.assertRaises(Empty, receiver.fetch, 0)
+
+    def test_message_enqueue(self):
+        """Test basic replication of enqueued messages.
+        Verify that fanout messages are replicated correctly.
+        """
+
+        cluster = self.cluster(2, cluster2=True)
+
+        sn0 = cluster[0].connect().session()
+        r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+        r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+        s0 = sn0.sender("amq.fanout");
+
+        sn1 = cluster[1].connect().session()
+        r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+        r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+
+        # Send messages on member 0
+        content = ["a","b","c"]
+        for m in content: s0.send(Message(m))
+
+        # Browse on both members.
+        self.verify_content(content, r0p)
+        self.verify_content(content, r0q)
+        self.verify_content(content, r1p)
+        self.verify_content(content, r1q)
+
+        sn1.connection.close()
+        sn0.connection.close()
+
+    def test_message_dequeue(self):
+        """Test replication of dequeues"""
+        cluster = self.cluster(2, cluster2=True)
+        sn0 = cluster[0].connect().session()
+        s0 = sn0.sender("q;{create:always,delete:always}")
+        r0 = sn0.receiver("q")
+        sn1 = cluster[1].connect().session()
+        r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+
+        content = ["a","b","c"]
+        for m in content: s0.send(Message(m))
+         # Verify enqueued on cluster[1]
+        self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+        # Dequeue on cluster[0]
+        self.assertEqual(r0.fetch(1).content, "a")
+        sn0.acknowledge(sync=True)
+
+        # Verify dequeued on cluster[0] and cluster[1]
+        self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
+        self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+
+    def test_wiring(self):
+        """Test replication of wiring"""
+        cluster = self.cluster(2, cluster2=True)
+        sn0 = cluster[0].connect().session()
+        sn1 = cluster[1].connect().session()
+
+        # Test creation of queue, exchange, binding
+        r0ex = sn0.receiver("ex; {create:always, delete:always, node:{type:topic, x-declare:{name:ex, type:'direct'}}}")
+        r0q  = sn0.receiver("q;  {create:always, delete:always, link:{x-bindings:[{exchange:ex,queue:q,key:k}]}}")
+
+        # Verify objects were created on member 1
+        r1 = sn1.receiver("q")   # Queue
+        s1ex = sn1.sender("ex/k; {node:{type:topic}}"); # Exchange
+        s1ex.send(Message("x"))  # Binding with key k
+        self.assertEqual(r1.fetch(1).content, "x")
+
+        # Test destroy.
+        r0q.close()                                    # Delete queue q
+        self.assertRaises(NotFound, sn1.receiver, "q")
+        r0ex.close()            # Delete exchange ex
+        # FIXME aconway 2010-11-05: this does not raise NotFound, sn1 is caching "ex"
+        # self.assertRaises(NotFound, sn1.sender, "ex")
+        # Have to create a new session.
+        self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
+
+        # FIXME aconway 2010-10-29: test unbind, may need to use old API.

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests Tue Feb 22 18:23:06 2011
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
 CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
 CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
 
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
 rm -rf $OUTDIR

Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in Tue Feb 22 18:23:06 2011
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
 exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 exportmodule ACL_LIB acl.so
 exportmodule CLUSTER_LIB cluster.so
+exportmodule CLUSTER2_LIB cluster2.so
 exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
 exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 exportmodule SSLCONNECTOR_LIB sslconnector.so

Modified: qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml Tue Feb 22 18:23:06 2011
@@ -284,4 +284,62 @@
     </control>
   </class>
 
+
+  <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. -->
+
+  <!-- Message delivery and disposition -->
+  <class name="cluster-message" code="0x82">
+    <!-- FIXME aconway 2010-10-19: create message in fragments -->
+    <control name="routing" code="0x1">
+      <field name="routing-id" type="uint32"/>
+      <field name="message" type="str32"/>
+    </control>
+
+    <control name="enqueue" code="0x2">
+      <field name="routing-id" type="uint32"/>
+      <field name="queue" type="queue.name"/>
+    </control>
+
+    <control name="routed" code="0x3">
+      <field name="routing-id" type="uint32"/>
+    </control>
+
+    <control name="dequeue" code="0x4">
+      <field name="queue" type="queue.name"/>
+      <field name="position" type="uint32"/>
+    </control>
+  </class>
+
+  <class name="cluster-wiring" code="0x83">
+    <control name="create-queue" code="0x1">
+      <field name="data" type="str32"/>
+    </control>
+
+    <control name="destroy-queue" code="0x2">
+      <field name="name" type="queue.name"/>
+    </control>
+
+    <control name="create-exchange" code="0x3">
+      <field name="data" type="str32"/>
+    </control>
+
+    <control name="destroy-exchange" code="0x4">
+      <field name="name" type="exchange.name"/>
+    </control>
+
+    <control name="bind" code="0x5">
+      <field name="queue" type="queue.name"/>
+      <field name="exchange" type="exchange.name"/>
+      <field name="binding-key" type="str8"/>
+      <field name="arguments" type="map"/>
+    </control>
+
+    <control name="unbind" code="0x6">
+      <field name="queue" type="queue.name"/>
+      <field name="exchange" type="exchange.name"/>
+      <field name="binding-key" type="str8"/>
+      <field name="arguments" type="map"/>
+    </control>
+
+  </class>
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org