You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2009/01/20 14:30:09 UTC

svn commit: r736018 - in /qpid/trunk/qpid: cpp/rubygen/framing.0-10/ cpp/src/qpid/amqp_0_10/ cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/qpid/replication/ cpp/src/tests/ python/commands/ specs/

Author: gsim
Date: Tue Jan 20 05:30:08 2009
New Revision: 736018

URL: http://svn.apache.org/viewvc?rev=736018&view=rev
Log:
QPID-1567: added 'exactly-once' guarantee to asynchronous replication of queue state

* altered replication protocol to detect and eliminate duplicates
* added support for acknowledged transfer over inter-broker bridges
* added option to qpid-route to control acknowledged transfer


Added:
    qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp   (with props)
    qpid/trunk/qpid/cpp/src/tests/reliable_replication_test   (with props)
Modified:
    qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
    qpid/trunk/qpid/cpp/src/qpid/replication/constants.h
    qpid/trunk/qpid/cpp/src/tests/Makefile.am
    qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/tests/federation.py
    qpid/trunk/qpid/cpp/src/tests/replication_test
    qpid/trunk/qpid/python/commands/qpid-route
    qpid/trunk/qpid/specs/management-schema.xml

Modified: qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb (original)
+++ qpid/trunk/qpid/cpp/rubygen/framing.0-10/Proxy.rb Tue Jan 20 05:30:08 2009
@@ -37,7 +37,7 @@
   
   def inner_class_decl(c)
     cname=c.name.caps
-    cpp_class(cname, "Proxy") {
+    cpp_class(cname, "public Proxy") {
           gen <<EOS
 public:
 #{cname}(FrameHandler& f) : Proxy(f) {}
@@ -92,7 +92,7 @@
       genl
       namespace("qpid::framing") { 
         genl "#{@classname}::#{@classname}(FrameHandler& f) :"
-        gen "    Proxy(f)"
+        gen "   Proxy(f)"
         @amqp.classes.each { |c| gen ",\n    "+proxy_member(c)+"(f)" }
         genl "{}\n"
         @amqp.classes.each { |c| inner_class_defn(c) }

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Tue Jan 20 05:30:08 2009
@@ -299,6 +299,12 @@
     }
 }
 
+void SessionHandler::markReadyToSend() {
+    if (!sendReady) {
+        sendReady = true;
+    }
+}
+
 void SessionHandler::sendTimeout(uint32_t t) {
     checkAttached();
     peer.requestTimeout(t);

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Tue Jan 20 05:30:08 2009
@@ -60,6 +60,7 @@
     void sendAttach(bool force);
     void sendTimeout(uint32_t t);
     void sendFlush();
+    void markReadyToSend();//TODO: only needed for inter-broker bridge; cleanup
 
     /** True if the handler is ready to send and receive */
     bool ready() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue Jan 20 05:30:08 2009
@@ -68,7 +68,7 @@
         mgmtObject = new _qmf::Bridge
             (agent, this, link, id, args.i_durable, args.i_src, args.i_dest,
              args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
-             args.i_tag, args.i_excludes, args.i_dynamic);
+             args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
         if (!args.i_durable)
             agent->addObject(mgmtObject);
     }
@@ -81,6 +81,8 @@
 
 void Bridge::create(ConnectionState& c)
 {
+    FieldTable options;
+    if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
     connState = &c;
     if (args.i_srcIsLocal) {
         if (args.i_dynamic)
@@ -103,7 +105,7 @@
     session->commandPoint(0,0);
         
     if (args.i_srcIsQueue) {
-        peer->getMessage().subscribe(args.i_src, args.i_dest, 1, 0, false, "", 0, FieldTable());
+        peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
     } else {
@@ -194,9 +196,10 @@
     buffer.getShortString(id);
     buffer.getShortString(excludes);
     bool dynamic(buffer.getOctet());
+    uint16_t sync = buffer.getShort();
 
     return links.declare(host, port, durable, src, dest, key,
-                         is_queue, is_local, id, excludes, dynamic).first;
+                         is_queue, is_local, id, excludes, dynamic, sync).first;
 }
 
 void Bridge::encode(Buffer& buffer) const 
@@ -213,6 +216,7 @@
     buffer.putShortString(args.i_tag);
     buffer.putShortString(args.i_excludes);
     buffer.putOctet(args.i_dynamic ? 1 : 0);
+    buffer.putShort(args.i_sync);
 }
 
 uint32_t Bridge::encodedSize() const 
@@ -228,7 +232,8 @@
         + 1                // srcIsLocal
         + args.i_tag.size() + 1
         + args.i_excludes.size() + 1
-        + 1;               // dynamic
+        + 1               // dynamic
+        + 2;              // sync
 }
 
 management::ManagementObject* Bridge::GetManagementObject (void) const

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryAdapter.h Tue Jan 20 05:30:08 2009
@@ -43,7 +43,7 @@
 class DeliveryAdapter
 {
   public:
-    virtual void deliver(DeliveryRecord&) = 0;
+    virtual void deliver(DeliveryRecord&, bool sync) = 0;
     virtual ~DeliveryAdapter(){}
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jan 20 05:30:08 2009
@@ -81,7 +81,7 @@
             requeue();
         }else{
             msg.payload->redeliver();//mark as redelivered
-            session->deliver(*this);
+            session->deliver(*this, false);
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue Jan 20 05:30:08 2009
@@ -125,17 +125,19 @@
 
 void Link::established ()
 {
-    Mutex::ScopedLock mutex(lock);
     stringstream addr;
     addr << host << ":" << port;
 
     QPID_LOG (info, "Inter-broker link established to " << addr.str());
     agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
-    setStateLH(STATE_OPERATIONAL);
-    currentInterval = 1;
-    visitCount      = 0;
-    if (closing)
-        destroy();
+    {
+        Mutex::ScopedLock mutex(lock);
+        setStateLH(STATE_OPERATIONAL);
+        currentInterval = 1;
+        visitCount      = 0;
+        if (closing)
+            destroy();
+    }
 }
 
 void Link::closed (int, std::string text)
@@ -170,9 +172,9 @@
 
 void Link::destroy ()
 {
+    Bridges toDelete;
     {
         Mutex::ScopedLock mutex(lock);
-        Bridges toDelete;
 
         AclModule* acl = getBroker()->getAcl();
         std::string userID = getUsername() + "@" + getBroker()->getOptions().realm;
@@ -195,12 +197,11 @@
         for (Bridges::iterator i = created.begin(); i != created.end(); i++)
             toDelete.push_back(*i);
         created.clear();
-
-        // Now delete all bridges on this link.
-        for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
-            (*i)->destroy();
-        toDelete.clear();
     }
+    // Now delete all bridges on this link (don't hold the lock for this).
+    for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+        (*i)->destroy();
+    toDelete.clear();
     links->destroy (host, port);
 }
 
@@ -386,7 +387,7 @@
             links->declare (host, port, iargs.i_durable, iargs.i_src,
                             iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
                             iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
-                            iargs.i_dynamic);
+                            iargs.i_dynamic, iargs.i_sync);
 
         if (result.second && iargs.i_durable)
             store->create(*result.first);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue Jan 20 05:30:08 2009
