You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/01/20 14:30:09 UTC
svn commit: r736018 - in /qpid/trunk/qpid: cpp/rubygen/framing.0-10/
cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/framing/
cpp/src/qpid/replication/ cpp/src/tests/ python/commands/ specs/
Author: gsim
Date: Tue Jan 20 05:30:08 2009
New Revision: 736018
URL: http://svn.apache.org/viewvc?rev=736018&view=rev
Log:
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state
* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control acknowledged transfer on a bridge
Added:
qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp (with props)
qpid/trunk/qpid/cpp/src/tests/reliable_replication_test (with props)
Modified:
qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp
qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
qpid/trunk/qpid/cpp/src/qpid/replication/constants.h
qpid/trunk/qpid/cpp/src/tests/Makefile.am
qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
qpid/trunk/qpid/cpp/src/tests/federation.py
qpid/trunk/qpid/cpp/src/tests/replication_test
qpid/trunk/qpid/python/commands/qpid-route
qpid/trunk/qpid/specs/management-schema.xml
Modified: qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb (original)
+++ qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb Tue Jan 20 05:30:08 2009
@@ -37,7 +37,7 @@
def inner_class_decl(c)
cname=c.name.caps
- cpp_class(cname, "Proxy") {
+ cpp_class(cname, "public Proxy") {
gen <<EOS
public:
#{cname}(FrameHandler& f) : Proxy(f) {}
@@ -92,7 +92,7 @@
genl
namespace("qpid::framing") {
genl "#{@classname}::#{@classname}(FrameHandler& f) :"
- gen " Proxy(f)"
+ gen " Proxy(f)"
@amqp.classes.each { |c| gen ",\n "+proxy_member(c)+"(f)" }
genl "{}\n"
@amqp.classes.each { |c| inner_class_defn(c) }
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Tue Jan 20 05:30:08 2009
@@ -299,6 +299,12 @@
}
}
+void SessionHandler::markReadyToSend() {
+ if (!sendReady) {
+ sendReady = true;
+ }
+}
+
void SessionHandler::sendTimeout(uint32_t t) {
checkAttached();
peer.requestTimeout(t);
Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Tue Jan 20 05:30:08 2009
@@ -60,6 +60,7 @@
void sendAttach(bool force);
void sendTimeout(uint32_t t);
void sendFlush();
+ void markReadyToSend();//TODO: only needed for inter-broker bridge; cleanup
/** True if the handler is ready to send and receive */
bool ready() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jan 20 05:30:08 2009
@@ -68,7 +68,7 @@
mgmtObject = new _qmf::Bridge
(agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic);
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
if (!args.i_durable)
agent->addObject(mgmtObject);
}
@@ -81,6 +81,8 @@
void Bridge::create(ConnectionState& c)
{
+ FieldTable options;
+ if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
connState = &c;
if (args.i_srcIsLocal) {
if (args.i_dynamic)
@@ -103,7 +105,7 @@
session->commandPoint(0,0);
if (args.i_srcIsQueue) {
- peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
} else {
@@ -194,9 +196,10 @@
buffer.getShortString(id);
buffer.getShortString(excludes);
bool dynamic(buffer.getOctet());
+ uint16_t sync = buffer.getShort();
return links.declare(host, port, durable, src, dest, key,
- is_queue, is_local, id, excludes, dynamic).first;
+ is_queue, is_local, id, excludes, dynamic, sync).first;
}
void Bridge::encode(Buffer& buffer) const
@@ -213,6 +216,7 @@
buffer.putShortString(args.i_tag);
buffer.putShortString(args.i_excludes);
buffer.putOctet(args.i_dynamic ? 1 : 0);
+ buffer.putShort(args.i_sync);
}
uint32_t Bridge::encodedSize() const
@@ -228,7 +232,8 @@
+ 1 // srcIsLocal
+ args.i_tag.size() + 1
+ args.i_excludes.size() + 1
- + 1; // dynamic
+ + 1 // dynamic
+ + 2; // sync
}
management::ManagementObject* Bridge::GetManagementObject (void) const
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Tue Jan 20 05:30:08 2009
@@ -43,7 +43,7 @@
class DeliveryAdapter
{
public:
- virtual void deliver(DeliveryRecord&) = 0;
+ virtual void deliver(DeliveryRecord&, bool sync) = 0;
virtual ~DeliveryAdapter(){}
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jan 20 05:30:08 2009
@@ -81,7 +81,7 @@
requeue();
}else{
msg.payload->redeliver();//mark as redelivered
- session->deliver(*this);
+ session->deliver(*this, false);
}
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jan 20 05:30:08 2009
@@ -125,17 +125,19 @@
void Link::established ()
{
- Mutex::ScopedLock mutex(lock);
stringstream addr;
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
+ {
+ Mutex::ScopedLock mutex(lock);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ if (closing)
+ destroy();
+ }
}
void Link::closed (int, std::string text)
@@ -170,9 +172,9 @@
void Link::destroy ()
{
+ Bridges toDelete;
{
Mutex::ScopedLock mutex(lock);
- Bridges toDelete;
AclModule* acl = getBroker()->getAcl();
std::string userID = getUsername() + "@" + getBroker()->getOptions().realm;
@@ -195,12 +197,11 @@
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
-
- // Now delete all bridges on this link.
- for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
- (*i)->destroy();
- toDelete.clear();
}
+ // Now delete all bridges on this link (don't hold the lock for this).
+ for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+ (*i)->destroy();
+ toDelete.clear();
links->destroy (host, port);
}
@@ -386,7 +387,7 @@
links->declare (host, port, iargs.i_durable, iargs.i_src,
iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
- iargs.i_dynamic);
+ iargs.i_dynamic, iargs.i_sync);
if (result.second && iargs.i_durable)
store->create(*result.first);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jan 20 05:30:08 2009
@@ -92,7 +92,8 @@
bool isLocal,
std::string& tag,
std::string& excludes,
- bool dynamic)
+ bool dynamic,
+ uint16_t sync)
{
Mutex::ScopedLock locker(lock);
stringstream keystream;
@@ -121,6 +122,7 @@
args.i_tag = tag;
args.i_excludes = excludes;
args.i_dynamic = dynamic;
+ args.i_sync = sync;
bridge = Bridge::shared_ptr
(new Bridge (l->second.get(), l->second->nextChannel(),
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Jan 20 05:30:08 2009
@@ -85,7 +85,8 @@
bool isLocal,
std::string& id,
std::string& excludes,
- bool dynamic);
+ bool dynamic,
+ uint16_t sync);
void destroy(const std::string& host, const uint16_t port);
void destroy(const std::string& host,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jan 20 05:30:08 2009
@@ -256,7 +256,9 @@
arguments(_arguments),
msgCredit(0),
byteCredit(0),
- notifyEnabled(true) {}
+ notifyEnabled(true),
+ syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
+ deliveryCount(0) {}
OwnershipToken* SemanticState::ConsumerImpl::getSession()
{
@@ -267,7 +269,9 @@
{
allocateCredit(msg.payload);
DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
- parent->deliver(record);
+ bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
+ if (sync) deliveryCount = 0;//reset
+ parent->deliver(record, sync);
if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
if (windowing || ackExpected || !acquire) {
parent->record(record);
@@ -449,9 +453,9 @@
}
}
-void SemanticState::deliver(DeliveryRecord& msg)
+void SemanticState::deliver(DeliveryRecord& msg, bool sync)
{
- return deliveryAdapter.deliver(msg);
+ return deliveryAdapter.deliver(msg, sync);
}
SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Jan 20 05:30:08 2009
@@ -77,6 +77,8 @@
uint32_t msgCredit;
uint32_t byteCredit;
bool notifyEnabled;
+ const int syncFrequency;
+ int deliveryCount;
bool checkCredit(boost::intrusive_ptr<Message>& msg);
void allocateCredit(boost::intrusive_ptr<Message>& msg);
@@ -197,7 +199,7 @@
void suspendDtx(const std::string& xid);
void resumeDtx(const std::string& xid);
void recover(bool requeue);
- void deliver(DeliveryRecord& message);
+ void deliver(DeliveryRecord& message, bool sync);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Jan 20 05:30:08 2009
@@ -89,13 +89,14 @@
// in the bridge.
//
void SessionHandler::attached(const std::string& name) {
- if (session.get())
+ if (session.get()) {
checkName(name);
- else {
+ } else {
SessionId id(connection.getUserId(), name);
SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
session.reset(new SessionState(connection.getBroker(), *this, id, config));
-}
+ markReadyToSend();
+ }
}
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jan 20 05:30:08 2009
@@ -175,7 +175,7 @@
}
if (method->isSync()) {
incomplete.process(enqueuedOp, true);
- sendCompletion();
+ sendAcceptAndCompletion();
}
}
@@ -207,18 +207,27 @@
//hold up execution until async enqueue is complete
if (msg->getFrames().getMethod()->isSync()) {
incomplete.process(enqueuedOp, true);
- sendCompletion();
+ sendAcceptAndCompletion();
} else {
incomplete.process(enqueuedOp, false);
}
}
}
+void SessionState::sendAcceptAndCompletion()
+{
+ if (!accepted.empty()) {
+ getProxy().getMessage().accept(accepted);
+ accepted.clear();
+ }
+ sendCompletion();
+}
+
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
{
receiverCompleted(msg->getCommandId());
- if (msg->requiresAccept())
- getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));
+ if (msg->requiresAccept())
+ accepted.add(msg->getCommandId());
}
void SessionState::handleIn(AMQFrame& frame) {
@@ -240,16 +249,23 @@
handler->out(frame);
}
-void SessionState::deliver(DeliveryRecord& msg)
+void SessionState::deliver(DeliveryRecord& msg, bool sync)
{
uint32_t maxFrameSize = getConnection().getFrameMax();
assert(senderGetCommandPoint().offset == 0);
SequenceNumber commandId = senderGetCommandPoint().command;
msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+ if (sync) {
+ AMQP_ClientProxy::Execution& p(getProxy().getExecution());
+ Proxy::ScopedSync s(p);
+ p.sync();
+ }
}
-void SessionState::sendCompletion() { handler->sendCompletion(); }
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
+}
void SessionState::senderCompleted(const SequenceSet& commands) {
qpid::SessionState::senderCompleted(commands);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jan 20 05:30:08 2009
@@ -93,12 +93,12 @@
void sendCompletion();
//delivery adapter methods:
- void deliver(DeliveryRecord&);
+ void deliver(DeliveryRecord&, bool sync);
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
void readyToSend();
@@ -119,6 +119,8 @@
void handleInLast(framing::AMQFrame& frame);
void handleOutLast(framing::AMQFrame& frame);
+ void sendAcceptAndCompletion();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -128,7 +130,8 @@
IncompleteMessageList incomplete;
IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
-
+ qpid::framing::SequenceSet accepted;
+
friend class SessionManager;
};
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp Tue Jan 20 05:30:08 2009
@@ -18,15 +18,21 @@
#include "Proxy.h"
#include "AMQFrame.h"
+#include "AMQMethodBody.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace framing {
-Proxy::Proxy(FrameHandler& h) : out(&h) {}
+Proxy::Proxy(FrameHandler& h) : out(&h), sync(false) {}
Proxy::~Proxy() {}
void Proxy::send(const AMQBody& b) {
+ if (sync) {
+ const AMQMethodBody* m = dynamic_cast<const AMQMethodBody*>(&b);
+ if (m) m->setSync(sync);
+ }
AMQFrame f(b);
out->handle(f);
}
@@ -39,4 +45,7 @@
void Proxy::setHandler(FrameHandler& f) { out=&f; }
+Proxy::ScopedSync::ScopedSync(Proxy& p) : proxy(p) { proxy.sync = true; }
+Proxy::ScopedSync::~ScopedSync() { proxy.sync = false; }
+
}} // namespace qpid::framing
Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h Tue Jan 20 05:30:08 2009
@@ -33,6 +33,14 @@
class Proxy
{
public:
+ class ScopedSync
+ {
+ Proxy& proxy;
+ public:
+ ScopedSync(Proxy& p);
+ ~ScopedSync();
+ };
+
Proxy(FrameHandler& h);
virtual ~Proxy();
@@ -42,9 +50,9 @@
FrameHandler& getHandler();
void setHandler(FrameHandler&);
-
private:
FrameHandler* out;
+ bool sync;
};
}} // namespace qpid::framing
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue Jan 20 05:30:08 2009
@@ -23,6 +23,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
@@ -35,27 +36,14 @@
void ReplicatingEventListener::handle(QueueEvents::Event event)
{
- //create event message and enqueue it on replication queue
- FieldTable headers;
- boost::intrusive_ptr<Message> message;
switch (event.type) {
case QueueEvents::ENQUEUE:
- headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE);
- headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
- message = createEventMessage(headers);
- queue->deliver(message);
- //if its an enqueue, enqueue the message itself on the
- //replication queue also:
- queue->deliver(event.msg.payload);
- QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication");
+ deliverEnqueueMessage(event.msg);
+ QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication");
break;
case QueueEvents::DEQUEUE:
- headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE);
- headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
- headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position);
- message = createEventMessage(headers);
- queue->deliver(message);
- QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
+ deliverDequeueMessage(event.msg);
+ QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
<< event.msg.position << ")");
break;
}
@@ -65,20 +53,64 @@
const std::string EMPTY;
}
-boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers)
+void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued)
+{
+ FieldTable headers;
+ headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
+ headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+ headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
+ headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
+ boost::intrusive_ptr<Message> msg(createMessage(headers));
+ queue->deliver(msg);
+}
+
+void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
+{
+ boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
+ FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+ headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+ headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+ queue->deliver(msg);
+}
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers)
+{
+ boost::intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AMQFrame header(in_place<AMQHeaderBody>());
+ header.setBof(false);
+ header.setEof(true);
+ header.setBos(true);
+ header.setEos(true);
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setApplicationHeaders(headers);
+ return msg;
+}
+
+struct AppendingHandler : FrameHandler
+{
+ boost::intrusive_ptr<Message> msg;
+
+ AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {}
+
+ void handle(AMQFrame& f)
+ {
+ msg->getFrames().append(f);
+ }
+};
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original)
{
- boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- header.setBof(false);
- header.setEof(true);
- header.setBos(true);
- header.setEos(true);
- msg->getFrames().append(method);
- msg->getFrames().append(header);
- MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
- props->setApplicationHeaders(headers);
- return msg;
+ boost::intrusive_ptr<Message> copy(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AppendingHandler handler(copy);
+ handler.handle(method);
+ original->sendHeader(handler, std::numeric_limits<int16_t>::max());
+ original->sendContent(queue, handler, std::numeric_limits<int16_t>::max());
+ return copy;
}
Options* ReplicatingEventListener::getOptions()
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h Tue Jan 20 05:30:08 2009
@@ -28,6 +28,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace replication {
@@ -57,8 +58,14 @@
PluginOptions options;
qpid::broker::Queue::shared_ptr queue;
+ qpid::framing::SequenceNumber sequence;
- boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers);
+ void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
+ void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
+
+ boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
+ boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue,
+ boost::intrusive_ptr<qpid::broker::Message> original);
};
}} // namespace qpid::replication
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Jan 20 05:30:08 2009
@@ -38,46 +38,75 @@
const FieldTable& args,
QueueRegistry& qr,
Manageable* parent)
- : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {}
+ : Exchange(name, durable, args, parent), queues(qr), init(false) {}
std::string ReplicationExchange::getType() const { return typeName; }
void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
{
if (args) {
- std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE);
- if (eventType == ENQUEUE) {
- expectingEnqueue = true;
- targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE);
- QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue);
- return;
- } else if (eventType == DEQUEUE) {
- std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
- Queue::shared_ptr queue = queues.find(queueName);
- SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
-
- QueuedMessage dequeued;
- if (queue->acquireMessageAt(position, dequeued)) {
- queue->dequeue(0, dequeued);
- QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
- } else {
- QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+ int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);
+ if (eventType) {
+ if (isDuplicate(args)) return;
+ switch (eventType) {
+ case ENQUEUE:
+ handleEnqueueEvent(args, msg);
+ return;
+ case DEQUEUE:
+ handleDequeueEvent(args);
+ return;
+ default:
+ throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
}
-
- return;
- } else if (!eventType.empty()) {
- throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
}
+ } else {
+ QPID_LOG(warning, "Dropping unexpected message with no headers");
+ }
+}
+
+void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg)
+{
+ std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+ Queue::shared_ptr queue = queues.find(queueName);
+ FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+ headers.erase(REPLICATION_TARGET_QUEUE);
+ headers.erase(REPLICATION_EVENT_SEQNO);
+ headers.erase(REPLICATION_EVENT_TYPE);
+ msg.deliverTo(queue);
+ QPID_LOG(debug, "Enqueued replicated message onto " << queue);
+}
+
+void ReplicationExchange::handleDequeueEvent(const FieldTable* args)
+{
+ std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+ Queue::shared_ptr queue = queues.find(queueName);
+ SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
+
+ QueuedMessage dequeued;
+ if (queue->acquireMessageAt(position, dequeued)) {
+ queue->dequeue(0, dequeued);
+ QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+ } else {
+ QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
}
- //if we get here assume its not an event message, assume its an enqueue
- if (expectingEnqueue) {
- Queue::shared_ptr queue = queues.find(targetQueue);
- msg.deliverTo(queue);
- expectingEnqueue = false;
- targetQueue.clear();
- QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue);
+}
+
+bool ReplicationExchange::isDuplicate(const FieldTable* args)
+{
+ SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO));
+ if (!init) {
+ init = true;
+ sequence = seqno;
+ return false;
+ } else if (seqno > sequence) {
+ if (seqno - sequence > 1) {
+ QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno);
+ }
+ sequence = seqno;
+ return false;
} else {
- QPID_LOG(warning, "Dropping unexpected message");
+ QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")");
+ return true;
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h Tue Jan 20 05:30:08 2009
@@ -22,6 +22,7 @@
*
*/
#include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceNumber.h"
namespace qpid {
namespace replication {
@@ -51,8 +52,12 @@
bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
private:
qpid::broker::QueueRegistry& queues;
- bool expectingEnqueue;
- std::string targetQueue;
+ qpid::framing::SequenceNumber sequence;
+ bool init;
+
+ bool isDuplicate(const qpid::framing::FieldTable* args);
+ void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
+ void handleDequeueEvent(const qpid::framing::FieldTable* args);
};
}} // namespace qpid::replication
Modified: qpid/trunk/qpid/cpp/src/qpid/replication/constants.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/constants.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/constants.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/constants.h Tue Jan 20 05:30:08 2009
@@ -22,10 +22,12 @@
namespace replication {
namespace constants {
-const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type");
-const std::string ENQUEUE("enqueue");
-const std::string DEQUEUE("dequeue");
-const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue");
-const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position");
+const std::string REPLICATION_EVENT_TYPE("qpid.replication.type");
+const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
+const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
+const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+
+const int ENQUEUE(1);
+const int DEQUEUE(2);
}}}
Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan 20 05:30:08 2009
@@ -89,7 +89,8 @@
ManagementTest.cpp \
MessageReplayTracker.cpp \
ConsoleTest.cpp \
- QueueEvents.cpp
+ QueueEvents.cpp \
+ ProxyTest.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -249,7 +250,7 @@
# Longer running stability tests, not run by default check: target.
# Not run under valgrind, too slow
-LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak
+LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test
EXTRA_DIST+=$(LONG_TESTS) run_perftest
check-long:
$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
Added: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp?rev=736018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp Tue Jan 20 05:30:08 2009
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/ExecutionSyncBody.h"
+#include "qpid/framing/Proxy.h"
+#include <alloca.h>
+
+#include "unit_test.h"
+
+using namespace qpid::framing;
+
+QPID_AUTO_TEST_SUITE(ProxyTestSuite)
+
+
+QPID_AUTO_TEST_CASE(testScopedSync)
+{
+ struct DummyHandler : FrameHandler // inspects each frame the Proxy hands to it
+ {
+ void handle(AMQFrame& f) {
+ AMQMethodBody* m = f.getMethod();
+ BOOST_REQUIRE(m); // abort the case on a null body: the checks below dereference m
+ BOOST_CHECK(m->isA<ExecutionSyncBody>());
+ BOOST_CHECK(m->isSync()); // the body sent while s is in scope is expected to be flagged sync
+ }
+ };
+ DummyHandler f;
+ Proxy p(f);
+ Proxy::ScopedSync s(p);
+ p.send(ExecutionSyncBody(p.getVersion()));
+}
+
+QPID_AUTO_TEST_SUITE_END()
Propchange: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp Tue Jan 20 05:30:08 2009
@@ -168,7 +168,7 @@
BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
}
fixture.connection.close();
- fixture.shutdownBroker();
+ fixture.broker->getQueueEvents().shutdown();
//check listener was notified of all events, and in correct order
SequenceNumber enqueueId(1);
@@ -215,7 +215,7 @@
BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
}
fixture.connection.close();
- fixture.shutdownBroker();
+ fixture.broker->getQueueEvents().shutdown();
//check listener was notified of all events, and in correct order
SequenceNumber enqueueId(1);
Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Tue Jan 20 05:30:08 2009
@@ -64,7 +64,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+ result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -88,7 +88,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -135,7 +135,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+ result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -195,7 +195,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+ result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -244,8 +244,8 @@
l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
- l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
- r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
self.assertEqual(l_res.status, 0)
self.assertEqual(r_res.status, 0)
@@ -296,7 +296,7 @@
link = qmf.getObjects(_class="link")[0]
result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
- "exclude-me,also-exclude-me", False, False, False)
+ "exclude-me,also-exclude-me", False, False, False, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
@@ -354,7 +354,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -401,7 +401,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -448,7 +448,7 @@
self.assertEqual(result.status, 0)
link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+ result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
self.assertEqual(result.status, 0)
bridge = qmf.getObjects(_class="bridge")[0]
sleep(5)
@@ -478,8 +478,7 @@
sleep(3)
self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
self.assertEqual(len(qmf.getObjects(_class="link")), 0)
-
-
+
def getProperty(self, msg, name):
for h in msg.headers:
if hasattr(h, name): return getattr(h, name)
Added: qpid/trunk/qpid/cpp/src/tests/reliable_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/reliable_replication_test?rev=736018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/reliable_replication_test (added)
+++ qpid/trunk/qpid/cpp/src/tests/reliable_replication_test Tue Jan 20 05:30:08 2009
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test reliability of the replication feature in the face of link
+# failures:
+MY_DIR=`dirname \`which $0\``
+PYTHON_DIR=${MY_DIR}/../../../python
+
+trap stop_brokers EXIT
+
+stop_brokers() { # stop each broker whose port is still recorded; unset makes a repeat call a no-op
+ if [ -n "$BROKER_A" ] ; then # POSIX test: the script runs under /bin/sh, which need not support [[ ]]
+ ../qpidd -q --port $BROKER_A
+ unset BROKER_A
+ fi
+ if [ -n "$BROKER_B" ] ; then
+ ../qpidd -q --port $BROKER_B
+ unset BROKER_B
+ fi
+}
+
+setup() { # start both brokers, create the replication exchange and route, and declare queue-a on each side
+ rm -f replication-source.log replication-dest.log
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
+ BROKER_A=`cat qpidd.port` # qpidd's stdout, presumably the port it bound; this broker loads replicating_listener.so
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+ BROKER_B=`cat qpidd.port` # same capture for the broker that loads replication_exchange.so
+
+ #../qpidd --port 5555 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 &
+ #BROKER_A=5555
+
+ #../qpidd --port 6666 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 &
+ #BROKER_B=6666
+ echo "Testing replication from port $BROKER_A to port $BROKER_B"
+ export PYTHONPATH=$PYTHON_DIR
+
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication # --ack 500: acknowledge transfers in batches of 500
+
+ #create test queue (only replicate enqueues for this test):
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1
+ $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
+}
+
+send() { # feed replicated.expected to ./sender on port $BROKER_A with routing key queue-a
+ ./sender --port $BROKER_A --routing-key queue-a --send-eos 1 < replicated.expected
+}
+
+receive() { # write what ./receiver reads from queue-a on port $BROKER_B into a fresh replicated.actual
+ rm -f replicated.actual
+ ./receiver --port $BROKER_B --queue queue-a > replicated.actual
+}
+
+bounce_link() { # delete the inter-broker link, pause, then re-add the same queue route
+ echo "Destroying link..."
+ $PYTHON_DIR/commands/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
+ echo "Link destroyed; recreating route..."
+ sleep 2
+ $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication # same arguments as the route created in setup
+ echo "Route re-established"
+}
+
+if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
+ setup
+ for i in `seq 1 100000`; do echo Message $i; done > replicated.expected
+ send &
+ receive &
+ for i in `seq 1 5`; do sleep 10; bounce_link; done; # bounce the link five times while send/receive run in the background
+ wait # for the background send and receive to finish
+ #check that received list is identical to sent list
+ if diff replicated.actual replicated.expected; then
+ echo replication reliable in the face of link failures
+ rm -f replicated.actual replicated.expected replication-source.log replication-dest.log
+ else
+ echo reliable replication test failed: expectations not met!
+ exit 1 # report the failure to the caller; the EXIT trap still stops the brokers
+ fi
+fi
+
Propchange: qpid/trunk/qpid/cpp/src/tests/reliable_replication_test
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/trunk/qpid/cpp/src/tests/replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replication_test?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/replication_test Tue Jan 20 05:30:08 2009
@@ -19,7 +19,7 @@
# under the License.
#
-# Run the federation tests.
+# Run a test of the replication feature
MY_DIR=`dirname \`which $0\``
PYTHON_DIR=${MY_DIR}/../../../python
@@ -37,15 +37,18 @@
}
if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ; then
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port
+ rm -f queue-*.repl replication-*.log #cleanup from any earlier runs
+
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true --log-enable info+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
BROKER_A=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
BROKER_B=`cat qpidd.port`
export PYTHONPATH=$PYTHON_DIR
+ echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B"
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
- $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+ $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
#create test queues
@@ -58,7 +61,6 @@
$PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
#publish and consume from test queus on broker A:
- rm -f queue-*.repl
for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done
for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done
for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done
@@ -79,6 +81,9 @@
./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl
./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
+
+ stop_brokers
+
tail -5 queue-a-input.repl > queue-a-expected.repl
tail -10 queue-b-input.repl > queue-b-expected.repl
diff queue-a-backup.repl queue-a-expected.repl || FAIL=1
@@ -87,12 +92,12 @@
if [[ $FAIL ]]; then
echo replication test failed: expectations not met!
+ exit 1
else
echo queue state replicated as expected
- rm queue-*.repl
+ rm -f queue-*.repl replication-*.log
fi
- stop_brokers
else
echo "Skipping replication test, plugins not built or python utils not located"
fi
Modified: qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-route?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-route (original)
+++ qpid/trunk/qpid/python/commands/qpid-route Tue Jan 20 05:30:08 2009
@@ -48,6 +48,7 @@
print " -d [ --durable ] Added configuration shall be durable"
print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
print " -s [ --src-local ] Make connection to source broker (push route)"
+ print " --ack N Acknowledge transfers over the bridge in batches of N"
print " -t <transport> [ --transport <transport>]"
print " Specify transport to use for links, defaults to tcp"
print
@@ -62,6 +63,7 @@
_dellink = False
_srclocal = False
_transport = "tcp"
+_ack = 0
class RouteManager:
def __init__(self, localBroker):
@@ -234,7 +236,7 @@
if _verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
if res.status != 0:
raise Exception(res.text)
if _verbose:
@@ -256,7 +258,7 @@
if _verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
+ res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
if res.status != 0:
raise Exception(res.text)
if _verbose:
@@ -401,7 +403,7 @@
##
try:
- longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
+ longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except:
Usage()
@@ -425,6 +427,8 @@
_srclocal = True
if opt[0] == "-t" or opt[0] == "--transport":
_transport = opt[1]
+ if opt[0] == "--ack":
+ _ack = int(opt[1])
nargs = len(cargs)
if nargs < 2:
Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Jan 20 05:30:08 2009
@@ -245,6 +245,7 @@
<arg name="srcIsQueue" dir="I" type="bool"/>
<arg name="srcIsLocal" dir="I" type="bool"/>
<arg name="dynamic" dir="I" type="bool"/>
+ <arg name="sync" dir="I" type="uint16"/>
</method>
</class>
@@ -266,6 +267,7 @@
<property name="tag" type="sstr" access="RC"/>
<property name="excludes" type="sstr" access="RC"/>
<property name="dynamic" type="bool" access="RC"/>
+ <property name="sync" type="uint16" access="RC"/>
<method name="close"/>
</class>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org