You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/10/18 21:36:13 UTC
svn commit: r1023966 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/
qpid/cluster/ tests/
Author: aconway
Date: Mon Oct 18 19:36:13 2010
New Revision: 1023966
URL: http://svn.apache.org/viewvc?rev=1023966&view=rev
Log:
Introduce broker::Cluster interface.
See cpp/src/qpid/cluster/new-cluster-design.txt and new-cluster-plan.txt.
qpid/cpp/src/tests/BrokerClusterCalls.cpp is a unit test that verifies
the broker makes the expected calls on broker::Cluster in various situations.
Added:
qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h (with props)
qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h (with props)
qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (with props)
qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp (with props)
Modified:
qpid/trunk/qpid/cpp/src/Makefile.am
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/cluster.mk
Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 18 19:36:13 2010
@@ -501,6 +501,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
+ qpid/broker/Cluster.h \
qpid/broker/Connection.cpp \
qpid/broker/Connection.h \
qpid/broker/ConnectionFactory.cpp \
@@ -559,6 +560,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageStoreModule.h \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
+ qpid/broker/NullCluster.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Mon Oct 18 19:36:13 2010
@@ -24,6 +24,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NullCluster.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -146,6 +147,7 @@ Broker::Broker(const Broker::Options& co
conf.qmf2Support)
: 0),
store(new NullMessageStore),
+ cluster(new NullCluster),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
queues(this),
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Mon Oct 18 19:36:13 2010
@@ -70,6 +70,7 @@ namespace broker {
class ExpiryPolicy;
class Message;
+class Cluster;
static const uint16_t DEFAULT_PORT=5672;
@@ -153,6 +154,7 @@ public:
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
std::auto_ptr<MessageStore> store;
+ std::auto_ptr<Cluster> cluster;
AclModule* acl;
DataDir dataDir;
@@ -273,6 +275,9 @@ public:
void setClusterUpdatee(bool set) { clusterUpdatee = set; }
bool isClusterUpdatee() const { return clusterUpdatee; }
+ QPID_BROKER_EXTERN void setCluster(std::auto_ptr<Cluster> c) { cluster = c; }
+ QPID_BROKER_EXTERN Cluster& getCluster() { return *cluster; }
+
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
Added: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h Mon Oct 18 19:36:13 2010
@@ -0,0 +1,103 @@
+#ifndef QPID_BROKER_CLUSTER_H
+#define QPID_BROKER_CLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace broker {
+
+class Message;
+struct QueuedMessage;
+class Queue;
+class Exchange;
+
+/**
+ * NOTE: this is part of an experimental cluster implementation that is not
+ * yet fully functional. The original cluster implementation remains in place.
+ * See ../cluster/new-cluster-design.txt
+ *
+ * Interface for cluster implementations. Functions on this interface are
+ * called at relevant points in the Broker's processing.
+ */
+class Cluster
+{
+ public:
+ virtual ~Cluster() {}
+
+ // Messages
+
+ /** In Exchange::route, before the message is enqueued. */
+ virtual void routing(const boost::intrusive_ptr<Message>&) = 0;
+ /** A message is delivered to a queue. */
+ virtual void enqueue(QueuedMessage&) = 0;
+ /** In Exchange::route, after all enqueues for the message. */
+ virtual void routed(const boost::intrusive_ptr<Message>&) = 0;
+
+ /** A message is acquired by a local consumer, it is unavailable to replicas. */
+ virtual void acquire(const QueuedMessage&) = 0;
+ /** A locally-acquired message is accepted, it is removed from all replicas. */
+ virtual void accept(const QueuedMessage&) = 0;
+
+ /** A locally-acquired message is rejected, and may be re-routed. */
+ virtual void reject(const QueuedMessage&) = 0;
+ /** Re-routing (if any) is complete for a rejected message. */
+ virtual void rejected(const QueuedMessage&) = 0;
+
+ /** A locally-acquired message is released by the consumer and re-queued. */
+ virtual void release(const QueuedMessage&) = 0;
+ /** A message is dropped from the queue, e.g. expired or replaced on an LVQ.
+ * This function does only local book-keeping, it does not multicast.
+ * It is reasonable to call with a queue lock held.
+ */
+ virtual void dequeue(const QueuedMessage&) = 0;
+
+ // Consumers
+
+ /** A new consumer subscribes to a queue. */
+ virtual void consume(const Queue&, size_t consumerCount) = 0;
+ /** A consumer cancels its subscription to a queue */
+ virtual void cancel(const Queue&, size_t consumerCount) = 0;
+
+ // Wiring
+
+ /** A queue is created */
+ virtual void create(const Queue&) = 0;
+ /** A queue is destroyed */
+ virtual void destroy(const Queue&) = 0;
+ /** An exchange is created */
+ virtual void create(const Exchange&) = 0;
+ /** An exchange is destroyed */
+ virtual void destroy(const Exchange&) = 0;
+ /** A binding is created */
+ virtual void bind(const Queue&, const Exchange&, const std::string& key, const framing::FieldTable& args) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_CLUSTER_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/Cluster.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Mon Oct 18 19:36:13 2010
@@ -112,7 +112,7 @@ void DeliveryRecord::complete() {
bool DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
- queue->dequeue(ctxt, msg);
+ queue->accept(ctxt, msg);
setEnded();
QPID_LOG(debug, "Accepted " << id);
}
@@ -130,19 +130,8 @@ void DeliveryRecord::committed() const{
}
void DeliveryRecord::reject()
-{
- Exchange::shared_ptr alternate = queue->getAlternateExchange();
- if (alternate) {
- DeliverableMessage delivery(msg.payload);
- alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
- QPID_LOG(info, "Routed rejected message from " << queue->getName() << " to "
- << alternate->getName());
- } else {
- //just drop it
- QPID_LOG(info, "Dropping rejected message from " << queue->getName());
- }
-
- dequeue();
+{
+ queue->reject(msg);
}
uint32_t DeliveryRecord::getCredit() const
@@ -156,7 +145,7 @@ void DeliveryRecord::acquire(DeliveryIds
results.push_back(id);
if (!acceptExpected) {
if (ended) { QPID_LOG(error, "Can't dequeue ended message"); }
- else { queue->dequeue(0, msg); setEnded(); }
+ else { queue->accept(0, msg); setEnded(); }
}
} else {
QPID_LOG(info, "Message already acquired " << id.getValue());
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Mon Oct 18 19:36:13 2010
@@ -22,6 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
@@ -78,10 +79,23 @@ Exchange::PreRoute::~PreRoute(){
}
}
+// Bracket a scope with calls to Cluster::routing and Cluster::routed
+struct ScopedClusterRouting {
+ Broker* broker;
+ boost::intrusive_ptr<Message> message;
+ ScopedClusterRouting(Broker* b, boost::intrusive_ptr<Message> m)
+ : broker(b), message(m) {
+ if (broker) broker->getCluster().routing(message);
+ }
+ ~ScopedClusterRouting() {
+ if (broker) broker->getCluster().routed(message);
+ }
+};
+
void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
{
+ ScopedClusterRouting scr(broker, &msg.getMessage());
int count = 0;
-
if (b.get()) {
// Block the content release if the message is transient AND there is more than one binding
if (!msg.getMessage().isPersistent() && b->size() > 1) {
Added: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h Mon Oct 18 19:36:13 2010
@@ -0,0 +1,66 @@
+#ifndef QPID_BROKER_NULLCLUSTER_H
+#define QPID_BROKER_NULLCLUSTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Cluster.h>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * No-op implementation of Cluster interface, installed by broker when
+ * no cluster plug-in is present or clustering is disabled.
+ */
+class NullCluster : public Cluster
+{
+ public:
+
+ // Messages
+
+ virtual void routing(const boost::intrusive_ptr<Message>&) {}
+ virtual void enqueue(QueuedMessage&) {}
+ virtual void routed(const boost::intrusive_ptr<Message>&) {}
+ virtual void acquire(const QueuedMessage&) {}
+ virtual void accept(const QueuedMessage&) {}
+ virtual void reject(const QueuedMessage&) {}
+ virtual void rejected(const QueuedMessage&) {}
+ virtual void release(const QueuedMessage&) {}
+ virtual void dequeue(const QueuedMessage&) {}
+
+ // Consumers
+
+ virtual void consume(const Queue&, size_t) {}
+ virtual void cancel(const Queue&, size_t) {}
+
+ // Wiring
+
+ virtual void create(const Queue&) {}
+ virtual void destroy(const Queue&) {}
+ virtual void create(const Exchange&) {}
+ virtual void destroy(const Exchange&) {}
+ virtual void bind(const Queue&, const Exchange&, const std::string&, const framing::FieldTable&) {}
+};
+
+}} // namespace qpid::broker
+
+#endif
Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/NullCluster.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 18 19:36:13 2010
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage&
}
}
}
+ if (broker) broker->getCluster().release(msg);
copy.notify();
}
@@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMe
}
}
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. The calling function should set qmsg
+// to the acquired message.
+struct ClusterAcquireOnExit {
+ Broker* broker;
+ QueuedMessage qmsg;
+ ClusterAcquireOnExit(Broker* b) : broker(b) {}
+ ~ClusterAcquireOnExit() {
+ if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+ }
+};
+
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
+ ClusterAcquireOnExit willAcquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const Seque
if (lastValueQueue) {
clearLVQIndex(*i);
}
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
+ QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+ willAcquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
bool Queue::acquire(const QueuedMessage& msg) {
+ ClusterAcquireOnExit acquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage&
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
+ (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+ ) {
clearLVQIndex(msg);
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
+ acquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
@@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMes
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
+ willAcquire.qmsg = msg;
popMsg(msg);
return CONSUMED;
} else {
@@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber
return QueuedMessage();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
+ consumers = ++consumerCount;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
+ if (broker) broker->getCluster().consume(*this, consumers);
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumers = --consumerCount;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ if (broker) broker->getCluster().cancel(*this, consumers);
}
QueuedMessage Queue::get(){
+ ClusterAcquireOnExit acquire(broker); // Outside lock
+
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = getFront();
+ acquire.qmsg = msg;
popMsg(msg);
}
return msg;
@@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
+ qm = QueuedMessage(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Me
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
+ // FIXME aconway 2010-10-15: it is incorrect to use qm.position below;
+ // we should be using the position of the message being replaced.
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
}
@@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Me
}
}
copy.notify();
+ if (broker) broker->getCluster().enqueue(qm);
}
QueuedMessage Queue::getFront()
@@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrus
if (policy.get()) policy->enqueueAborted(msg);
}
+void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
+ if (broker) broker->getCluster().accept(msg);
+ dequeue(ctxt, msg);
+}
+
+struct ScopedClusterReject {
+ Broker* broker;
+ const QueuedMessage& qmsg;
+ ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) {
+ if (broker) broker->getCluster().reject(qmsg);
+ }
+ ~ScopedClusterReject() {
+ if (broker) broker->getCluster().rejected(qmsg);
+ }
+};
+
+void Queue::reject(const QueuedMessage &msg) {
+ ScopedClusterReject scr(broker, msg);
+ Exchange::shared_ptr alternate = getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+ QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << getName());
+ }
+ dequeue(0, msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
@@ -846,6 +913,9 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
+ // Note: Cluster::dequeue does only local book-keeping, no multicast
+ // So OK to call here with lock held.
+ if (broker) broker->getCluster().dequeue(msg);
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _se
store->create(*this, _settings);
}
configure(_settings);
+ if (broker) broker->getCluster().create(*this);
}
void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -934,6 +1005,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Oct 18 19:36:13 2010
@@ -259,6 +259,13 @@ class Queue : public boost::enable_share
bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
void enqueueAborted(boost::intrusive_ptr<Message> msg);
+
+ /** Message acknowledged, dequeue it. */
+ QPID_BROKER_EXTERN void accept(TransactionContext* ctxt, const QueuedMessage &msg);
+
+ /** Message rejected, dequeue it and re-route to alternate exchange if necessary. */
+ QPID_BROKER_EXTERN void reject(const QueuedMessage &msg);
+
/**
* dequeue from store (only done once messages is acknowledged)
*/
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuedMessage.h Mon Oct 18 19:36:13 2010
@@ -34,10 +34,9 @@ struct QueuedMessage
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage() : queue(0) {}
+ QueuedMessage(Queue* q=0) : position(0), queue(q) {}
QueuedMessage(Queue* q, boost::intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
- QueuedMessage(Queue* q) : queue(q) {}
};
inline bool operator<(const QueuedMessage& a, const QueuedMessage& b) { return a.position < b.position; }
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Mon Oct 18 19:36:13 2010
@@ -333,7 +333,7 @@ bool SemanticState::ConsumerImpl::delive
parent->record(record);
}
if (acquire && !ackExpected) {
- queue->dequeue(0, msg);
+ queue->accept(0, msg);
}
if (mgmtObject) { mgmtObject->inc_delivered(); }
return true;
@@ -347,11 +347,6 @@ bool SemanticState::ConsumerImpl::filter
bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
{
assertClusterSafe();
- // FIXME aconway 2009-06-08: if we have byte & message credit but
- // checkCredit fails because the message is to big, we should
- // remain on queue's listener list for possible smaller messages
- // in future.
- //
blocked = !(filter(msg) && checkCredit(msg));
return !blocked;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-design.txt Mon Oct 18 19:36:13 2010
@@ -75,6 +75,8 @@ Use a moving queue ownership protocol to
than relying on identical state and lock-step behavior to cause
identical dequeues on each broker.
+Clearly defined interface between broker code and cluster plug-in.
+
*** Requirements
The cluster must provide these delivery guarantees:
@@ -365,3 +367,4 @@ there a better term?
Clustering and scalability: new design may give us the flexibility to
address scalability as part of cluster design. Think about
relationship to federation and "fragmented queues" idea.
+
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt Mon Oct 18 19:36:13 2010
@@ -0,0 +1,439 @@
+-*-org-*-
+Notes on new cluster implementation. See also: new-cluster-design.txt
+
+* Implementation plan.
+
+Co-existence with old cluster code and tests:
+- Separate plugin cluster2, options --cluster2-*. Eventually renamed to replace cluster.
+- Double up tests with old version/new version as the new code develops.
+
+Minimal POC for message delivery & perf test.
+- no wiring replication, no updates, no failover, no persistence, no async completion.
+- just implement publish and acquire/dequeue locking protocol.
+- measure performance.
+
+Full implementation of transient cluster
+- Update (based on existing update), async completion etc.
+- Passing all existing transient cluster tests.
+
+Persistent cluster
+- Make sure async completion works correctly.
+- InitialStatus protocol etc. to support persistent start-up (existing code)
+- cluster restart from store: stores not identical. Load one, update the rest.
+ - assign cluster ID's to messages recovered from store, don't replicate.
+
+Improved update protocol
+- per-queue, less stalling, bounded catch-up.
+
+* Task list
+
+** TODO [#A] Minimal POC: publish/acquire/dequeue protocol.
+
+NOTE: as implementation questions arise, take the easiest option and make
+a note for later optimization/improvement.
+
+*** Tests
+- python test: 4 senders, numbered messages, 4 receivers, verify message set.
+- acquire then release messages: verify can be dequeued on any member
+- acquire then kill broker: verify can be dequeued other members.
+- acquire then reject: verify goes on alt-exchange once only.
+
+*** TODO broker::Cluster interface and call points.
+
+Initial draft is committed.
+
+Issues to review:
+
+queue API: internal classes like RingQueuePolicy use Queue::acquire/dequeue
+when messages are pushed. How to reconcile with queue ownership?
+
+rejecting messages: if there's an alternate exchange where do we do the
+re-routing? On origin broker or on all brokers?
+
+Intercept points: on Queue vs. on DeliveryRecord, SemanticState etc.
+Intercepting client actions on the queue vs. internal actions
+(e.g. ring policy)
+
+*** Main classes
+
+BrokerHandler:
+- implements broker::Cluster intercept points.
+- sends mcast events to inform cluster of local actions.
+- thread safe, called in connection threads.
+
+LocalMessageMap:
+- Holds local messages while they are being enqueued.
+- thread safe: called by both BrokerHandler and DeliverHandler
+
+MessageHandler:
+- handles delivered mcast messages related to messages.
+- initiates local actions in response to mcast events.
+- thread unsafe, only called in deliver thread.
+- maintains view of cluster state regarding messages.
+
+QueueOwnerHandler:
+- handles delivered mcast messages related to queue consumer ownership.
+- thread safe, called in deliver, connection and timer threads.
+- maintains view of cluster state regarding queue ownership.
+
+cluster::Core: class to hold new cluster together (replaces cluster::Cluster)
+- thread safe: manage state used by both DeliverHandler and BrokerHandler
+
+The following code sketch illustrates only the "happy path"; error handling
+is omitted.
+
+*** BrokerHandler
+Types:
+- struct QueuedMessage { Message msg; QueueName q; Position pos; }
+- SequenceNumber 64 bit sequence number to identify messages.
+- NodeId 64 bit CPG node-id, identifies member of the cluster.
+- struct MessageId { NodeId node; SequenceNumber seq; }
+
+Members:
+- atomic<SequenceNumber> sequence // sequence number for message IDs.
+- thread_local bool noReplicate // suppress replication.
+- thread_local bool isRouting // suppress operations while routing
+- QueuedMessage localMessage[SequenceNumber] // local messages being enqueued.
+
+NOTE: localMessage is also modified by DeliverHandler.
+
+broker::Cluster intercept functions:
+
+routing(msg)
+ if noReplicate: return
+ # Suppress everything except enqueues while we are routing.
+ # We don't want to replicate acquires & dequeues caused by an enqueue,
+ # e.g. removal of messages from ring/LV queues.
+ isRouting = true
+
+enqueue(qmsg):
+ if noReplicate: return
+ if !qmsg.msg.id:
+ seq = sequence++
+ qmsg.msg.id = (self,seq)
+ localMessage[seq] = qmsg
+ mcast create(encode(qmsg.msg),seq)
+ mcast enqueue(qmsg.q,qmsg.msg.id.seq)
+
+routed(msg):
+ if noReplicate: return
+ if msg.id: mcast routed(msg.id.seq)
+ isRouting = false
+
+acquire(qmsg):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast acquire(msg.id, q)
+
+release(QueuedMessage)
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast release(id, q)
+
+accept(QueuedMessage):
+ if noReplicate: return
+ if isRouting: return # Ignore while we are routing a message.
+ if msg.id: mcast dequeue(msg.id, msg.q)
+
+reject(QueuedMessage):
+ isRejecting = true
+ if msg.id: mcast reject(msg.id, msg.q)
+
+rejected(QueuedMessage):
+ isRejecting = false
+ mcast dequeue(msg.id, msg.q)
+
+dequeue(QueuedMessage)
+ # No mcast in dequeue, only used for local cleanup of resources.
+ # E.g. messages that are replaced on an LVQ are dequeued without being
+ # accepted or rejected. dequeue is called with the queue lock held
+ # FIXME revisit - move it out of the queue lock.
+ cleanup(msg)
+
+*** DeliverHandler and mcast messages
+Types:
+- struct QueueEntry { QueuedMessage qmsg; NodeId acquired; }
+- struct QueueKey { MessageId id; QueueName q; }
+- typedef map<QueueKey, QueueEntry> Queue
+- struct Node { Message routing[SequenceNumber]; list<QueueKey> acquired; }
+
+Members:
+- QueueEntry enqueued[QueueKey]
+- Node node[NodeId]
+
+Mcast messages in Message class:
+
+create(msg,seq)
+ if sender != self: node[sender].routing[seq] = decode(msg)
+
+enqueue(q,seq):
+ id = (sender,seq)
+ if sender == self:
+ enqueued[id,q] = (localMessage[seq], acquired=None)
+ else:
+ msg = sender.routing[seq]
+ enqueued[id,q] = (qmsg, acquired=None)
+ with noReplicate=true: qmsg = broker.getQueue(q).push(msg)
+
+routed(seq):
+ if sender == self: localMessage.erase(msg.id.seq)
+ else: sender.routing.erase(seq)
+
+acquire(id,q):
+ enqueued[id,q].acquired = sender
+ node[sender].acquired.push_back((id,q))
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).acquire(enqueued[id,q])
+
+release(id,q)
+ enqueued[id,q].acquired = None
+ node[sender].acquired.erase((id,q))
+ if sender != self
+ with noReplicate=true: broker.getQueue(q).requeue(enqueued[id,q])
+
+reject(id,q):
+ sender.routing[id] = enqueued[id,q] # prepare for re-queueing
+
+rejected(id,q)
+ sender.routing.erase[id]
+
+dequeue(id,q)
+ entry = enqueued[id,q]
+ enqueued.erase[id,q]
+ node[entry.acquired].acquired.erase(id,q)
+ if sender != self:
+ with noReplicate=true: broker.getQueue(q).dequeue(entry.qmsg)
+
+member m leaves cluster:
+ for key in node[m].acquired:
+ release(key.id, key.q)
+ node.erase(m)
+
+*** Queue consumer locking
+
+When a queue is locked it does not deliver messages to its consumers.
+
+New broker::Queue functions:
+- stopConsumers(): set consumersStopped flag, wait for currently busy consumers to exit.
+- startConsumers(): reset consumersStopped flag
+
+Implementation sketch, locking omitted:
+
+void Queue::stopConsumers() {
+ consumersStopped = true;
+ while (consumersBusy) consumersBusyMonitor.wait();
+}
+
+void Queue::startConsumers() {
+ consumersStopped = false;
+ listeners.notify();
+}
+
+bool Queue::dispatch(consumer) {
+ if (consumersStopped) return false;
+ ++consumersBusy;
+ do_regular_dispatch_body()
+ if (--consumersBusy == 0) consumersBusyMonitor.notify();
+}
+
+*** QueueOwnerHandler
+
+Invariants:
+- Each queue is owned by at most one node at any time.
+- Each node is interested in a set of queues at any given time.
+- A queue is un-owned if no node is interested.
+
+The queue owner releases the queue when
+- it loses interest i.e. queue has no consumers with credit.
+- a configured time delay expires and there are other interested nodes.
+
+The owner mcasts release(q). On delivery the new queue owner is the
+next node in node-id order (treating nodes as a circular list)
+starting from the old owner that is interested in the queue.
+
+Queue consumers initially are stopped, only started when we get
+ownership from the cluster.
+
+Thread safety: called by deliver, connection and timer threads, needs locking.
+
+Thread safe object per queue holding queue ownership status.
+Called by deliver, connection and timer threads.
+
+class QueueOwnership {
+ bool owned;
+ Timer timer;
+ BrokerQueue q;
+
+ drop(): # locked
+ if owned:
+ owned = false
+ q.stopConsumers()
+ mcast release(q.name, false)
+ timer.stop()
+
+ take(): # locked
+ if not owned:
+ owned = true
+ q.startConsumers()
+ timer.start(timeout)
+
+ timer.fire(): drop()
+}
+
+Data Members, only modified/examined in deliver thread:
+- typedef set<NodeId> ConsumerSet
+- map<QueueName, ConsumerSet> consumers
+- map<QueueName, NodeId> owner
+
+Thread safe data members, accessed in connection threads (via BrokerHandler):
+- map<QueueName, QueueOwnership> ownership
+
+Multicast messages in QueueOwner class:
+
+consume(q):
+ if sender==self and consumers[q].empty(): ownership[q].take()
+ consumers[q].insert(sender)
+
+release(q):
+ assert(owner[q] == sender and owner[q] in consumers[q])
+ owner[q] = circular search from sender in consumers[q]
+ if owner==self: ownership[q].take()
+
+cancel(q):
+ assert(queue[q].owner != sender) # sender must release() before cancel()
+ consumers[q].erase(sender)
+
+member-leaves:
+ for q in queue: if owner[q] == left: left.release(q)
+
+Need 2 more intercept points in broker::Cluster:
+
+consume(q,consumer,consumerCount) - Queue::consume()
+ if consumerCount == 1: mcast consume(q)
+
+cancel(q,consumer,consumerCount) - Queue::cancel()
+ if consumerCount == 0:
+ ownership[q].drop()
+ mcast cancel(q)
+
+#TODO: lifecycle, updating cluster data structures when queues are destroyed
+
+*** Re-use of existing cluster code
+- re-use Event
+- re-use Multicaster
+- re-use same PollableQueueSetup (may experiment later)
+- new Core class to replace Cluster.
+- keep design modular, keep threading rules clear.
+
+** TODO [#B] Large message replication.
+Need to be able to multicast large messages in fragments
+
+** TODO [#B] Batch CPG multicast messages
+The new cluster design involves a lot of small multicast messages,
+they need to be batched into larger CPG messages for efficiency.
+** TODO [#B] Genuine async completion
+Replace current synchronous waiting implementation with genuine async completion.
+
+Test: enhance test_store.cpp to defer enqueueComplete till special message received.
+
+Async callback uses *requestIOProcessing* to queue action on IO thread.
+
+** TODO [#B] Async completion of accept when dequeue completes.
+Interface is already there on broker::Message, just need to ensure
+that store and cluster implementations call it appropriately.
+
+** TODO [#B] Replicate wiring.
+From messageStore create/destroy/bind, replicate encoded declare/destroy/bind command.
+
+** TODO [#B] New members joining - first pass
+
+Re-use update code from old cluster but don't replicate sessions &
+connections.
+
+Need to extend it to send cluster IDs with messages.
+
+Need to replicate the queue ownership data as part of the update.
+
+** TODO [#B] Persistence support.
+InitialStatus protocol etc. to support persistent start-up (existing code)
+
+Only one broker recovers from store, update to others.
+
+Assign cluster IDs to messages recovered from store, don't replicate. See Queue::recover.
+
+** TODO [#B] Handle other ways that messages can leave a queue.
+
+Other ways (other than via a consumer) that messages are taken off a queue.
+
+NOTE: Not controlled by queue lock, how to make them consistent?
+
+Target broker may not have all messages on other brokers for purge/destroy.
+- Queue::move() - need to wait for lock? Replicate?
+- Queue::get() - ???
+- Queue::purge() - replicate purge? or just delete what's on broker ?
+- Queue::destroy() - messages to alternate exchange on all brokers.?
+
+Need to add callpoints & mcast messages to replicate these?
+
+** TODO [#B] Flow control for internal queues.
+
+Need to bound the size of the internal queues holding cluster events & frames.
+- stop polling when we reach bound.
+- start polling when we get back under it.
+** TODO [#B] Integration with transactions.
+Do we want to replicate during transaction & replicate commit/rollback
+or replicate only on commit?
+No integration with DTX transactions.
+** TODO [#B] Make new cluster work with replication exchange.
+Possibly re-use some common logic. Replication exchange is like clustering
+except over TCP.
+** TODO [#C] Async completion for declare, bind, destroy queues and exchanges.
+Cluster needs to complete these asynchronously to guarantee resources
+exist across the cluster when the command completes.
+
+** TODO [#C] Allow non-replicated exchanges, queues.
+
+Set qpid.replicated=false in declare arguments, set flag on Exchange, Queue objects.
+- save replicated status to store.
+- support in management tools.
+Replicated exchange: replicate binds to replicated queues.
+Replicated queue: replicate all messages.
+
+** TODO [#C] New members joining - improved.
+
+Replicate wiring like old cluster, stall for wiring but not for
+messages. Update messages on a per-queue basis from back to front.
+
+Updater:
+- stall & push wiring: declare exchanges, queues, bindings.
+- start update iterator thread on each queue.
+- unstall and process normally while iterator threads run.
+
+Update iterator thread:
+- starts at back of updater queue, message m.
+- send update_front(q,m) to updatee and advance towards front
+- at front: send update_done(q)
+
+Updatee:
+- stall, receive wiring, lock all queues, mark queues "updating", unstall
+- update_front(q,m): push m to *front* of q
+- update_done(q): mark queue "ready"
+
+Updatee cannot take the queue consume lock for a queue that is updating.
+Updatee *can* push messages onto a queue that is updating.
+
+TODO: Is there any way to eliminate the stall for wiring?
+
+** TODO [#C] Refactoring of common concerns.
+
+There are a bunch of things that act as "Queue observers" with intercept
+points in similar places.
+- QueuePolicy
+- QueuedEvents (async replication)
+- MessageStore
+- Cluster
+
+Look for ways to capitalize on the similarity & simplify the code.
+
+In particular QueuedEvents (async replication) strongly resembles
+cluster replication, but over TCP rather than multicast.
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/new-cluster-plan.txt
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1023966&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp Mon Oct 18 19:36:13 2010
@@ -0,0 +1,435 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+///@file
+// Tests using a dummy broker::Cluster implementation to verify the expected
+// Cluster functions are called for various actions on the broker.
+//
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/broker/Cluster.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Duration.h"
+#include "BrokerFixture.h"
+#include <boost/assign.hpp>
+#include <boost/format.hpp>
+#include <boost/regex.hpp>
+
+using namespace std;
+using namespace boost;
+using namespace boost::assign;
+using namespace qpid::messaging;
+using boost::format;
+using boost::regex;
+
+namespace qpid {
+namespace tests {
+
+class DummyCluster : public broker::Cluster // Test double: logs each Cluster callback as a string in 'history'.
+{
+ private:
+ /** Flag used to ignore events other than enqueues while routing,
+ * e.g. acquires and accepts generated in a ring queue to replace an element.
+ * In real impl would be a thread-local variable.
+ */
+ bool isRouting;
+
+ void recordQm(const string& op, const broker::QueuedMessage& qm) { // Logs "op(queue, position, content)".
+ history += (format("%s(%s, %d, %s)") % op % qm.queue->getName()
+ % qm.position % qm.payload->getFrames().getContent()).str();
+ }
+ void recordStr(const string& op, const string& name) { // Logs "op(name)".
+ history += (format("%s(%s)") % op % name).str();
+ }
+ public:
+ DummyCluster() : isRouting(false) {} // Start outside routing so the flag is never read uninitialized.
+ // Messages
+ virtual void routing(const boost::intrusive_ptr<broker::Message>& m) {
+ isRouting = true;
+ history += (format("routing(%s)") % m->getFrames().getContent()).str();
+ }
+
+ virtual void enqueue(broker::QueuedMessage& qm) { recordQm("enqueue", qm); }
+
+ virtual void routed(const boost::intrusive_ptr<broker::Message>& m) {
+ history += (format("routed(%s)") % m->getFrames().getContent()).str();
+ isRouting = false;
+ }
+ virtual void acquire(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("acquire", qm);
+ }
+ virtual void accept(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("accept", qm);
+ }
+ virtual void reject(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("reject", qm);
+ }
+ virtual void rejected(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("rejected", qm);
+ }
+ virtual void release(const broker::QueuedMessage& qm) {
+ if (!isRouting) recordQm("release", qm);
+ }
+ virtual void dequeue(const broker::QueuedMessage& qm) {
+ // Never ignore dequeue, used to avoid resource leaks.
+ recordQm("dequeue", qm);
+ }
+
+ // Consumers
+
+ virtual void consume(const broker::Queue& q, size_t n) {
+ history += (format("consume(%s, %d)") % q.getName() % n).str();
+ }
+ virtual void cancel(const broker::Queue& q, size_t n) {
+ history += (format("cancel(%s, %d)") % q.getName() % n).str();
+ }
+
+ // Wiring
+
+ virtual void create(const broker::Queue& q) { recordStr("createq", q.getName()); }
+ virtual void destroy(const broker::Queue& q) { recordStr("destroyq", q.getName()); }
+ virtual void create(const broker::Exchange& ex) { recordStr("createex", ex.getName()); }
+ virtual void destroy(const broker::Exchange& ex) { recordStr("destroyex", ex.getName()); }
+ virtual void bind(const broker::Queue& q, const broker::Exchange& ex, const std::string& key, const framing::FieldTable& /*args*/) {
+ history += (format("bind(%s, %s, %s)") % q.getName() % ex.getName() % key).str();
+ }
+ vector<string> history; // One entry per recorded callback, in call order.
+};
+
+QPID_AUTO_TEST_SUITE(BrokerClusterCallsTestSuite)
+
+// Broker fixture with DummyCluster set up and some new API client bits.
+struct DummyClusterFixture: public BrokerFixture {
+ Connection c; // New-API connection to the fixture broker; closed by the destructor.
+ Session s; // Session created on c.
+ DummyCluster* dc; // The cluster object obtained back from broker->getCluster().
+ DummyClusterFixture() {
+ auto_ptr<broker::Cluster> dummy(new DummyCluster);
+ broker->setCluster(dummy);
+ dc = static_cast<DummyCluster*>(&broker->getCluster());
+ const string url = "localhost:" + lexical_cast<string>(getPort());
+ c = Connection(url);
+ c.open();
+ s = c.createSession();
+ }
+ ~DummyClusterFixture() { c.close(); }
+};
+
+QPID_AUTO_TEST_CASE(testSimplePubSub) { // Cluster calls for create, consume, send, fetch, acknowledge, cancel, destroy.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ // Queue creation
+ Sender sender = f.s.createSender("q;{create:always,delete:always}");
+ size_t i = 0; // Unsigned, to match h.size() and h.at().
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Consumer
+ Receiver receiver = f.s.createReceiver("q");
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ // Don't check size here as it is uncertain whether acquire has happened yet.
+
+ // Acquire message
+ Message m = receiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Acknowledge message
+ f.s.acknowledge(true);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Close a consumer
+ receiver.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 0)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Destroy the queue
+ f.c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "destroyq(q)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testReleaseReject) { // Cluster calls for release, reject, TTL expiry and LVQ replacement.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ Sender sender = f.s.createSender("q;{create:always,delete:always,node:{x-declare:{alternate-exchange:amq.fanout}}}");
+ sender.send(Message("a"));
+ Receiver receiver = f.s.createReceiver("q");
+ Receiver altReceiver = f.s.createReceiver("amq.fanout;{link:{name:altq}}");
+ Message m = receiver.fetch(Duration::SECOND);
+ h.clear();
+
+ // Explicit release
+ f.s.release(m);
+ f.s.sync();
+ size_t i = 0; // Unsigned, to match h.size() and h.at().
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Implicit release on closing connection.
+ Connection c("localhost:"+lexical_cast<string>(f.getPort()));
+ c.open();
+ Session s = c.createSession();
+ Receiver r = s.createReceiver("q");
+ m = r.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ c.close();
+ BOOST_CHECK_EQUAL(h.at(i++), "cancel(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "release(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Reject message, goes to alternate exchange.
+ m = receiver.fetch(Duration::SECOND);
+ h.clear();
+ i = 0;
+ f.s.reject(m);
+ BOOST_CHECK_EQUAL(h.at(i++), "reject(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)"); // Routing to alt exchange
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(amq.fanout_altq, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "rejected(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+ m = altReceiver.fetch(Duration::SECOND);
+ BOOST_CHECK_EQUAL(m.getContent(), "a");
+
+ // Timed out message
+ h.clear();
+ i = 0;
+ m = Message("t");
+ m.setTtl(Duration(1)); // Timeout 1ms
+ sender.send(m);
+ usleep(2000); // Sleep 2ms
+ bool received = receiver.fetch(m, Duration::IMMEDIATE);
+ BOOST_CHECK(!received); // Timed out
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(t)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, t)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Message replaced on LVQ
+ sender = f.s.createSender("lvq;{create:always,delete:always,node:{x-declare:{arguments:{qpid.last_value_queue:1}}}}");
+ m = Message("a");
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(lvq)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ m = Message("b");
+ m.getProperties()["qpid.LVQ_key"] = "foo";
+ sender.send(m);
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ // FIXME: bug in Queue.cpp gives the incorrect position when
+ // dequeueing a replaced LVQ message.
+ // BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 2, a)"); // Should be 1
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(lvq, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ receiver = f.s.createReceiver("lvq");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
+ f.s.acknowledge(true);
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(lvq, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(lvq, 1, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testFanout) { // Cluster calls when one message is routed to two fanout subscription queues.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ Receiver r1 = f.s.createReceiver("amq.fanout;{link:{name:r1}}");
+ Receiver r2 = f.s.createReceiver("amq.fanout;{link:{name:r2}}");
+ Sender sender = f.s.createSender("amq.fanout");
+ r1.setCapacity(0); // Don't receive immediately.
+ r2.setCapacity(0);
+ h.clear();
+ size_t i = 0; // Unsigned, to match h.size() and h.at().
+
+ // Send message
+ sender.send(Message("a"));
+ f.s.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_REGEX("enqueue\\(amq\\.fanout_r[12], 1, a\\)", h.at(i++)); // '.' escaped to match literally.
+ BOOST_CHECK_REGEX("enqueue\\(amq\\.fanout_r[12], 1, a\\)", h.at(i++));
+ BOOST_CHECK(h.at(i-1) != h.at(i-2));
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // Receive messages
+ Message m1 = r1.fetch(Duration::SECOND);
+ f.s.acknowledge(m1, true);
+ Message m2 = r2.fetch(Duration::SECOND);
+ f.s.acknowledge(m2, true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r1, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(amq.fanout_r2, 1, a)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testRingQueue) { // Cluster calls when a full ring queue displaces its oldest message.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+
+ // FIXME aconway 2010-10-15: QPID-2908 ring queue address string is not working,
+ // so we can't do this:
+ // Sender sender = f.s.createSender("ring;{create:always,node:{x-declare:{arguments:{qpid.max_size:3,qpid.policy_type:ring}}}}");
+ // Must use old API to declare ring queue:
+ qpid::client::Connection c;
+ f.open(c);
+ qpid::client::Session s = c.newSession();
+ qpid::framing::FieldTable args;
+ args.setInt("qpid.max_size", 3);
+ args.setString("qpid.policy_type","ring");
+ s.queueDeclare(qpid::client::arg::queue="ring", qpid::client::arg::arguments=args);
+ c.close();
+ Sender sender = f.s.createSender("ring");
+
+ size_t i = 0; // Unsigned, to match h.size() and h.at().
+ // Send message
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ sender.send(Message("c"));
+ sender.send(Message("d"));
+ f.s.sync();
+
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(ring)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(c)");
+
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(d)");
+
+ Receiver receiver = f.s.createReceiver("ring");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b"); // Bounded wait: fail rather than hang.
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "c");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "d");
+ f.s.acknowledge(true);
+
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(ring, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 3, c)");
+ BOOST_CHECK_EQUAL(h.at(i++), "accept(ring, 4, d)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(ring, 4, d)");
+
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_CASE(testTransactions) { // Cluster calls seen for sends and acknowledgements on a transactional session.
+ DummyClusterFixture f;
+ vector<string>& h = f.dc->history;
+ Session ts = f.c.createTransactionalSession();
+ Sender sender = ts.createSender("q;{create:always,delete:always}");
+ size_t i = 0; // Unsigned, to match h.size() and h.at().
+ BOOST_CHECK_EQUAL(h.at(i++), "createq(q)"); // Note: at() does bounds checking.
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ sender.send(Message("a"));
+ sender.send(Message("b"));
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routing(b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "routed(b)");
+ BOOST_CHECK_EQUAL(h.size(), i); // Not replicated till commit
+ ts.commit();
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "enqueue(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+
+ // FIXME aconway 2010-10-18: As things stand the cluster is not
+ // compatible with transactions
+ // - enqueues occur after routing is complete.
+ // - no transaction context associated with messages in the Cluster interface.
+ // - no call to Cluster::accept in Queue::dequeueCommitted
+
+ Receiver receiver = ts.createReceiver("q");
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "a"); // Bounded wait: fail rather than hang.
+ BOOST_CHECK_EQUAL(receiver.fetch(Duration::SECOND).getContent(), "b");
+ ts.acknowledge();
+ ts.sync();
+ BOOST_CHECK_EQUAL(h.at(i++), "consume(q, 1)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "acquire(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+ ts.commit();
+ ts.sync();
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 1, a)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 1, a)");
+ // BOOST_CHECK_EQUAL(h.at(i++), "accept(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.at(i++), "dequeue(q, 2, b)");
+ BOOST_CHECK_EQUAL(h.size(), i);
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
+
Propchange: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/tests/BrokerClusterCalls.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Mon Oct 18 19:36:13 2010
@@ -123,7 +123,8 @@ unit_test_SOURCES= unit_test.cpp unit_te
Variant.cpp \
Address.cpp \
ClientMessage.cpp \
- Qmf2.cpp
+ Qmf2.cpp \
+ BrokerClusterCalls.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.mk?rev=1023966&r1=1023965&r2=1023966&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.mk Mon Oct 18 19:36:13 2010
@@ -77,7 +77,7 @@ cluster_test_SOURCES = \
PartialFailure.cpp \
ClusterFailover.cpp
-cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework
+cluster_test_LDADD=$(lib_client) $(lib_broker) $(lib_messaging) ../cluster.la -lboost_unit_test_framework -lboost_regex
qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org