@@ -92,7 +92,8 @@
                                                      bool         isLocal,
                                                      std::string& tag,
                                                      std::string& excludes,
-                                                     bool         dynamic)
+                                                     bool         dynamic,
+                                                     uint16_t     sync)
 {
     Mutex::ScopedLock locker(lock);
     stringstream      keystream;
@@ -121,6 +122,7 @@
         args.i_tag        = tag;
         args.i_excludes   = excludes;
         args.i_dynamic    = dynamic;
+        args.i_sync       = sync;
 
         bridge = Bridge::shared_ptr
             (new Bridge (l->second.get(), l->second->nextChannel(),

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue Jan 20 05:30:08 2009
@@ -85,7 +85,8 @@
                     bool         isLocal,
                     std::string& id,
                     std::string& excludes,
-                    bool         dynamic);
+                    bool         dynamic,
+                    uint16_t     sync);
 
         void destroy(const std::string& host, const uint16_t port);
         void destroy(const std::string& host,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jan 20 05:30:08 2009
@@ -256,7 +256,9 @@
     arguments(_arguments),
     msgCredit(0), 
     byteCredit(0),
-    notifyEnabled(true) {}
+    notifyEnabled(true),
+    syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
+    deliveryCount(0) {}
 
 OwnershipToken* SemanticState::ConsumerImpl::getSession()
 {
@@ -267,7 +269,9 @@
 {
     allocateCredit(msg.payload);
     DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing);
-    parent->deliver(record);
+    bool sync = syncFrequency && ++deliveryCount >= syncFrequency;
+    if (sync) deliveryCount = 0;//reset
+    parent->deliver(record, sync);
     if (!ackExpected) record.setEnded();//allows message to be released now its been delivered
     if (windowing || ackExpected || !acquire) {
         parent->record(record);
@@ -449,9 +453,9 @@
     }
 }
 
-void SemanticState::deliver(DeliveryRecord& msg)
+void SemanticState::deliver(DeliveryRecord& msg, bool sync)
 {
-    return deliveryAdapter.deliver(msg);
+    return deliveryAdapter.deliver(msg, sync);
 }
 
 SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Jan 20 05:30:08 2009
@@ -77,6 +77,8 @@
         uint32_t msgCredit;
         uint32_t byteCredit;
         bool notifyEnabled;
+        const int syncFrequency;
+        int deliveryCount;
 
         bool checkCredit(boost::intrusive_ptr<Message>& msg);
         void allocateCredit(boost::intrusive_ptr<Message>& msg);
@@ -197,7 +199,7 @@
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void recover(bool requeue);
-    void deliver(DeliveryRecord& message);            
+    void deliver(DeliveryRecord& message, bool sync);            
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Jan 20 05:30:08 2009
@@ -89,13 +89,14 @@
 // in the bridge.
 // 
 void SessionHandler::attached(const std::string& name) {
-    if (session.get())
+    if (session.get()) {
         checkName(name);
-    else {
+    } else {
         SessionId id(connection.getUserId(), name);
         SessionState::Configuration config = connection.broker.getSessionManager().getSessionConfig();
         session.reset(new SessionState(connection.getBroker(), *this, id, config));
-}
+        markReadyToSend();
+    }
 }
     
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jan 20 05:30:08 2009
@@ -175,7 +175,7 @@
     }
     if (method->isSync()) { 
         incomplete.process(enqueuedOp, true);
-        sendCompletion(); 
+        sendAcceptAndCompletion();
     }
 }
 
