You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/02/22 19:23:07 UTC
svn commit: r1073448 [2/2] - in /qpid/branches/qpid-2920/qpid/cpp:
design_docs/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/cluster/exp/
src/tests/ xml/
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "MessageHandler.h"
+#include "BrokerHandler.h"
+#include "EventHandler.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+
+/**
+ * Build a message handler attached to the given event handler.
+ * Keeps a reference to the broker reached through the event handler's core.
+ */
+MessageHandler::MessageHandler(EventHandler& e)
+    : HandlerBase(e),
+      broker(e.getCore().getBroker())
+{
+}
+
+/**
+ * Dispatch a frame body to the matching operation on this handler.
+ * @return the wasHandled() result reported by framing::invoke.
+ */
+bool MessageHandler::invoke(const framing::AMQBody& body) {
+    const bool handled = framing::invoke(*this, body).wasHandled();
+    return handled;
+}
+
+/**
+ * Store a message that another member has announced it is routing.
+ *
+ * Events sent by this member are skipped: the local message is already
+ * held in getCore().getRoutingMap().
+ *
+ * @param routingId identifier the sender gave to the message.
+ * @param message   encoded message, header first and then content.
+ */
+void MessageHandler::routing(RoutingId routingId, const std::string& message) {
+    if (sender() != self()) {
+        // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
+        // The cast drops const so a Buffer can be built over the string's storage.
+        char* const encoded = const_cast<char*>(&message[0]);
+        framing::Buffer decodeBuf(encoded, message.size());
+        boost::intrusive_ptr<Message> decoded = new Message;
+        decoded->decodeHeader(decodeBuf);
+        decoded->decodeContent(decodeBuf);
+        // Remember the message per sending member, keyed by its routing id.
+        memberMap[sender()].routingMap[routingId] = decoded;
+    }
+}
+
+/**
+ * Look up a queue by name in the broker's queue registry.
+ *
+ * @param q   name of the queue.
+ * @param msg prefix used in the error text if the lookup fails.
+ * @return the queue that was found.
+ * @throws Exception whose message is built from msg, ": unknown queue " and q
+ *         when the registry returns a null pointer.
+ */
+boost::shared_ptr<broker::Queue> MessageHandler::findQueue(
+    const std::string& q, const char* msg)
+{
+    boost::shared_ptr<Queue> found = broker.getQueues().find(q);
+    if (found) return found;
+    throw Exception(QPID_MSG(msg << ": unknown queue " << q));
+}
+
+/**
+ * Deliver a previously announced message to a local queue.
+ *
+ * The message is taken from the core's routing map when this member sent the
+ * event, otherwise from the routing map kept for the sending member.
+ *
+ * @param routingId identifier of the message being routed.
+ * @param q         name of the queue to deliver to.
+ * @throws Exception if the queue is unknown (see findQueue) or no message is
+ *         stored under routingId for the sender.
+ */
+void MessageHandler::enqueue(RoutingId routingId, const std::string& q) {
+    boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
+    boost::intrusive_ptr<Message> msg;
+    if (sender() == self()) {
+        msg = eventHandler.getCore().getRoutingMap().get(routingId);
+    } else {
+        // Use find() rather than operator[]: operator[] would insert an empty
+        // Member and a null message entry on a miss, leaving stale entries
+        // behind right before the exception below is thrown.
+        MemberMap::iterator member = memberMap.find(sender());
+        if (member != memberMap.end()) {
+            Member::RoutingMap& routes = member->second.routingMap;
+            Member::RoutingMap::iterator route = routes.find(routingId);
+            if (route != routes.end()) msg = route->second;
+        }
+    }
+    if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
+                                       << " failed: unknown message"));
+    // ssr stays in scope for the deliver() call below.
+    BrokerHandler::ScopedSuppressReplication ssr;
+    queue->deliver(msg);
+}
+
+/**
+ * Forget a message once its sender reports that routing is complete.
+ *
+ * @param routingId identifier of the message whose routing has finished.
+ */
+void MessageHandler::routed(RoutingId routingId) {
+    if (sender() == self()) {
+        eventHandler.getCore().getRoutingMap().erase(routingId);
+        return;
+    }
+    // Use find() rather than operator[] so an unknown sender does not get an
+    // empty Member entry created just to erase from it.
+    MemberMap::iterator member = memberMap.find(sender());
+    if (member != memberMap.end())
+        member->second.routingMap.erase(routingId);
+}
+
+/**
+ * Apply a dequeue announced by another cluster member to the local queue.
+ *
+ * @param q        name of the queue the message was dequeued from.
+ * @param position position of the message in that queue.
+ * @throws Exception if the queue is unknown (see findQueue).
+ */
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
+    if (sender() != self()) {
+        boost::shared_ptr<Queue> target = findQueue(q, "Cluster dequeue failed");
+        // ssr stays in scope for the acquire and dequeue calls below.
+        BrokerHandler::ScopedSuppressReplication ssr;
+        QueuedMessage acquired;
+        // FIXME aconway 2010-10-28: when we replicate acquires, the acquired
+        // messages will be stored by MessageHandler::acquire.
+        if (!target->acquireMessageAt(position, acquired)) return;
+        assert(acquired.position.getValue() == position);
+        assert(acquired.payload);
+        target->dequeue(0, acquired);
+    }
+    // FIXME aconway 2010-10-28: for events from self we should complete the
+    // ack that initiated the dequeue at this point, see BrokerHandler::dequeue
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,73 @@
+#ifndef QPID_CLUSTER_MESSAGEHANDLER_H
+#define QPID_CLUSTER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Message;
+class Broker;
+class Queue;
+}
+
+namespace cluster {
+class EventHandler;
+class BrokerHandler;
+
+/**
+ * Handler for message disposition events.
+ *
+ * Derives from the ClusterMessageHandler operations interface and from
+ * HandlerBase. routing/enqueue/routed/dequeue are presumably the
+ * implementations of the ClusterMessageHandler operations that invoke()
+ * dispatches to - confirm against the generated interface.
+ *
+ * Messages announced by other members are kept per member in memberMap,
+ * keyed by routing id.
+ */
+class MessageHandler : public framing::AMQP_AllOperations::ClusterMessageHandler,
+ public HandlerBase
+{
+ public:
+ MessageHandler(EventHandler&);
+
+ bool invoke(const framing::AMQBody& body); // Dispatch body to one of the operations below.
+
+ void routing(uint32_t routingId, const std::string& message); // A member started routing an encoded message.
+ void enqueue(uint32_t routingId, const std::string& queue); // Deliver the message with this routing id to the named queue.
+ void routed(uint32_t routingId); // Routing finished; the stored message is dropped.
+ void dequeue(const std::string& queue, uint32_t position); // Dequeue the message at position on the named queue.
+ private:
+ struct Member { // Per-member state: messages that member is currently routing.
+ typedef std::map<uint32_t, boost::intrusive_ptr<broker::Message> > RoutingMap; // routing id -> message
+ RoutingMap routingMap;
+ };
+ typedef std::map<MemberId, Member> MemberMap;
+
+ boost::shared_ptr<broker::Queue> findQueue(const std::string& q, const char* msg); // Throws if the queue is unknown; msg prefixes the error text.
+
+ broker::Broker& broker;
+ MemberMap memberMap;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MESSAGEHANDLER_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt Tue Feb 22 18:23:06 2011
@@ -0,0 +1,2 @@
+
+Experimental code to test ideas about a new cluster design.
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/README.txt
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Core.h"
+#include "WiringHandler.h"
+#include "EventHandler.h"
+#include "BrokerHandler.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/QueueRegistry.h"
+#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+using namespace broker;
+using framing::FieldTable;
+
+/**
+ * Build a wiring handler attached to the given event handler.
+ *
+ * `recovery` is constructed from registries obtained through the `broker`
+ * member, so `broker` must be initialized first.
+ */
+WiringHandler::WiringHandler(EventHandler& e)
+    : HandlerBase(e),
+      broker(e.getCore().getBroker()),
+      recovery(broker.getQueues(), broker.getExchanges(),
+               broker.getLinks(), broker.getDtxManager())
+{
+}
+
+/**
+ * Dispatch a frame body to the matching operation on this handler.
+ * @return the wasHandled() result reported by framing::invoke.
+ */
+bool WiringHandler::invoke(const framing::AMQBody& body) {
+    const bool handled = framing::invoke(*this, body).wasHandled();
+    return handled;
+}
+
+/**
+ * Create a queue announced by another cluster member.
+ * Events this member sent itself are ignored.
+ *
+ * @param data encoded queue, decoded through the recovery manager.
+ */
+void WiringHandler::createQueue(const std::string& data) {
+    if (sender() != self()) {
+        // ssr stays in scope while the queue is recovered.
+        BrokerHandler::ScopedSuppressReplication ssr;
+        // The cast drops const so a Buffer can be built over the string's storage.
+        char* const encoded = const_cast<char*>(&data[0]);
+        framing::Buffer decodeBuf(encoded, data.size());
+        // TODO aconway 2011-02-21: asymmetric - RecoveryManager vs Broker::create*()
+        RecoverableQueue::shared_ptr created = recovery.recoverQueue(decodeBuf);
+        QPID_LOG(debug, "cluster: create queue " << created->getName());
+    }
+}
+
+/**
+ * Delete a queue whose deletion was announced by another cluster member.
+ * Events this member sent itself are ignored.
+ *
+ * @param name name of the queue to delete.
+ */
+void WiringHandler::destroyQueue(const std::string& name) {
+    if (sender() != self()) {
+        QPID_LOG(debug, "cluster: destroy queue " << name);
+        BrokerHandler::ScopedSuppressReplication ssr;
+        // The two trailing string arguments are passed empty.
+        const std::string unspecified;
+        broker.deleteQueue(name, unspecified, unspecified);
+    }
+}
+
+/**
+ * Create an exchange announced by another cluster member.
+ * Events this member sent itself are ignored.
+ *
+ * @param data encoded exchange, decoded through the recovery manager.
+ */
+void WiringHandler::createExchange(const std::string& data) {
+    if (sender() != self()) {
+        // ssr stays in scope while the exchange is recovered.
+        BrokerHandler::ScopedSuppressReplication ssr;
+        // The cast drops const so a Buffer can be built over the string's storage.
+        char* const encoded = const_cast<char*>(&data[0]);
+        framing::Buffer decodeBuf(encoded, data.size());
+        // TODO aconway 2011-02-21: asymmetric - RecoveryManager vs Broker::create*()
+        RecoverableExchange::shared_ptr created = recovery.recoverExchange(decodeBuf);
+        QPID_LOG(debug, "cluster: create exchange " << created->getName());
+    }
+}
+
+/**
+ * Destroy an exchange whose destruction was announced by another member.
+ * Events this member sent itself are ignored.
+ *
+ * @param name name of the exchange to destroy.
+ */
+void WiringHandler::destroyExchange(const std::string& name) {
+    if (sender() != self()) {
+        QPID_LOG(debug, "cluster: destroy exchange " << name);
+        BrokerHandler::ScopedSuppressReplication ssr;
+        broker.getExchanges().destroy(name);
+    }
+}
+
+/**
+ * Apply a binding announced by another cluster member.
+ * Events this member sent itself are ignored.
+ *
+ * @param queueName    queue to bind.
+ * @param exchangeName exchange to bind the queue to.
+ * @param routingKey   binding key.
+ * @param arguments    binding arguments, passed on to the broker.
+ */
+void WiringHandler::bind(
+    const std::string& queueName, const std::string& exchangeName,
+    const std::string& routingKey, const FieldTable& arguments)
+{
+    if (sender() != self()) {
+        QPID_LOG(debug, "cluster: bind queue=" << queueName
+                 << " exchange=" << exchangeName
+                 << " key=" << routingKey
+                 << " arguments=" << arguments);
+        BrokerHandler::ScopedSuppressReplication ssr;
+        // The two trailing string arguments are passed empty.
+        const std::string unspecified;
+        broker.bind(queueName, exchangeName, routingKey, arguments,
+                    unspecified, unspecified);
+    }
+}
+
+/**
+ * Remove a binding whose removal was announced by another cluster member.
+ * Events this member sent itself are ignored.
+ *
+ * @param queueName    queue to unbind.
+ * @param exchangeName exchange the queue is bound to.
+ * @param routingKey   binding key.
+ * @param arguments    binding arguments; only logged here, they are not
+ *                     passed to broker.unbind.
+ */
+void WiringHandler::unbind(
+    const std::string& queueName, const std::string& exchangeName,
+    const std::string& routingKey, const FieldTable& arguments)
+{
+    if (sender() != self()) {
+        QPID_LOG(debug, "cluster: unbind queue=" << queueName
+                 << " exchange=" << exchangeName
+                 << " key=" << routingKey
+                 << " arguments=" << arguments);
+        BrokerHandler::ScopedSuppressReplication ssr;
+        // The two trailing string arguments are passed empty.
+        const std::string unspecified;
+        broker.unbind(queueName, exchangeName, routingKey,
+                      unspecified, unspecified);
+    }
+}
+
+}} // namespace qpid::cluster
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Tue Feb 22 18:23:06 2011
@@ -0,0 +1,75 @@
+#ifndef QPID_CLUSTER_WIRINGHANDLER_H
+#define QPID_CLUSTER_WIRINGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// TODO aconway 2010-10-19: experimental cluster code.
+
+#include "HandlerBase.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQP_AllOperations.h"
+#include <boost/intrusive_ptr.hpp>
+#include <map>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+class Broker;
+}
+
+namespace cluster {
+class EventHandler;
+
+
+/**
+ * Handler for wiring disposition events.
+ *
+ * Derives from the ClusterWiringHandler operations interface and from
+ * HandlerBase. The create/destroy/bind/unbind members are presumably the
+ * implementations of the ClusterWiringHandler operations that invoke()
+ * dispatches to - confirm against the generated interface.
+ *
+ * Note: `recovery` is constructed from registries obtained via `broker`,
+ * so `broker` must stay declared before `recovery`.
+ */
+class WiringHandler : public framing::AMQP_AllOperations::ClusterWiringHandler,
+ public HandlerBase
+{
+ public:
+ WiringHandler(EventHandler&);
+
+ bool invoke(const framing::AMQBody& body); // Dispatch body to one of the operations below.
+
+ void createQueue(const std::string& data); // data: encoded queue.
+ void destroyQueue(const std::string& name);
+ void createExchange(const std::string& data); // data: encoded exchange.
+ void destroyExchange(const std::string& name);
+ void bind(const std::string& queue, const std::string& exchange,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+ void unbind(const std::string& queue, const std::string& exchange,
+ const std::string& routingKey, const framing::FieldTable& arguments);
+
+
+ private:
+ broker::Broker& broker;
+ broker::RecoveryManagerImpl recovery; // Used to decode queues and exchanges in createQueue/createExchange.
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_WIRINGHANDLER_H*/
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/qpid/cluster/types.h Tue Feb 22 18:23:06 2011
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,7 +37,7 @@ extern "C" {
# include <corosync/cpg.h>
#else
# error "No cpg.h header file available"
-#endif
+#endif
}
namespace qpid {
@@ -78,6 +78,9 @@ std::ostream& operator<<(std::ostream&,
std::ostream& operator<<(std::ostream&, EventType);
+/** Number to identify a message being routed. */
+typedef uint32_t RoutingId;
+
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_TYPES_H*/
Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp Tue Feb 22 18:23:06 2011
@@ -0,0 +1,419 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+///@file
+// Tests using a dummy broker::Cluster implementation to verify the expected
+// Cluster functions are called for various actions on the broker.
+//
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Duration.h"
+#include "BrokerFixture.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace boost::assign;
+using namespace qpid::messaging;
+using boost::format;
+using boost::intrusive_ptr;
+
+namespace qpid {
+namespace tests {
+
+/**
+ * Dummy broker::Cluster implementation that records every call it receives
+ * as a formatted string in `history`, so a test can compare the sequence of
+ * calls with the sequence it expects.
+ */
+class DummyCluster : public broker::Cluster
+{
+  private:
+    /** Flag used to ignore events other than enqueues while routing,
+     * e.g. acquires and accepts generated in a ring queue to replace an element.
+     * In real impl would be a thread-local variable.
+     */
+    bool isRouting;
+
+    /** Append "op(queue, position, content)" for a queued message. */
+    void recordQm(const string& op, const broker::QueuedMessage& qm) {
+        history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
+                    % qm.position % qm.payload->getFrames().getContent()).str();
+    }
+    /** Append "op(queue, content)" for a message and the queue it concerns. */
+    void recordMsg(const string& op, broker::Queue& q, intrusive_ptr<broker::Message> msg) {
+        history += (format("%s(%s, %s)") % op % q.getName() % msg->getFrames().getContent()).str();
+    }
+    /** Append "op(name)". */
+    void recordStr(const string& op, const string& name) {
+        history += (format("%s(%s)") % op % name).str();
+    }
+  public:
+    // isRouting must start out false: acquire/release/dequeue read it and may
+    // be called before the first routing() call.
+    DummyCluster() : isRouting(false) {}
+
+    // Messages
+
+    virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
+        isRouting = true;
+        history += (format("routing(%s)") % m->getFrames().getContent()).str();
+    }
+
+    virtual bool enqueue(broker::Queue& q, const intrusive_ptr<broker::Message>&msg) {
+        recordMsg("enqueue", q, msg);
+        return true;
+    }
+
+    virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
+        history += (format("routed(%s)") % m->getFrames().getContent()).str();
+        isRouting = false;
+    }
+    // The next three are recorded only when no message is being routed.
+    virtual void acquire(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("acquire", qm);
+    }
+    virtual void release(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("release", qm);
+    }
+    virtual void dequeue(const broker::QueuedMessage& qm) {
+        if (!isRouting) recordQm("dequeue", qm);
+    }
+
+    // Consumers
+
+    virtual void consume(const broker::Queue& q, size_t n) {
+        history += (format("consume(%s, %d)") % q.getName() % n).str();
+    }
+    virtual void cancel(const broker::Queue& q, size_t n) {
+        history += (format("cancel(%s, %d)") % q.getName() % n).str();
+    }
+
+    // Wiring
+
+    virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
+    virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
+    virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
+    virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
+    virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+        history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
+    }
+    virtual void unbind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+        history += (format("unbind(%s, %s, %s)")% q.getName()%ex.getName()%key).str();
+    }
+
+    /** Calls recorded so far, oldest first. */
+    vector<string> history;
+};
+
+QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
+
+// Broker fixture that installs a DummyCluster on the broker and opens a
+// new-API connection and session to it.
+struct DummyClusterFixture: public BrokerFixture {
+    Connection c;
+    Session s;
+    DummyCluster* dc;   // The DummyCluster installed on the broker.
+
+    DummyClusterFixture() {
+        // Install the dummy cluster, then fetch it back from the broker.
+        broker->setCluster(auto_ptr<broker::Cluster>(new DummyCluster));
+        dc = &static_cast<DummyCluster&>(broker->getCluster());
+        const string address = "localhost:" + lexical_cast<string>(getPort());
+        c = Connection(address);
+        c.open();
+        s = c.createSession();
+    }
+
+    ~DummyClusterFixture() { c.close(); }
+};
+
+QPID_AUTO_TEST_CASE(testSimplePubSub) { // Expected Cluster calls for one create/consume/send/fetch/ack/close cycle on queue "q".
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history; // Calls recorded so far by the DummyCluster.
+
+ // Queue creation
+ Sender sender = f.s.createSender("q;{create:always,delete:always}");
+ size_t i = 0; // Index of the next history entry to check.
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i); // No calls beyond the ones already checked.
+
+ // Consumer
+ Receiver receiver = f.s.createReceiver("q");
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ // Don't check size here as it is uncertain whether acquire has happened yet.
+
+ // Acquire message
+ Message m = receiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Acknowledge message
+ f.s.acknowledge(true);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Close a consumer
+ receiver.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Destroy the queue: closing the connection is expected to produce destroyq
+ // (presumably because the sender address uses delete:always).
+ f.c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testReleaseReject) { // Expected Cluster calls for release, reject, TTL expiry and LVQ replacement.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history; // Calls recorded so far by the DummyCluster.
+
+ Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
+ sender.send(Message("a"));
+ Receiver receiver = f.s.createReceiver("q");
+ Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
+ Message m = receiver.fetch(Duration::SECOND);
+ h.clear(); // Discard the setup history; only the calls below are checked.
+
+ // Explicit release
+ f.s.release(m);
+ f.s.sync();
+ size_t i = 0; // Index of the next history entry to check.
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Implicit release on closing connection.
+ Connection c("localhost:"+lexical_cast<string>(f.getPort()));
+ c.open();
+ Session s = c.createSession();
+ Receiver r = s.createReceiver("q");
+ m = r.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Reject message, goes to alternate exchange.
+ m = receiver.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ f.s.reject(m);
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+ m = altReceiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(m.getContent(), "a");
+
+ // Timed out message
+ h.clear();
+ i = 0;
+ m = Message("t");
+ m.setTtl(Duration(1)); // Timeout 1ms
+ sender.send(m);
+ usleep(2000); // Sleep 2ms
+ bool received = receiver.fetch(m, Duration::IMMEDIATE);
+ BOOST_CHECK(!received); // Timed out
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)"); // The expired message is reported as a dequeue.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Message replaced on LVQ (history is not cleared, so i keeps counting from above)
+ sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
+ m = Message("a");
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ m = Message("b"); // Same LVQ key as "a".
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // No extra call is expected for the replaced "a".
+
+ receiver = f.s.createReceiver("lvq");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); // Only "b" is fetched.
+ f.s.acknowledge(true);
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testFanout) { // Expected Cluster calls when one message is routed to two fanout subscriptions.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history; // Calls recorded so far by the DummyCluster.
+
+ Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
+ Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
+ Sender sender = f.s.createSender("amq.fanout");
+ r1.setCapacity(0); // Don't receive immediately.
+ r2.setCapacity(0);
+ h.clear(); // Discard the setup history; only the calls below are checked.
+ size_t i = 0; // Index of the next history entry to check.
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r")); // Only the common prefix is checked; either queue may come first.
+ BOOST_CHECK_EQUAL(0u, h.at(i++).find("enqueue(amq.fanout_r"));
+ BOOST_CHECK(h.at(i-1) != h.at(i-2)); // The two enqueue entries must differ.
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Receive messages
+ Message m1 = r1.fetch(Duration::SECOND);
+ f.s.acknowledge(m1, true);
+ Message m2 = r2.fetch(Duration::SECOND);
+ f.s.acknowledge(m2, true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testRingQueue) { // Expected Cluster calls when a ring queue drops its oldest message.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history; // Calls recorded so far by the DummyCluster.
+
+ // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
+ // so we can't do this:
+ // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
+ // Must use old API to declare ring queue:
+ qpid::client::Connection c;
+ f.open(c);
+ qpid::client::Session s = c.newSession();
+ qpid::framing::FieldTable args;
+ args.setInt("qpid.max_size", 3);
+ args.setString("qpid.policy_type","ring");
+ s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
+ c.close();
+ Sender sender = f.s.createSender("ring");
+
+ size_t i = 0; // Index of the next history entry to check.
+ // Send message
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ sender.send(Message("c"));
+ sender.send(Message("d")); // Fourth message; "a" is not fetched below.
+ f.s.sync();
+
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(d)"); // No acquire/dequeue of "a" is expected; presumably it happens while DummyCluster is ignoring events during routing.
+
+ Receiver receiver = f.s.createReceiver("ring");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b"); // "a" is gone; b, c and d remain.
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "c");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "d");
+ f.s.acknowledge(true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
+
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testTransactions) { // Records the Cluster calls currently made for a transactional session (see FIXME below).
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history; // Calls recorded so far by the DummyCluster.
+ Session ts = f.c.createTransactionalSession();
+ Sender sender = ts.createSender("q;{create:always,delete:always}");
+ size_t i = 0; // Index of the next history entry to check.
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
+ ts.commit();
+ // FIXME aconway 2010-10-18: As things stand the cluster is not
+ // compatible with transactions
+ // - enqueues occur after routing is complete
+ // - no call to Cluster::enqueue, should be in Queue::process?
+ // - no transaction context associated with messages in the Cluster interface.
+ // - no call to Cluster::accept in Queue::dequeueCommitted
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // The commit itself adds no recorded calls today.
+
+
+ Receiver receiver = ts.createReceiver("q");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "a");
+ BOOST_CHECK_EQUAL(receiver.fetch().getContent(), "b");
+ ts.acknowledge();
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // Dequeues are not recorded until the commit below.
+ ts.commit();
+ ts.sync();
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/Makefile.am Tue Feb 22 18:23:06 2011
@@ -25,7 +25,7 @@ QMF_GEN=$(top_srcdir)/managementgen/qmf-
abs_builddir=@abs_builddir@
abs_srcdir=@abs_srcdir@
-extra_libs =
+extra_libs =
lib_client = $(abs_builddir)/../libqpidclient.la
lib_messaging = $(abs_builddir)/../libqpidmessaging.la
lib_common = $(abs_builddir)/../libqpidcommon.la
@@ -36,7 +36,7 @@ lib_qmf2 = $(abs_builddir)/../libqmf2.la
#
# Initialize variables that are incremented with +=
-#
+#
check_PROGRAMS=
check_LTLIBRARIES=
TESTS=
@@ -61,9 +61,9 @@ tmodule_LTLIBRARIES=
# Unit test program
#
# Unit tests are built as a single program to reduce valgrind overhead
-# when running the tests. If you want to build a subset of the tests do
+# when running the tests. If you want to build a subset of the tests do
# rm -f unit_test; make unit_test unit_test_OBJECTS="unit_test.o SelectedTest.o"
-#
+#
TESTS+=unit_test
check_PROGRAMS+=unit_test
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerClusterCalls.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -188,32 +189,32 @@ qpid_send_LDADD = $(lib_messaging)
qpidtest_PROGRAMS+=qpid-perftest
qpid_perftest_SOURCES=qpid-perftest.cpp test_tools.h TestOptions.h ConnectionOptions.h
qpid_perftest_INCLUDES=$(PUBLIC_INCLUDES)
-qpid_perftest_LDADD=$(lib_client)
+qpid_perftest_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-txtest
qpid_txtest_INCLUDES=$(PUBLIC_INCLUDES)
qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
-qpid_txtest_LDADD=$(lib_client)
+qpid_txtest_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-latency-test
qpid_latency_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_latency_test_SOURCES=qpid-latency-test.cpp TestOptions.h ConnectionOptions.h
-qpid_latency_test_LDADD=$(lib_client)
+qpid_latency_test_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-client-test
qpid_client_test_INCLUDES=$(PUBLIC_INCLUDES)
qpid_client_test_SOURCES=qpid-client-test.cpp TestOptions.h ConnectionOptions.h
-qpid_client_test_LDADD=$(lib_client)
+qpid_client_test_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-topic-listener
qpid_topic_listener_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_listener_SOURCES=qpid-topic-listener.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_listener_LDADD=$(lib_client)
+qpid_topic_listener_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-topic-publisher
qpid_topic_publisher_INCLUDES=$(PUBLIC_INCLUDES)
qpid_topic_publisher_SOURCES=qpid-topic-publisher.cpp TestOptions.h ConnectionOptions.h
-qpid_topic_publisher_LDADD=$(lib_client)
+qpid_topic_publisher_LDADD=$(lib_client)
qpidtest_PROGRAMS+=qpid-ping
qpid_ping_INCLUDES=$(PUBLIC_INCLUDES)
@@ -232,17 +233,17 @@ echotest_LDADD=$(lib_client)
check_PROGRAMS+=publish
publish_INCLUDES=$(PUBLIC_INCLUDES)
publish_SOURCES=publish.cpp TestOptions.h ConnectionOptions.h
-publish_LDADD=$(lib_client)
+publish_LDADD=$(lib_client)
check_PROGRAMS+=consume
consume_INCLUDES=$(PUBLIC_INCLUDES)
consume_SOURCES=consume.cpp TestOptions.h ConnectionOptions.h
-consume_LDADD=$(lib_client)
+consume_LDADD=$(lib_client)
check_PROGRAMS+=header_test
header_test_INCLUDES=$(PUBLIC_INCLUDES)
header_test_SOURCES=header_test.cpp TestOptions.h ConnectionOptions.h
-header_test_LDADD=$(lib_client)
+header_test_LDADD=$(lib_client)
check_PROGRAMS+=failover_soak
failover_soak_INCLUDES=$(PUBLIC_INCLUDES)
@@ -251,28 +252,28 @@ failover_soak_LDADD=$(lib_client) $(lib_
check_PROGRAMS+=declare_queues
declare_queues_INCLUDES=$(PUBLIC_INCLUDES)
-declare_queues_SOURCES=declare_queues.cpp
-declare_queues_LDADD=$(lib_client)
+declare_queues_SOURCES=declare_queues.cpp
+declare_queues_LDADD=$(lib_client)
check_PROGRAMS+=replaying_sender
replaying_sender_INCLUDES=$(PUBLIC_INCLUDES)
-replaying_sender_SOURCES=replaying_sender.cpp
-replaying_sender_LDADD=$(lib_client)
+replaying_sender_SOURCES=replaying_sender.cpp
+replaying_sender_LDADD=$(lib_client)
check_PROGRAMS+=resuming_receiver
resuming_receiver_INCLUDES=$(PUBLIC_INCLUDES)
-resuming_receiver_SOURCES=resuming_receiver.cpp
-resuming_receiver_LDADD=$(lib_client)
+resuming_receiver_SOURCES=resuming_receiver.cpp
+resuming_receiver_LDADD=$(lib_client)
check_PROGRAMS+=txshift
txshift_INCLUDES=$(PUBLIC_INCLUDES)
txshift_SOURCES=txshift.cpp TestOptions.h ConnectionOptions.h
-txshift_LDADD=$(lib_client)
+txshift_LDADD=$(lib_client)
check_PROGRAMS+=txjob
txjob_INCLUDES=$(PUBLIC_INCLUDES)
txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h
-txjob_LDADD=$(lib_client)
+txjob_LDADD=$(lib_client)
check_PROGRAMS+=PollerTest
PollerTest_SOURCES=PollerTest.cpp
@@ -307,7 +308,7 @@ TESTS_ENVIRONMENT = \
VALGRIND=$(VALGRIND) \
LIBTOOL="$(LIBTOOL)" \
QPID_DATA_DIR= \
- $(srcdir)/run_test
+ $(srcdir)/run_test
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest
TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \
@@ -352,7 +353,8 @@ EXTRA_DIST += \
start_broker.ps1 \
stop_broker.ps1 \
topictest.ps1 \
- run_queue_flow_limit_tests
+ run_queue_flow_limit_tests \
+ run_cluster_authentication_test
check_LTLIBRARIES += libdlclose_noop.la
libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir)
@@ -375,6 +377,7 @@ EXTRA_DIST+= \
run_failover_soak \
reliable_replication_test \
federated_cluster_test_with_node_failure \
+ run_cluster_authentication_soak \
sasl_test_setup.sh
check-long:
Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/brokertest.py Tue Feb 22 18:23:06 2011
@@ -437,17 +437,25 @@ class Cluster:
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True,
+ cluster2=False):
+ if cluster2:
+ cluster_name = "--cluster2-name"
+ cluster_lib = BrokerTest.cluster2_lib
+ else:
+ cluster_name = "--cluster-name"
+ cluster_lib = BrokerTest.cluster_lib
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
Cluster._cluster_count += 1
# Use unique cluster name
self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+ self.args += [ cluster_name,
+ "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
+ assert cluster_lib, "Cannot locate cluster plug-in"
+ self.args += [ "--load-module", cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
@@ -473,6 +481,7 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ cluster2_lib = os.getenv("CLUSTER2_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -523,9 +532,9 @@ class BrokerTest(TestCase):
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, cluster2=False):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait, cluster2=cluster2)
return cluster
def assert_browse(self, session, queue, expect_contents, timeout=0):
Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster.mk Tue Feb 22 18:23:06 2011
@@ -92,7 +92,7 @@ cluster_test_SOURCES = \
PartialFailure.cpp \
ClusterFailover.cpp
-cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework
qpidtest_SCRIPTS += run_cluster_tests brokertest.py cluster_tests.py cluster_test_logs.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
qpidtest_SCRIPTS += $(CLUSTER_TEST_SCRIPTS_LIST)
Added: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py?rev=1073448&view=auto
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py (added)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py Tue Feb 22 18:23:06 2011
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess
+from qpid import datatypes, messaging
+from brokertest import *
+from qpid.harness import Skipped
+from qpid.messaging import Message
+from qpid.messaging.exceptions import *
+from threading import Thread, Lock
+from logging import getLogger
+from itertools import chain
+
+log = getLogger("qpid.cluster_tests")
+
+class Cluster2Tests(BrokerTest):
+ """Tests for new cluster code."""
+
+ # Assert that receiver yields exactly the items of content, in order:
+ # each expected item is compared with the content of the next fetch(1),
+ # and a final fetch(0) must raise Empty (nothing further available).
+ def verify_content(self, content, receiver):
+ for c in content: self.assertEqual(c, receiver.fetch(1).content)
+ self.assertRaises(Empty, receiver.fetch, 0)
+
+ def test_message_enqueue(self):
+ """Test basic replication of enqueued messages.
+ Verify that fanout messages are replicated correctly.
+ """
+
+ # Cluster created with count=2 and the cluster2 option of BrokerTest.cluster.
+ cluster = self.cluster(2, cluster2=True)
+
+ # Member 0: browsing receivers on p and q. Each address requests
+ # create:always and an x-binding of the queue to amq.fanout.
+ # s0 is the only sender in this test and sends to amq.fanout.
+ sn0 = cluster[0].connect().session()
+ r0p = sn0.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r0q = sn0.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+ s0 = sn0.sender("amq.fanout");
+
+ # Member 1: the same two browsing receiver addresses; no sender here.
+ sn1 = cluster[1].connect().session()
+ r1p = sn1.receiver("p; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:p}]}}");
+ r1q = sn1.receiver("q; {mode:browse, create:always, node:{x-bindings:[{exchange:'amq.fanout', queue:q}]}}");
+
+
+ # Send messages on member 0
+ content = ["a","b","c"]
+ for m in content: s0.send(Message(m))
+
+ # Browse on both members.
+ self.verify_content(content, r0p)
+ self.verify_content(content, r0q)
+ self.verify_content(content, r1p)
+ self.verify_content(content, r1q)
+
+ sn1.connection.close()
+ sn0.connection.close()
+
+ def test_message_dequeue(self):
+ """Test replication of dequeues"""
+ # Messages are sent and consumed through member 0 only; member 1 is
+ # used solely to browse q and check what it holds.
+ cluster = self.cluster(2, cluster2=True)
+ sn0 = cluster[0].connect().session()
+ s0 = sn0.sender("q;{create:always,delete:always}")
+ r0 = sn0.receiver("q")
+ sn1 = cluster[1].connect().session()
+ r1 = sn1.receiver("q;{create:always}") # Not yet replicating wiring.
+
+ content = ["a","b","c"]
+ for m in content: s0.send(Message(m))
+ # Verify enqueued on cluster[1]
+ self.verify_content(content, sn1.receiver("q;{mode:browse}"))
+ # Dequeue on cluster[0]
+ self.assertEqual(r0.fetch(1).content, "a")
+ # NOTE(review): sync=True presumably waits for the broker to process the
+ # acknowledgement before the browses below run -- confirm against the
+ # qpid.messaging Session.acknowledge documentation.
+ sn0.acknowledge(sync=True)
+
+ # Verify dequeued on cluster[0] and cluster[1]
+ self.verify_content(["b", "c"], sn0.receiver("q;{mode:browse}"))
+ self.verify_content(["b", "c"], sn1.receiver("q;{mode:browse}"))
+
+ def test_wiring(self):
+ """Test replication of wiring"""
+ # Outline: create exchange "ex", queue "q" and a binding with key "k"
+ # through member 0, use them through member 1, then delete them through
+ # member 0 and expect NotFound on member 1.
+ cluster = self.cluster(2, cluster2=True)
+ sn0 = cluster[0].connect().session()
+ sn1 = cluster[1].connect().session()
+
+ # Test creation of queue, exchange, binding
+ r0ex = sn0.receiver("ex; {create:always, delete:always, node:{type:topic, x-declare:{name:ex, type:'direct'}}}")
+ r0q = sn0.receiver("q; {create:always, delete:always, link:{x-bindings:[{exchange:ex,queue:q,key:k}]}}")
+
+ # Verify objects were created on member 1
+ r1 = sn1.receiver("q") # Queue
+ s1ex = sn1.sender("ex/k; {node:{type:topic}}"); # Exchange
+ s1ex.send(Message("x")) # Binding with key k
+ self.assertEqual(r1.fetch(1).content, "x")
+
+ # Test destroy.
+ r0q.close() # Delete queue q
+ self.assertRaises(NotFound, sn1.receiver, "q")
+ r0ex.close() # Delete exchange ex
+ # FIXME aconway 2010-11-05: this does not raise NotFound, sn1 is caching "ex"
+ # self.assertRaises(NotFound, sn1.sender, "ex")
+ # Have to create a new session.
+ self.assertRaises(NotFound, cluster[1].connect().session().receiver, "ex")
+
+ # FIXME aconway 2010-10-29: test unbind, may need to use old API.
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920/qpid/cpp/src/tests/cluster2_tests.py
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/run_cluster_tests Tue Feb 22 18:23:06 2011
@@ -33,5 +33,5 @@ mkdir -p $OUTDIR
CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
+with_ais_group $QPID_PYTHON_TEST -DOUTDIR=$OUTDIR -m cluster_tests -m cluster2_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
Modified: qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/qpid-2920/qpid/cpp/src/tests/test_env.sh.in Tue Feb 22 18:23:06 2011
@@ -63,6 +63,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
exportmodule ACL_LIB acl.so
exportmodule CLUSTER_LIB cluster.so
+exportmodule CLUSTER2_LIB cluster2.so
exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
exportmodule SSLCONNECTOR_LIB sslconnector.so
Modified: qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml?rev=1073448&r1=1073447&r2=1073448&view=diff
==============================================================================
--- qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml (original)
+++ qpid/branches/qpid-2920/qpid/cpp/xml/cluster.xml Tue Feb 22 18:23:06 2011
@@ -284,4 +284,62 @@
</control>
</class>
+
+ <!-- TODO aconway 2010-10-20: Experimental classes for new cluster. -->
+
+ <!-- Message delivery and disposition -->
+ <class name="cluster-message" code="0x82">
+ <!-- Four controls: routing, enqueue, routed, dequeue.
+ routing, enqueue and routed each carry a uint32 routing-id;
+ NOTE(review): presumably the id correlates the three controls
+ for a single message - confirm against the handler code. -->
+ <!-- FIXME aconway 2010-10-19: create message in fragments -->
+ <control name="routing" code="0x1">
+ <field name="routing-id" type="uint32"/>
+ <field name="message" type="str32"/>
+ </control>
+
+ <control name="enqueue" code="0x2">
+ <field name="routing-id" type="uint32"/>
+ <field name="queue" type="queue.name"/>
+ </control>
+
+ <control name="routed" code="0x3">
+ <field name="routing-id" type="uint32"/>
+ </control>
+
+ <!-- dequeue names a queue and a uint32 position; it has no routing-id. -->
+ <control name="dequeue" code="0x4">
+ <field name="queue" type="queue.name"/>
+ <field name="position" type="uint32"/>
+ </control>
+ </class>
+
+ <class name="cluster-wiring" code="0x83">
+ <!-- Controls for queues, exchanges and bindings:
+ create-queue / destroy-queue, create-exchange / destroy-exchange,
+ bind / unbind. The create-* controls carry a single str32 "data"
+ field whose encoding is not defined in this file; the destroy-*
+ controls carry only the object name. -->
+ <control name="create-queue" code="0x1">
+ <field name="data" type="str32"/>
+ </control>
+
+ <control name="destroy-queue" code="0x2">
+ <field name="name" type="queue.name"/>
+ </control>
+
+ <control name="create-exchange" code="0x3">
+ <field name="data" type="str32"/>
+ </control>
+
+ <control name="destroy-exchange" code="0x4">
+ <field name="name" type="exchange.name"/>
+ </control>
+
+ <!-- bind and unbind take the same four fields. -->
+ <control name="bind" code="0x5">
+ <field name="queue" type="queue.name"/>
+ <field name="exchange" type="exchange.name"/>
+ <field name="binding-key" type="str8"/>
+ <field name="arguments" type="map"/>
+ </control>
+
+ <control name="unbind" code="0x6">
+ <field name="queue" type="queue.name"/>
+ <field name="exchange" type="exchange.name"/>
+ <field name="binding-key" type="str8"/>
+ <field name="arguments" type="map"/>
+ </control>
+
+ </class>
</amqp>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org