You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/21 20:04:19 UTC

svn commit: r687813 - in /incubator/qpid/trunk/qpid/cpp: src/ src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ src/qpid/sys/ src/tests/ xml/

Author: aconway
Date: Thu Aug 21 11:04:18 2008
New Revision: 687813

URL: http://svn.apache.org/viewvc?rev=687813&view=rev
Log:
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Aug 21 11:04:18 2008
@@ -546,6 +546,7 @@
   qpid/sys/ConnectionInputHandler.h \
   qpid/sys/ConnectionInputHandlerFactory.h \
   qpid/sys/ConnectionOutputHandler.h \
+  qpid/sys/ConnectionOutputHandlerPtr.h \
   qpid/sys/DeletionManager.h \
   qpid/sys/Dispatcher.h \
   qpid/sys/IOHandle.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Aug 21 11:04:18 2008
@@ -19,7 +19,11 @@
   qpid/cluster/ShadowConnectionOutputHandler.h \
   qpid/cluster/PollableCondition.h \
   qpid/cluster/PollableCondition.cpp \
-  qpid/cluster/PollableQueue.h
+  qpid/cluster/PollableQueue.h \
+  qpid/cluster/WriteEstimate.h \
+  qpid/cluster/WriteEstimate.cpp \
+  qpid/cluster/OutputInterceptor.h \
+  qpid/cluster/OutputInterceptor.cpp
 
 libqpidcluster_la_LIBADD= -lcpg libqpidbroker.la
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Thu Aug 21 11:04:18 2008
@@ -30,7 +30,7 @@
 Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
     : frameQueueClosed(false), output(o),
       connection(new broker::Connection(this, broker, id, _isClient)),
-      identifier(id), initialized(false), isClient(_isClient) {}
+      identifier(id), initialized(false), isClient(_isClient), buffered(0) {}
 
 size_t  Connection::decode(const char* buffer, size_t size) {
     framing::Buffer in(const_cast<char*>(buffer), size);
@@ -53,7 +53,7 @@
 
 bool Connection::canEncode() {
     if (!frameQueueClosed) connection->doOutput();
-    Mutex::ScopedLock l(frameQueueLock);
+        Mutex::ScopedLock l(frameQueueLock);
     return (!isClient && !initialized) || !frameQueue.empty();
 }
 
@@ -71,10 +71,12 @@
         initialized = true;
         QPID_LOG(trace, "SENT " << identifier << " INIT(" << pi << ")");
     }
-    while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+    size_t frameSize=0;
+    while (!frameQueue.empty() && ((frameSize=frameQueue.front().size()) <= out.available())) {
             frameQueue.front().encode(out);
             QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
-            frameQueue.pop();
+            frameQueue.pop_front();
+            buffered -= frameSize;
     }
     assert(frameQueue.empty() || frameQueue.front().size() <= size);
     if (!frameQueue.empty() && frameQueue.front().size() > size)
@@ -98,7 +100,8 @@
     {
         Mutex::ScopedLock l(frameQueueLock);
 	if (!frameQueueClosed)
-            frameQueue.push(f);
+            frameQueue.push_back(f);
+        buffered += f.size();
     }
     activateOutput();
 }
@@ -107,4 +110,9 @@
     return framing::ProtocolVersion(0,10);
 }
 
+size_t Connection::getBuffered() const { 
+    Mutex::ScopedLock l(frameQueueLock);
+    return buffered;
+}
+
 }} // namespace qpid::amqp_0_10

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Thu Aug 21 11:04:18 2008
@@ -26,7 +26,7 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/broker/Connection.h"
 #include <boost/intrusive_ptr.hpp>