@@ -207,18 +207,27 @@
         //hold up execution until async enqueue is complete        
         if (msg->getFrames().getMethod()->isSync()) { 
             incomplete.process(enqueuedOp, true);
-            sendCompletion(); 
+            sendAcceptAndCompletion();
         } else {
             incomplete.process(enqueuedOp, false);
         }
     }
 }
 
+void SessionState::sendAcceptAndCompletion()
+{
+    if (!accepted.empty()) {
+        getProxy().getMessage().accept(accepted);
+        accepted.clear();
+    }
+    sendCompletion(); 
+}
+
 void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
 {
     receiverCompleted(msg->getCommandId());
-    if (msg->requiresAccept())         
-        getProxy().getMessage().accept(SequenceSet(msg->getCommandId()));        
+    if (msg->requiresAccept())
+        accepted.add(msg->getCommandId());
 }
 
 void SessionState::handleIn(AMQFrame& frame) {
@@ -240,16 +249,23 @@
     handler->out(frame);
 }
 
-void SessionState::deliver(DeliveryRecord& msg)
+void SessionState::deliver(DeliveryRecord& msg, bool sync)
 {
     uint32_t maxFrameSize = getConnection().getFrameMax();
     assert(senderGetCommandPoint().offset == 0);
     SequenceNumber commandId = senderGetCommandPoint().command;
     msg.deliver(getProxy().getHandler(), commandId, maxFrameSize);
     assert(senderGetCommandPoint() == SessionPoint(commandId+1, 0)); // Delivery has moved sendPoint.
+    if (sync) {
+        AMQP_ClientProxy::Execution& p(getProxy().getExecution());
+        Proxy::ScopedSync s(p);
+        p.sync();
+    }
 }
 
-void SessionState::sendCompletion() { handler->sendCompletion(); }
+void SessionState::sendCompletion() { 
+    handler->sendCompletion(); 
+}
 
 void SessionState::senderCompleted(const SequenceSet& commands) {
     qpid::SessionState::senderCompleted(commands);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jan 20 05:30:08 2009
@@ -93,12 +93,12 @@
     void sendCompletion();
 
     //delivery adapter methods:
-    void deliver(DeliveryRecord&);
+    void deliver(DeliveryRecord&, bool sync);
 
     // Manageable entry points
     management::ManagementObject* GetManagementObject (void) const;
     management::Manageable::status_t
-        ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
+    ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
 
     void readyToSend();
 
@@ -119,6 +119,8 @@
     void handleInLast(framing::AMQFrame& frame);
     void handleOutLast(framing::AMQFrame& frame);
 
+    void sendAcceptAndCompletion();
+
     Broker& broker;
     SessionHandler* handler;    
     sys::AbsTime expiry;        // Used by SessionManager.
@@ -128,7 +130,8 @@
     IncompleteMessageList incomplete;
     IncompleteMessageList::CompletionListener enqueuedOp;
     qmf::org::apache::qpid::broker::Session* mgmtObject;
-
+    qpid::framing::SequenceSet accepted;
+    
   friend class SessionManager;
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.cpp Tue Jan 20 05:30:08 2009
@@ -18,15 +18,21 @@
 
 #include "Proxy.h"
 #include "AMQFrame.h"
+#include "AMQMethodBody.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace framing {
 
-Proxy::Proxy(FrameHandler& h) : out(&h) {}
+Proxy::Proxy(FrameHandler& h) : out(&h), sync(false) {}
 
 Proxy::~Proxy() {}
 
 void Proxy::send(const AMQBody& b) {
+    if (sync) {
+        const AMQMethodBody* m = dynamic_cast<const AMQMethodBody*>(&b);
+        if (m)  m->setSync(sync);
+    }
     AMQFrame f(b);
     out->handle(f);
 }
@@ -39,4 +45,7 @@
 
 void Proxy::setHandler(FrameHandler& f) { out=&f; }
 
+Proxy::ScopedSync::ScopedSync(Proxy& p) : proxy(p) { proxy.sync = true; }
+Proxy::ScopedSync::~ScopedSync() { proxy.sync = false; }
+
 }} // namespace qpid::framing

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/Proxy.h Tue Jan 20 05:30:08 2009
@@ -33,6 +33,14 @@
 class Proxy
 {
   public:
+    class ScopedSync
+    {
+        Proxy& proxy;
+      public:
+        ScopedSync(Proxy& p);
+        ~ScopedSync();
+    };
+
     Proxy(FrameHandler& h);
     virtual ~Proxy();
 
@@ -42,9 +50,9 @@
 
     FrameHandler& getHandler();
     void setHandler(FrameHandler&);
-
   private:
     FrameHandler* out;
+    bool sync;
 };
 
 }} // namespace qpid::framing

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Tue Jan 20 05:30:08 2009
@@ -23,6 +23,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 
@@ -35,27 +36,14 @@
 
 void ReplicatingEventListener::handle(QueueEvents::Event event)
 {
-    //create event message and enqueue it on replication queue
-    FieldTable headers;
-    boost::intrusive_ptr<Message> message;
     switch (event.type) {
       case QueueEvents::ENQUEUE:
-        headers.setString(REPLICATION_EVENT_TYPE, ENQUEUE);
-        headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
-        message = createEventMessage(headers);
-        queue->deliver(message);
-        //if its an enqueue, enqueue the message itself on the
-        //replication queue also:
-        queue->deliver(event.msg.payload);
-        QPID_LOG(debug, "Queued 'enqueue' event on " << event.msg.queue->getName() << " for replication");
+        deliverEnqueueMessage(event.msg);
+        QPID_LOG(debug, "Queuing 'enqueue' event on " << event.msg.queue->getName() << " for replication");
         break;
       case QueueEvents::DEQUEUE:
-        headers.setString(REPLICATION_EVENT_TYPE, DEQUEUE);
-        headers.setString(REPLICATION_TARGET_QUEUE, event.msg.queue->getName());
-        headers.setInt(DEQUEUED_MESSAGE_POSITION, event.msg.position);
-        message = createEventMessage(headers);
-        queue->deliver(message);
-        QPID_LOG(debug, "Queued 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
+        deliverDequeueMessage(event.msg);
+        QPID_LOG(debug, "Queuing 'dequeue' event from " << event.msg.queue->getName() << " for replication, (from position "
                  << event.msg.position << ")");
         break;
     }
@@ -65,20 +53,64 @@
 const std::string EMPTY;
 }
 
-boost::intrusive_ptr<Message> ReplicatingEventListener::createEventMessage(const FieldTable& headers)
+void ReplicatingEventListener::deliverDequeueMessage(const QueuedMessage& dequeued)
+{
+    FieldTable headers;
+    headers.setString(REPLICATION_TARGET_QUEUE, dequeued.queue->getName());
+    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+    headers.setInt(REPLICATION_EVENT_TYPE, DEQUEUE);
+    headers.setInt(DEQUEUED_MESSAGE_POSITION, dequeued.position);
+    boost::intrusive_ptr<Message> msg(createMessage(headers));
+    queue->deliver(msg);
+}
+
+void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
+{
+    boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
+    FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
+    headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+    headers.setInt(REPLICATION_EVENT_SEQNO, ++sequence);
+    headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
+    queue->deliver(msg);
+}
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers)
+{
+    boost::intrusive_ptr<Message> msg(new Message());
+    AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+    AMQFrame header(in_place<AMQHeaderBody>());
+    header.setBof(false);
+    header.setEof(true);
+    header.setBos(true);
+    header.setEos(true);
+    msg->getFrames().append(method);
+    msg->getFrames().append(header);
+    MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    props->setApplicationHeaders(headers);
+    return msg;
+}
+
+struct AppendingHandler : FrameHandler
+{
+    boost::intrusive_ptr<Message> msg;
+    
+    AppendingHandler(boost::intrusive_ptr<Message> m) : msg(m) {}
+
+    void handle(AMQFrame& f)
+    {
+        msg->getFrames().append(f);
+    }
+};
+
+boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original)
 {
-        boost::intrusive_ptr<Message> msg(new Message());
-        AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
-        AMQFrame header(in_place<AMQHeaderBody>());
-        header.setBof(false);
-        header.setEof(true);
-        header.setBos(true);
-        header.setEos(true);
-        msg->getFrames().append(method);
-        msg->getFrames().append(header);
-        MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
-        props->setApplicationHeaders(headers);
-        return msg;
+    boost::intrusive_ptr<Message> copy(new Message());
+    AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+    AppendingHandler handler(copy);
+    handler.handle(method);
+    original->sendHeader(handler, std::numeric_limits<int16_t>::max());
+    original->sendContent(queue, handler, std::numeric_limits<int16_t>::max());
+    return copy;
 }
     
 Options* ReplicatingEventListener::getOptions() 

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicatingEventListener.h Tue Jan 20 05:30:08 2009
@@ -28,6 +28,7 @@
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueEvents.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/framing/SequenceNumber.h"
 
 namespace qpid {
 namespace replication {
@@ -57,8 +58,14 @@
 
     PluginOptions options;    
     qpid::broker::Queue::shared_ptr queue;
+    qpid::framing::SequenceNumber sequence;
 
-    boost::intrusive_ptr<qpid::broker::Message> createEventMessage(const qpid::framing::FieldTable& headers);
+    void deliverDequeueMessage(const qpid::broker::QueuedMessage& enqueued);
+    void deliverEnqueueMessage(const qpid::broker::QueuedMessage& enqueued);
+
+    boost::intrusive_ptr<qpid::broker::Message> createMessage(const qpid::framing::FieldTable& headers);
+    boost::intrusive_ptr<qpid::broker::Message> cloneMessage(qpid::broker::Queue& queue, 
+                                                             boost::intrusive_ptr<qpid::broker::Message> original);
 };
 
 }} // namespace qpid::replication

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Tue Jan 20 05:30:08 2009
@@ -38,46 +38,100 @@
                                          const FieldTable& args,
                                          QueueRegistry& qr,
                                          Manageable* parent) 
-    : Exchange(name, durable, args, parent), queues(qr), expectingEnqueue(false) {}
+    : Exchange(name, durable, args, parent), queues(qr), init(false) {}
 
 std::string ReplicationExchange::getType() const { return typeName; }            
 