-#include <queue>
+#include <deque>
 #include <memory>
 
 namespace qpid {
@@ -36,7 +36,9 @@
 class Connection  : public sys::ConnectionCodec,
                     public sys::ConnectionOutputHandler
 {
-    std::queue<framing::AMQFrame> frameQueue;
+    typedef std::deque<framing::AMQFrame> FrameQueue;
+
+    FrameQueue frameQueue;
     bool frameQueueClosed;
     mutable sys::Mutex frameQueueLock;
     sys::OutputControl& output;
@@ -44,7 +46,8 @@
     std::string identifier;
     bool initialized;
     bool isClient;
-    
+    size_t buffered;
+
   public:
     Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
     size_t decode(const char* buffer, size_t size);
@@ -56,6 +59,7 @@
     void close();               // closing from this end.
     void send(framing::AMQFrame&);
     framing::ProtocolVersion getVersion() const;
+    size_t getBuffered() const;
 };
 
 }} // namespace qpid::amqp_0_10

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Aug 21 11:04:18 2008
@@ -79,7 +79,7 @@
 void Connection::requestIOProcessing(boost::function0<void> callback)
 {
     ioCallback = callback;
-    out->activateOutput();
+    out.activateOutput();
 }
 
 Connection::~Connection()
@@ -178,7 +178,6 @@
     try {
         while (!channels.empty()) 
             ptr_map_ptr(channels.begin())->handleDetach();
-        // FIXME aconway 2008-07-15: exclusive is per-session not per-connection in 0-10.
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
             q->releaseExclusiveOwnership();
@@ -245,7 +244,7 @@
     case management::Connection::METHOD_CLOSE :
         mgmtClosing = true;
         if (mgmtObject != 0) mgmtObject->set_closing(1);
-        out->activateOutput();
+        out.activateOutput();
         status = Manageable::STATUS_OK;
         break;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Aug 21 11:04:18 2008
@@ -24,7 +24,7 @@
 #include <vector>
 
 #include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/ConnectionOutputHandlerPtr.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/management/Manageable.h"
 #include "Broker.h"
@@ -34,11 +34,14 @@
 
 class ConnectionState : public ConnectionToken, public management::Manageable
 {
+  protected:
+    sys::ConnectionOutputHandlerPtr out;
+
   public:
-    ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) : 
+    ConnectionState(qpid::sys::ConnectionOutputHandler* o, Broker& b) :
+        out(o),
         broker(b), 
-        outputTasks(*o),
-        out(o), 
+        outputTasks(out),
         framemax(65535), 
         heartbeat(0),
         stagingThreshold(broker.getStagingThreshold())
@@ -67,14 +70,13 @@
     //contained output tasks
     sys::AggregateOutput outputTasks;
 
-    sys::ConnectionOutputHandler& getOutput() const { return *out; }
+    sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
     framing::ProtocolVersion getVersion() const { return version; }
 
-    void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out = o; }
+    void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
 
   protected:
     framing::ProtocolVersion version;
-    sys::ConnectionOutputHandler* out;
     uint32_t framemax;
     uint16_t heartbeat;
     uint64_t stagingThreshold;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 21 11:04:18 2008
@@ -25,12 +25,14 @@
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ClusterNotifyBody.h"
 #include "qpid/framing/ClusterConnectionCloseBody.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
+#include <boost/current_function.hpp>
 #include <algorithm>
 #include <iterator>
 #include <map>
@@ -76,11 +78,6 @@
     cpg.join(name);
     notify();
 
-    // FIXME aconway 2008-08-11: can we remove this loop?
-    // Dispatch till we show up in the cluster map.
-    while (empty()) 
-        cpg.dispatchOne();
-
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start(poller);
@@ -97,9 +94,8 @@
     std::for_each(localConnectionSet.begin(), localConnectionSet.end(), boost::bind(&ConnectionInterceptor::dirtyClose, _1));
 }
 
-// local connection initializes plugins
 void Cluster::initialize(broker::Connection& c) {
-    bool isLocal = &c.getOutput() != &shadowOut;
+    bool isLocal = c.getOutput().get() != &shadowOut;
     if (isLocal)
         localConnectionSet.insert(new ConnectionInterceptor(c, *this));
 }