+// Dispatches a replication event according to the REPLICATION_EVENT_TYPE
+// header in args: duplicates (see isDuplicate) are discarded, ENQUEUE and
+// DEQUEUE go to their handlers, any other non-zero type raises
+// IllegalArgumentException. A message with no headers is dropped with a warning.
 void ReplicationExchange::route(Deliverable& msg, const std::string& /*routingKey*/, const FieldTable* args)
 {
     if (args) {
-        std::string eventType = args->getAsString(REPLICATION_EVENT_TYPE);
-        if (eventType == ENQUEUE) {
-            expectingEnqueue = true;
-            targetQueue = args->getAsString(REPLICATION_TARGET_QUEUE);
-            QPID_LOG(debug, "Recorded replicated 'enqueue' event for " << targetQueue);
-            return;
-        } else if (eventType == DEQUEUE) {
-            std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
-            Queue::shared_ptr queue = queues.find(queueName);
-            SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
-
-            QueuedMessage dequeued;
-            if (queue->acquireMessageAt(position, dequeued)) {
-                queue->dequeue(0, dequeued);
-                QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
-            } else {
-                QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
+        int eventType = args->getAsInt(REPLICATION_EVENT_TYPE);
+        if (eventType) {
+            if (isDuplicate(args)) return;
+            switch (eventType) {
+              case ENQUEUE:
+                handleEnqueueEvent(args, msg);
+                return;
+              case DEQUEUE:
+                handleDequeueEvent(args);
+                return;
+              default:
+                throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
             }
-            
-            return;
-        } else if (!eventType.empty()) {
-            throw IllegalArgumentException(QPID_MSG("Illegal value for " << REPLICATION_EVENT_TYPE << ": " << eventType));
         }
+    } else {
+        QPID_LOG(warning, "Dropping unexpected message with no headers");
+    }
+}
+
+// Applies a replicated 'enqueue' event: removes the replication control
+// headers from the message and delivers it to the local queue named by
+// REPLICATION_TARGET_QUEUE. If the lookup yields no queue the message is
+// dropped and an error is logged instead of dereferencing a null pointer.
+void ReplicationExchange::handleEnqueueEvent(const FieldTable* args, Deliverable& msg)
+{
+    std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+    Queue::shared_ptr queue = queues.find(queueName);
+    if (!queue) {
+        QPID_LOG(error, "Cannot enqueue replicated message: queue " << queueName << " does not exist");
+        return;
+    }
+    FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
+    headers.erase(REPLICATION_TARGET_QUEUE);
+    headers.erase(REPLICATION_EVENT_SEQNO);
+    headers.erase(REPLICATION_EVENT_TYPE);
+    msg.deliverTo(queue);
+    QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
+}
+
+// Applies a replicated 'dequeue' event: acquires and dequeues the message at
+// DEQUEUED_MESSAGE_POSITION on the local queue named by
+// REPLICATION_TARGET_QUEUE. A missing queue is logged as an error; a message
+// that cannot be acquired at that position is logged as a warning.
+void ReplicationExchange::handleDequeueEvent(const FieldTable* args)
+{
+    std::string queueName = args->getAsString(REPLICATION_TARGET_QUEUE);
+    Queue::shared_ptr queue = queues.find(queueName);
+    if (!queue) {
+        QPID_LOG(error, "Cannot dequeue replicated message: queue " << queueName << " does not exist");
+        return;
+    }
+    SequenceNumber position(args->getAsInt(DEQUEUED_MESSAGE_POSITION));
+    
+    QueuedMessage dequeued;
+    if (queue->acquireMessageAt(position, dequeued)) {
+        queue->dequeue(0, dequeued);
+        QPID_LOG(debug, "Processed replicated 'dequeue' event from " << queueName << " at position " << position);
+    } else {
+        QPID_LOG(warning, "Could not acquire message " << position << " from " << queueName);
     }
-    //if we get here assume its not an event message, assume its an enqueue
-    if (expectingEnqueue) {
-        Queue::shared_ptr queue = queues.find(targetQueue);
-        msg.deliverTo(queue);
-        expectingEnqueue = false;
-        targetQueue.clear();
-        QPID_LOG(debug, "Eenqueued replicated message onto " << targetQueue);
+}
+
+// Returns true if the event's REPLICATION_EVENT_SEQNO is not beyond the last
+// sequence number accepted, i.e. the event was already seen and must be
+// ignored. The first event ever received sets the baseline and is accepted.
+// An accepted event that skips ahead by more than one is logged as a gap but
+// still advances the recorded sequence.
+bool ReplicationExchange::isDuplicate(const FieldTable* args)
+{
+    SequenceNumber seqno(args->getAsInt(REPLICATION_EVENT_SEQNO));
+    if (!init) {
+        init = true;
+        sequence = seqno;
+        return false;
+    } else if (seqno > sequence) {
+        if (seqno - sequence > 1) {
+            QPID_LOG(error, "Gap in replication event sequence between: " << sequence << " and " << seqno);
+        }
+        sequence = seqno;
+        return false;
     } else {
-        QPID_LOG(warning, "Dropping unexpected message");
+        QPID_LOG(info, "Duplicate detected: seqno=" << seqno << " (last seqno=" << sequence << ")");
+        return true;
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/ReplicationExchange.h Tue Jan 20 05:30:08 2009
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/broker/Exchange.h"
+#include "qpid/framing/SequenceNumber.h"
 
 namespace qpid {
 namespace replication {
@@ -51,8 +52,12 @@
     bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
   private:
     qpid::broker::QueueRegistry& queues;
-    bool expectingEnqueue;
-    std::string targetQueue;
+    qpid::framing::SequenceNumber sequence;
+    bool init;
+
+    bool isDuplicate(const qpid::framing::FieldTable* args);
+    void handleEnqueueEvent(const qpid::framing::FieldTable* args, qpid::broker::Deliverable& msg);
+    void handleDequeueEvent(const qpid::framing::FieldTable* args);
 };
 }} // namespace qpid::replication
 

Modified: qpid/trunk/qpid/cpp/src/qpid/replication/constants.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/replication/constants.h?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/replication/constants.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/replication/constants.h Tue Jan 20 05:30:08 2009
@@ -22,10 +22,12 @@
 namespace replication {
 namespace constants {
 
-const std::string REPLICATION_EVENT_TYPE("qpid.replication_event_type");
-const std::string ENQUEUE("enqueue");
-const std::string DEQUEUE("dequeue");
-const std::string REPLICATION_TARGET_QUEUE("qpid.replication_target_queue");
-const std::string DEQUEUED_MESSAGE_POSITION("qpid.dequeued_message_position");
+const std::string REPLICATION_EVENT_TYPE("qpid.replication.type");
+const std::string REPLICATION_EVENT_SEQNO("qpid.replication.seqno");
+const std::string REPLICATION_TARGET_QUEUE("qpid.replication.target_queue");
+const std::string DEQUEUED_MESSAGE_POSITION("qpid.replication.message");
+
+const int ENQUEUE(1);
+const int DEQUEUE(2);
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue Jan 20 05:30:08 2009
@@ -89,7 +89,8 @@
 	ManagementTest.cpp \
 	MessageReplayTracker.cpp \
 	ConsoleTest.cpp \
-	QueueEvents.cpp
+	QueueEvents.cpp \
+	ProxyTest.cpp
 
 if HAVE_XML
 unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -249,7 +250,7 @@
 
 # Longer running stability tests, not run by default check: target.
 # Not run under valgrind, too slow
-LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak
+LONG_TESTS=fanout_perftest shared_perftest multiq_perftest topic_perftest run_failover_soak reliable_replication_test
 EXTRA_DIST+=$(LONG_TESTS) run_perftest
 check-long:
 	$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=

Added: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp?rev=736018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp Tue Jan 20 05:30:08 2009
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/ExecutionSyncBody.h"
+#include "qpid/framing/Proxy.h"
+#include <alloca.h>
+
+#include "unit_test.h"
+
+using namespace qpid::framing;
+
+QPID_AUTO_TEST_SUITE(ProxyTestSuite)
+
+
+QPID_AUTO_TEST_CASE(testScopedSync) // a method sent while a ScopedSync is alive must reach the handler flagged sync
+{
+    struct DummyHandler : FrameHandler // inspects each frame handed over by the proxy
+    {
+        void handle(AMQFrame& f) {
+            AMQMethodBody* m = f.getMethod();
+            BOOST_REQUIRE(m); // stop here on failure; the checks below dereference m
+            BOOST_CHECK(m->isA<ExecutionSyncBody>());
+            BOOST_CHECK(m->isSync());
+        }
+    };
+    DummyHandler f;
+    Proxy p(f);
+    Proxy::ScopedSync s(p); // stays in scope across the send below
+    p.send(ExecutionSyncBody(p.getVersion()));
+}
+ 
+QPID_AUTO_TEST_SUITE_END()

Propchange: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/ProxyTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueEvents.cpp Tue Jan 20 05:30:08 2009
@@ -168,7 +168,7 @@
         BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
     }
     fixture.connection.close();
-    fixture.shutdownBroker();
+    fixture.broker->getQueueEvents().shutdown();
 
     //check listener was notified of all events, and in correct order
     SequenceNumber enqueueId(1);
@@ -215,7 +215,7 @@
         BOOST_CHECK_EQUAL(incoming.pop().getData(), (boost::format("%1%_%2%") % "Message" % (i+1)).str());
     }
     fixture.connection.close();
-    fixture.shutdownBroker();
+    fixture.broker->getQueueEvents().shutdown();
 
     //check listener was notified of all events, and in correct order
     SequenceNumber enqueueId(1);

Modified: qpid/trunk/qpid/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/federation.py Tue Jan 20 05:30:08 2009
@@ -64,7 +64,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False)
+        result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
         self.assertEqual(result.status, 0)
 
         bridge = qmf.getObjects(_class="bridge")[0]
@@ -88,7 +88,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False)
+        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
         self.assertEqual(result.status, 0)
 
         bridge = qmf.getObjects(_class="bridge")[0]
@@ -135,7 +135,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False)
+        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
         self.assertEqual(result.status, 0)
 
         bridge = qmf.getObjects(_class="bridge")[0]
@@ -195,7 +195,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False)
+        result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
         self.assertEqual(result.status, 0)
 
         bridge = qmf.getObjects(_class="bridge")[0]
@@ -244,8 +244,8 @@
         l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
         r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
 
-        l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
-        r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False)
+        l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
+        r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", "", False, False, False, 0)
 
         self.assertEqual(l_res.status, 0)
         self.assertEqual(r_res.status, 0)
@@ -296,7 +296,7 @@
 
         link = qmf.getObjects(_class="link")[0]
         result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "my-bridge-id",
-                             "exclude-me,also-exclude-me", False, False, False)
+                             "exclude-me,also-exclude-me", False, False, False, 0)
         self.assertEqual(result.status, 0)
         bridge = qmf.getObjects(_class="bridge")[0]
 
@@ -354,7 +354,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True)
+        result = link.bridge(False, "fed.fanout", "fed.fanout", "", "", "", False, False, True, 0)
         self.assertEqual(result.status, 0)
         bridge = qmf.getObjects(_class="bridge")[0]
         sleep(5)
@@ -401,7 +401,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True)
+        result = link.bridge(False, "fed.direct", "fed.direct", "", "", "", False, False, True, 0)
         self.assertEqual(result.status, 0)
         bridge = qmf.getObjects(_class="bridge")[0]
         sleep(5)
@@ -448,7 +448,7 @@
         self.assertEqual(result.status, 0)
 
         link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True)
+        result = link.bridge(False, "fed.topic", "fed.topic", "", "", "", False, False, True, 0)
         self.assertEqual(result.status, 0)
         bridge = qmf.getObjects(_class="bridge")[0]
         sleep(5)