@@ -107,10 +103,8 @@
 void Cluster::leave() {
     Mutex::ScopedLock l(lock);
     if (!broker) return;                               // Already left.
-    // At this point the poller has already been shut down so
-    // no dispatches can occur thru the cpgDispatchHandle.
-    // 
-    // FIXME aconway 2008-08-11: assert this is the cae.
+    // Leave is called by from Broker destructor after the poller has
+    // been shut down. No dispatches can occur.
     
     QPID_LOG(debug, "Leaving cluster " << *this);
     cpg.leave(name);
@@ -173,13 +167,6 @@
     return result;        
 }
 
-// ################ HERE - leaking shadow connections.
-// FIXME aconway 2008-08-11: revisit memory management for shadow
-// connections, what if the Connection is closed other than via
-// disconnect? Dangling pointer in shadow map. Use ptr_map for shadow
-// map, add deleted state to ConnectionInterceptor? Interceptors need
-// to know about map? Check how Connections can be deleted.
-
 ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* remotePtr) {
     ShadowConnectionId id(member, remotePtr);
     ShadowConnectionMap::iterator i = shadowConnectionMap.find(id);
@@ -274,7 +261,8 @@
           break;
       }
       case CLUSTER_CONNECTION_DO_OUTPUT_METHOD_ID: {
-          connection->deliverDoOutput();
+          ClusterConnectionDoOutputBody& doOutput = static_cast<ClusterConnectionDoOutputBody&>(method);
+          connection->deliverDoOutput(doOutput.getBytes());
           break;
       }
       default:
@@ -309,9 +297,8 @@
 
 void Cluster::disconnect(sys::DispatchHandle& h) {
     h.stopWatch();
-    // FIXME aconway 2008-08-11: error handling if we are disconnected. 
-    // Kill the broker?
-    assert(0);
+    QPID_LOG(critical, "Disconnected from cluster, shutting down");
+    broker.shutdown();
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.cpp Thu Aug 21 11:04:18 2008
@@ -22,6 +22,8 @@
 #include "qpid/framing/ClusterConnectionCloseBody.h"
 #include "qpid/framing/ClusterConnectionDoOutputBody.h"
 #include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
 
 namespace qpid {
 namespace cluster {
@@ -32,24 +34,27 @@
 
 ConnectionInterceptor::ConnectionInterceptor(
     broker::Connection& conn, Cluster& clust, Cluster::ShadowConnectionId shadowId_)
-    : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_)
+    : connection(&conn), cluster(clust), isClosed(false), shadowId(shadowId_), output(*this, *conn.getOutput().get())
 {
     connection->addFinalizer(boost::bind(operator delete, this));
+    connection->setOutputHandler(&output),
     // Attach  my functions to Connection extension points.
     shift(receivedNext, connection->receivedFn, boost::bind(&ConnectionInterceptor::received, this, _1));
     shift(closedNext, connection->closedFn, boost::bind(&ConnectionInterceptor::closed, this));
-    shift(doOutputNext, connection->doOutputFn, boost::bind(&ConnectionInterceptor::doOutput, this));
+    shift(output.doOutputNext, connection->doOutputFn, boost::bind(&OutputInterceptor::doOutput, &output));
 }
 
 ConnectionInterceptor::~ConnectionInterceptor() {
     assert(connection == 0);
 }
 
+// Forward all received frames to the cluster, continue handling on delivery.
 void ConnectionInterceptor::received(framing::AMQFrame& f) {
     if (isClosed) return;
     cluster.send(f, this);
 }
 
+// Continue normal handling of delivered frames.
 void ConnectionInterceptor::deliver(framing::AMQFrame& f) {
     receivedNext(f);
 }
@@ -81,28 +86,17 @@
 }
 
 void ConnectionInterceptor::dirtyClose() {
-    // Not closed via cluster self-delivery but closed locally.
-    // Used for dirty cluster shutdown where active connections
-    // must be cleaned up.
+    // Not closed via cluster self-delivery but closed locally.  Used
+    // when local broker is shut down without a clean cluster shutdown.
+    // Release the connection, it will delete this.
     connection = 0;
 }
 
-bool  ConnectionInterceptor::doOutput() {
-    // FIXME aconway 2008-08-15: this is not correct.
-    // Run in write threads so order of execution of doOutput is not determinate.
-    // Will only work reliably for in single-consumer tests.   
-
-    if (connection->hasOutput()) {
-        cluster.send(AMQFrame(in_place<ClusterConnectionDoOutputBody>()), this);
-        return doOutputNext();
-    }
-    return false;
-}
-
-void ConnectionInterceptor::deliverDoOutput() {
-    // FIXME aconway 2008-08-15: see comment in doOutput.
-    if (isShadow()) 
-        doOutputNext();
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+// 
+void ConnectionInterceptor::deliverDoOutput(size_t requested) {
+    output.deliverDoOutput(requested);
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionInterceptor.h Thu Aug 21 11:04:18 2008
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CONNECTIONPLUGIN_H
-#define QPID_CLUSTER_CONNECTIONPLUGIN_H
+#ifndef QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
+#define QPID_CLUSTER_CONNECTIONINTERCEPTOR_H
 
 /*
  *
@@ -23,6 +23,8 @@
  */
 
 #include "Cluster.h"
+#include "WriteEstimate.h"
+#include "OutputInterceptor.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 
@@ -41,42 +43,46 @@
     
     Cluster::ShadowConnectionId getShadowId() const { return shadowId; }
 
-    bool isLocal() const { return shadowId == Cluster::ShadowConnectionId(0,0); }
-
+    bool isShadow() const { return shadowId != Cluster::ShadowConnectionId(0,0); }
+    bool isLocal() const { return !isShadow(); }
+    bool getClosed() const { return isClosed; }
+    
     // self-delivery of intercepted extension points.
     void deliver(framing::AMQFrame& f);
     void deliverClosed();
-    void deliverDoOutput();
+    void deliverDoOutput(size_t requested);
 
     void dirtyClose();
 
+    Cluster& getCluster() { return cluster; }
+    
   private:
     struct NullConnectionHandler : public qpid::sys::ConnectionOutputHandler {
         void close() {}
         void send(framing::AMQFrame&) {}
-        void doOutput() {}
         void activateOutput() {}
     };
 
-    bool isShadow() { return shadowId != Cluster::ShadowConnectionId(0,0); }
-    
     // Functions to intercept to Connection extension points.
     void received(framing::AMQFrame&);
     void closed();
     bool doOutput();
+    void activateOutput();
+
+    void sendDoOutput();
 
     boost::function<void (framing::AMQFrame&)> receivedNext;
     boost::function<void ()> closedNext;
-    boost::function<bool ()> doOutputNext;
 
     boost::intrusive_ptr<broker::Connection> connection;
     Cluster& cluster;
     NullConnectionHandler discardHandler;
     bool isClosed;
     Cluster::ShadowConnectionId shadowId;
+    WriteEstimate writeEstimate;
+    OutputInterceptor output;
 };
 
 }} // namespace qpid::cluster
 