@@ -478,8 +478,7 @@
         sleep(3)
         self.assertEqual(len(qmf.getObjects(_class="bridge")), 0)
         self.assertEqual(len(qmf.getObjects(_class="link")), 0)
-
-
+        
     def getProperty(self, msg, name):
         for h in msg.headers:
             if hasattr(h, name): return getattr(h, name)

Added: qpid/trunk/qpid/cpp/src/tests/reliable_replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/reliable_replication_test?rev=736018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/reliable_replication_test (added)
+++ qpid/trunk/qpid/cpp/src/tests/reliable_replication_test Tue Jan 20 05:30:08 2009
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Test reliability of the replication feature in the face of link
+# failures:
+MY_DIR=`dirname \`which $0\``
+PYTHON_DIR=${MY_DIR}/../../../python
+
+trap stop_brokers EXIT
+
+stop_brokers() {  # installed as the EXIT trap: stop whichever brokers were started
+    if [ -n "$BROKER_A" ] ; then  # POSIX test; the script runs under /bin/sh where [[ ]] may be missing
+        ../qpidd -q --port "$BROKER_A"
+        unset BROKER_A  # makes a second call a no-op for this broker
+    fi
+    if [ -n "$BROKER_B" ] ; then
+        ../qpidd -q --port "$BROKER_B"
+        unset BROKER_B
+    fi
+}
+
+setup() {  # start source broker A and destination broker B, link them, and create queue-a on both
+    rm -f replication-source.log replication-dest.log
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true  --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 > qpidd.port
+    BROKER_A=`cat qpidd.port`  # port number captured from the daemon's stdout
+
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so  --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 > qpidd.port
+    BROKER_B=`cat qpidd.port`  # qpidd.port is reused, so BROKER_A had to be read before this launch
+
+    #../qpidd --port 5555 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true  --log-enable trace+ --log-to-file replication-source.log --log-to-stderr 0 &
+    #BROKER_A=5555
+
+    #../qpidd --port 6666 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so  --log-enable info+ --log-to-file replication-dest.log --log-to-stderr 0 &
+    #BROKER_B=6666
+    echo "Testing replication from port $BROKER_A to port $BROKER_B"
+    export PYTHONPATH=$PYTHON_DIR
+
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
+    $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+
+    #create test queue (only replicate enqueues for this test):
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_A" add queue queue-a --generate-queue-events 1
+    $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-a
+}
+
+send() {  # feed replicated.expected to ./sender, aimed at broker A with routing key queue-a
+    ./sender  --port $BROKER_A --routing-key queue-a --send-eos 1 < replicated.expected
+}
+
+receive() {  # capture what ./receiver prints for queue-a on broker B in replicated.actual
+    rm -f replicated.actual  # discard output left by an earlier run
+    ./receiver --port $BROKER_B --queue queue-a > replicated.actual
+}
+
+bounce_link() {  # delete the link between the brokers, pause, then re-add the acknowledged queue route
+    echo "Destroying link..."
+    $PYTHON_DIR/commands/qpid-route link del "localhost:$BROKER_B" "localhost:$BROKER_A"
+    echo "Link destroyed; recreating route..."
+    sleep 2
+    $PYTHON_DIR/commands/qpid-route --ack 500 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication  # same arguments as the route created in setup
+    echo "Route re-established"
+}
+
+if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ;  then
+    setup
+    for i in `seq 1 100000`; do echo Message $i; done > replicated.expected
+    send &
+    receive &
+    for i in `seq 1 5`; do sleep 10; bounce_link; done;  # bounce the link while send/receive run in the background
+    wait  # for the backgrounded send and receive to finish
+    #check that received list is identical to sent list
+    if diff replicated.actual replicated.expected; then
+        echo replication reliable in the face of link failures
+        rm -f replicated.actual replicated.expected replication-source.log replication-dest.log  # names match the files written above
+    else
+        echo reliable replication test failed: expectations not met!
+        exit 1  # non-zero status so the caller sees the failure; the EXIT trap still stops the brokers
+    fi
+fi
+

Propchange: qpid/trunk/qpid/cpp/src/tests/reliable_replication_test
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/trunk/qpid/cpp/src/tests/replication_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/replication_test?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/replication_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/replication_test Tue Jan 20 05:30:08 2009
@@ -19,7 +19,7 @@
 # under the License.
 #
 