-#endif  /*!QPID_CLUSTER_CONNECTIONPLUGIN_H*/
-
+#endif  /*!QPID_CLUSTER_CONNECTIONINTERCEPTOR_H*/

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Aug 21 11:04:18 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "OutputInterceptor.h"
+#include "ConnectionInterceptor.h"
+#include "qpid/framing/ClusterConnectionDoOutputBody.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/current_function.hpp>
+
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+OutputInterceptor::OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h)
+    : parent(p), next(h), sent(), moreOutput(), doingOutput()
+{}
+
+void OutputInterceptor::send(framing::AMQFrame& f) {
+    Locker l(lock); 
+    next.send(f);
+    sent += f.size();
+}
+
+void OutputInterceptor::activateOutput() {
+    Locker l(lock); 
+    moreOutput = true;
+    sendDoOutput();             
+}
+
+// Called in write thread when the IO layer has no more data to write.
+// We do nothing in the write thread, we run doOutput only on delivery
+// of doOutput requests.
+bool  OutputInterceptor::doOutput() {
+    return false;
+}
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+// 
+void OutputInterceptor::deliverDoOutput(size_t requested) {
+    if (parent.getClosed()) return;
+
+    Locker l(lock);
+    size_t buf = next.getBuffered();
+    if (parent.isLocal())
+        writeEstimate.delivered(sent, buf); // Update the estimate.
+
+    // Run the real doOutput() till we have added the requested data or there's nothing to output.
+    sent = 0;
+    do {
+        sys::Mutex::ScopedUnlock u(lock);
+        moreOutput = doOutputNext(); // Calls send()
+    } while (sent < requested && moreOutput);
+    sent += buf;                // Include buffered data in the sent total.
+
+    QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " output=" << sent << " more=" << moreOutput);
+
+    if (parent.isLocal() && moreOutput) 
+        sendDoOutput();
+    else
+        doingOutput = false;
+}
+
+void OutputInterceptor::startDoOutput() {
+    if (!doingOutput) 
+        sendDoOutput();
+}
+
+// Send a doOutput request if one is not already in flight.
+void OutputInterceptor::sendDoOutput() {
+    // Call with lock held.
+    if (parent.isShadow() || parent.getClosed())
+        return;
+
+    doingOutput = true;
+    size_t request = writeEstimate.sending(getBuffered());
+    
+    // Note we may send 0 size request if there's more than 2*estimate in the buffer.
+    // Send it anyway to keep the doOutput chain going until we are sure there's no more output
+    // (in deliverDoOutput)
+    // 
+    parent.getCluster().send(AMQFrame(in_place<ClusterConnectionDoOutputBody>(
+                                          framing::ProtocolVersion(), request)), &parent);
+    QPID_LOG(trace, &parent << "Send doOutput request for " << request);
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,74 @@
+#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/broker/ConnectionFactory.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace framing { class AMQFrame; }
+namespace cluster {
+
+class ConnectionInterceptor;
+
+/**
+ * Interceptor for connection OutputHandler, manages outgoing message replication.
+ */
+class OutputInterceptor : public sys::ConnectionOutputHandler {
+  public:
+    OutputInterceptor(ConnectionInterceptor& p, sys::ConnectionOutputHandler& h);
+
+    // sys::ConnectionOutputHandler functions
+    void send(framing::AMQFrame& f);
+    void activateOutput();
+    void close() { Locker l(lock); next.close(); }
+    size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+
+    // Delivery point for doOutput requests.
+    void deliverDoOutput(size_t requested);
+    // Intercept doOutput requests on Connection.
+    bool doOutput();
+
+    boost::function<bool ()> doOutputNext;
+    
+    ConnectionInterceptor& parent;
+    
+  private:
+    typedef sys::Mutex::ScopedLock Locker;
+
+    void startDoOutput();
+    void sendDoOutput();
+
+    mutable sys::Mutex lock;
+    sys::ConnectionOutputHandler& next;
+    size_t sent;
+    WriteEstimate writeEstimate;
+    bool moreOutput;
+    bool doingOutput;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp Thu Aug 21 11:04:18 2008
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "WriteEstimate.h"
+#include "qpid/log/Statement.h"
+#include <boost/current_function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+WriteEstimate::WriteEstimate(size_t initial)
+    : growing(true), estimate(initial) {}
+
+size_t WriteEstimate::sending(size_t buffered) {
+    // We want to send a doOutput request for enough data such
+    // that if estimate bytes are written before it is self
+    // delivered then what is left in the buffer plus the doOutput
+    // request will be estimate bytes.
+
+    size_t predictLeft = (buffered > estimate) ? buffered - estimate : 0;
+    size_t request = (estimate > predictLeft) ? estimate - predictLeft : 0;
+    return request;   
+}
+
+size_t pad(size_t value) { return value + value/2; }
+
+void WriteEstimate::delivered(size_t sent, size_t buffered) {
+    size_t wrote =  sent > buffered ? sent - buffered : 0;
+    if (wrote == 0)             // No change
+        return; 
+    if (buffered > 0) { // Buffer was over-stocked, we wrote to capacity.
+        growing = false;
+        estimate = pad(wrote); // Estimate at 1.5 write for padding.
+    }
+    else if (wrote > estimate) { // Wrote everything, buffer was under-stocked
+        if (growing)
+            estimate = std::max(estimate*2, pad(wrote)); // Grow quickly if we have not yet seen an over-stock.
+        else
+            estimate = pad(wrote);
+    }
+}
+
+}} // namespace qpid::cluster
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,64 @@
+#ifndef QPID_CLUSTER_WRITEESTIMATE_H
+#define QPID_CLUSTER_WRITEESTIMATE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Estimate the amount of data that a connection can write between sending
+ * a doOutput notice and re-receiving it.
+ *
+ * The goal is to avoid ever write-idling the connection by sending
+ * the next doOutput request as soon as we've processed the previous
+ * one, such that data generated by the previous request will keep the
+ * writer busy till the next one is delivered.
+ *
+ */
+class WriteEstimate
+{
+  public:
+    WriteEstimate(size_t initial=4096);
+
+    /** About to send a doOutput request.
+     * Update estimation state and return size for next request.
+     */
+    size_t sending(size_t buffered);
+
+    /**
+     * doOutput request just delivered, not yet executed. Update the estimate.
+     * and estimate how much data to request in the next onOutput
+     * request. 0 means don't send an onOutput request.
+     */
+    void delivered(size_t sent, size_t buffered);
+
+  private:
+    bool growing;
+    size_t estimate;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_WRITEESTIMATE_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/WriteEstimate.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandler.h Thu Aug 21 11:04:18 2008
@@ -34,6 +34,7 @@
 {
   public:
     virtual void close() = 0;
+    virtual size_t getBuffered() const { return 0; }
 };
 
 }}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=687813&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Thu Aug 21 11:04:18 2008
@@ -0,0 +1,54 @@
+#ifndef QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+#define QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionOutputHandler.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * A ConnectionOutputHandler that delegates to another
+ * ConnectionOutputHandler.  Allows the "real" ConnectionOutputHandler
+ * to be changed modified without updating all the pointers/references
+ * using the ConnectionOutputHandlerPtr
+ */
+class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
+{
+  public:
+    ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {}
+    void set(ConnectionOutputHandler* p) { next = p; }
+    ConnectionOutputHandler* get() { return next; }
+    const ConnectionOutputHandler* get() const { return next; }
+
+    void close() { next->close(); }
+    size_t getBuffered() const { return next->getBuffered(); }
+    void activateOutput() { next->activateOutput(); }
+    void send(framing::AMQFrame& f) { next->send(f); }
+
+  private:
+    ConnectionOutputHandler* next;
+};
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_CONNECTIONOUTPUTHANDLERPTR_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Aug 21 11:04:18 2008
@@ -78,7 +78,7 @@
         ::sleep(1);
         --retry;
     }
-    BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+    BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
 }
 
 void ClusterFixture::add() {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/consume.cpp Thu Aug 21 11:04:18 2008
@@ -42,13 +42,15 @@
     uint count;
     uint ack;
     string queue;
-
+    bool declare;
+    
     Args() : count(0), ack(1)
     {
         addOptions()
             ("count", optValue(count, "N"), "number of messages to publish")
             ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
-            ("queue", optValue(queue, "<queue name>"), "queue to consume from");
+            ("queue", optValue(queue, "<queue name>"), "queue to consume from")
+            ("declare", optValue(declare), "declare the queue");
     }
 };
 