-# Run the federation tests.
+# Run a test of the replication feature
 MY_DIR=`dirname \`which $0\``
 PYTHON_DIR=${MY_DIR}/../../../python
 
@@ -37,15 +37,18 @@
 }
 
 if test -d ${PYTHON_DIR} && test -e ../.libs/replicating_listener.so && test -e ../.libs/replication_exchange.so ;  then
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true > qpidd.port
+    rm -f queue-*.repl replication-*.log #cleanup from any earlier runs
+
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replicating_listener.so --replication-queue replication --create-replication-queue true  --log-enable info+ --log-to-file replication-source.log  --log-to-stderr 0 > qpidd.port
     BROKER_A=`cat qpidd.port`
 
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so > qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no --load-module ../.libs/replication_exchange.so  --log-enable info+ --log-to-file replication-dest.log  --log-to-stderr 0 > qpidd.port
     BROKER_B=`cat qpidd.port`
     export PYTHONPATH=$PYTHON_DIR
+    echo "Running replication test between localhost:$BROKER_A and localhost:$BROKER_B"
 
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add exchange replication replication
-    $PYTHON_DIR/commands/qpid-route queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
+    $PYTHON_DIR/commands/qpid-route --ack 5 queue add "localhost:$BROKER_B" "localhost:$BROKER_A" replication replication
 
     #create test queues
 
@@ -58,7 +61,6 @@
     $PYTHON_DIR/commands/qpid-config -a "localhost:$BROKER_B" add queue queue-c
 
     #publish and consume from test queus on broker A:
-    rm -f queue-*.repl
     for i in `seq 1 10`; do echo Message $i for A >> queue-a-input.repl; done
     for i in `seq 1 20`; do echo Message $i for B >> queue-b-input.repl; done
     for i in `seq 1 15`; do echo Message $i for C >> queue-c-input.repl; done
@@ -79,6 +81,9 @@
     ./receiver --port $BROKER_B --queue queue-a > queue-a-backup.repl
     ./receiver --port $BROKER_B --queue queue-b > queue-b-backup.repl
     ./receiver --port $BROKER_B --queue queue-c > queue-c-backup.repl
+
+    stop_brokers
+
     tail -5 queue-a-input.repl > queue-a-expected.repl
     tail -10 queue-b-input.repl > queue-b-expected.repl
     diff queue-a-backup.repl queue-a-expected.repl || FAIL=1
@@ -87,12 +92,12 @@
 
     if [[ $FAIL ]]; then
         echo replication test failed: expectations not met!
+        exit 1
     else 
         echo queue state replicated as expected
-        rm queue-*.repl
+        rm -f queue-*.repl replication-*.log
     fi
 
-    stop_brokers
 else
     echo "Skipping replication test, plugins not built or python utils not located"
 fi

Modified: qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-route?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-route (original)
+++ qpid/trunk/qpid/python/commands/qpid-route Tue Jan 20 05:30:08 2009
@@ -48,6 +48,7 @@
     print "    -d [ --durable ]         Added configuration shall be durable"
     print "    -e [ --del-empty-link ]  Delete link after deleting last route on the link"
     print "    -s [ --src-local ]       Make connection to source broker (push route)"
+    print "    --ack N                  Acknowledge transfers over the bridge in batches of N"
     print "    -t <transport> [ --transport <transport>]"
     print "                             Specify transport to use for links, defaults to tcp"
     print
@@ -62,6 +63,7 @@
 _dellink   = False
 _srclocal  = False
 _transport = "tcp"
+_ack       = 0
 
 class RouteManager:
     def __init__(self, localBroker):
@@ -234,7 +236,7 @@
 
         if _verbose:
             print "Creating inter-broker binding..."
-        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
+        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic, _ack)
         if res.status != 0:
             raise Exception(res.text)
         if _verbose:
@@ -256,7 +258,7 @@
 
         if _verbose:
             print "Creating inter-broker binding..."
-        res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
+        res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False, _ack)
         if res.status != 0:
             raise Exception(res.text)
         if _verbose:
@@ -401,7 +403,7 @@
 ##
 
 try:
-    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
+    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=", "ack=")
     (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
 except:
     Usage()
@@ -425,6 +427,8 @@
         _srclocal = True
     if opt[0] == "-t" or opt[0] == "--transport":
         _transport = opt[1]
+    if opt[0] == "--ack":
+        _ack = int(opt[1])
 
 nargs = len(cargs)
 if nargs < 2:

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=736018&r1=736017&r2=736018&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Jan 20 05:30:08 2009
@@ -245,6 +245,7 @@
       <arg name="srcIsQueue"  dir="I" type="bool"/>
       <arg name="srcIsLocal"  dir="I" type="bool"/>
       <arg name="dynamic"     dir="I" type="bool"/>
+      <arg name="sync"        dir="I" type="uint16"/>
     </method>
   </class>
 
@@ -266,6 +267,7 @@
     <property name="tag"         type="sstr"   access="RC"/>
     <property name="excludes"    type="sstr"   access="RC"/>
     <property name="dynamic"     type="bool"   access="RC"/>
+    <property name="sync"        type="uint16" access="RC"/>
     <method name="close"/> 
   </class>
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org