@@ -67,7 +69,8 @@
 
     void consume()
     {
-        
+        if (opts.declare)
+            session.queueDeclare(opts.queue);
         SubscriptionManager subs(session);
         LocalQueue lq(AckPolicy(opts.ack));
         subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
@@ -77,7 +80,7 @@
         Message msg;
         for (size_t i = 0; i < opts.count; ++i) {
             msg=lq.pop();
-            std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl;
+            QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
         }
         if (opts.ack != 0)
             subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Thu Aug 21 11:04:18 2008
@@ -44,6 +44,7 @@
     uint size;
     uint count;
     uint rate;
+    bool sync;
     uint reportFrequency;
     uint timeLimit;
     uint queues;
@@ -65,6 +66,7 @@
             ("queues", optValue(queues, "N"), "number of queues")
             ("count", optValue(count, "N"), "number of messages to send")
             ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+            ("sync", optValue(sync), "send messages synchronously")
             ("report-frequency", optValue(reportFrequency, "N"), 
              "number of milliseconds to wait between reports (ignored unless rate specified)")
             ("time-limit", optValue(timeLimit, "N"), 
@@ -143,6 +145,7 @@
     void sendByRate();
     void sendByCount();
     Receiver& receiver;
+    const string data;
 public:
     Sender(const string& queue, Receiver& receiver);
     void test();
@@ -285,7 +288,7 @@
     totalLatency = maxLatency = minLatency = 0;           
 }
 
-Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {}
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
 
 void Sender::test()
 {
@@ -295,7 +298,7 @@
 
 void Sender::sendByCount()
 {
-    Message msg(generateData(opts.size), queue);
+    Message msg(data, queue);
     if (opts.durable) {
         msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
     }
@@ -303,15 +306,15 @@
     for (uint i = 0; i < opts.count; i++) {
         uint64_t sentAt(current_time());
         msg.getDeliveryProperties().setTimestamp(sentAt);
-        //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
         async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+        if (opts.sync) session.sync();
     }
     session.sync();
 }
 
 void Sender::sendByRate()
 {
-    Message msg(generateData(opts.size), queue);
+    Message msg(data, queue);
     if (opts.durable) {
         msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
     }
@@ -324,9 +327,9 @@
     while (true) {
         uint64_t start_msg(current_time());
         msg.getDeliveryProperties().setTimestamp(start_msg);
-        //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
         async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
-
+        if (opts.sync) session.sync();
+        
 	uint64_t now = current_time();
 
 	if (timeLimit != 0 && (now - start) > timeLimit) {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/start_cluster Thu Aug 21 11:04:18 2008
@@ -16,7 +16,7 @@
 
 if test "$SIZE" = "one"; then	# Special case of singleton cluster, use default port.
     ../qpidd -q
-    with_ais_group ../qpidd $OPTS || exit 1
+    with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1
 else
     for (( i=0; i<SIZE; ++i )); do
 	PORT=`with_ais_group ../qpidd  -p0 --log-output=cluster$i.log $OPTS`  || exit 1

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=687813&r1=687812&r2=687813&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Aug 21 11:04:18 2008
@@ -26,12 +26,18 @@
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
     <control name = "notify" code="0x1">
+      <role name="server" implement="MUST" />
       <field name="url" type="str16" />
     </control>
 
-    <control name="connection-close" code="0x2"/>
+    <control name="connection-close" code="0x2">
+      <role name="server" implement="MUST" />
+    </control>
 
-    <control name="connection-do-output" code="0x3"/>
+    <control name="connection-do-output" code="0x3">
+      <role name="server" implement="MUST" />
+      <field name="bytes" type="uint32"/>
+    </control>
 
   </class>
 </amqp>